Open3

Piscinaを使ったNode.jsのお手軽Worker threads実装

yamadashyyamadashy

Piscina(ピシーナ)は、Node.jsのWorker Threadsを簡単に使えるようにしたライブラリ。

https://github.com/piscinajs/piscina

Node.jsでWorker Threadsを使う場合、スレッドの管理やメッセージングが複雑になりがち。
Piscinaを使うと、そういった面倒な部分を抽象化して、シンプルなAPIでマルチスレッド処理を実装できる。
特に、タスクを並列で実行したい場合や、スレッドプールを簡単に管理したい場合に便利。

yamadashyyamadashy

Worker threadsとPiscinaの比較

タスクの定義

2つの数値を受け取って足し算を行う単純なタスクを考えてみる。

task.ts
export default ({ a, b }: { a: number; b: number }): number => {
  return a + b;
};

Worker Threadsを直接使う場合

Worker Threadsを直接使うと、以下のようなコードになる。
ここで気になるのは、メインスレッドとワーカースレッド間の通信がpostMessageon('message')で行われること。
イベント駆動なので処理の流れがわかりづらい。

main-worker.ts
import { Worker, isMainThread, parentPort } from 'worker_threads';

if (isMainThread) {
  // メインスレッド
  const worker = new Worker(__filename);

  // 結果の受け取り
  worker.on('message', (result: number) => {
    console.log(`Worker Threads Result: ${result}`); // 3
  });

  // タスクを送信
  worker.postMessage({ a: 1, b: 2 });
} else {
  // ワーカースレッド
  import('./task').then((task) => {
    parentPort!.on('message', (message: { a: number; b: number }) => {
      const result = task.default(message);
      parentPort!.postMessage(result);
    });
  });
}

Piscinaを使う場合

同じタスクをPiscinaで書くとかなりシンプルになる。
イベント駆動型のメッセージング処理を抽象化しており、Promise APIで簡潔に記述できる。

main-piscina.ts
import Piscina from 'piscina';
import path from 'path';

// Piscinaインスタンスを作成
const piscina = new Piscina({
  filename: path.resolve(__dirname, 'task.ts'), // タスクファイルを指定
});

// タスクを実行
(async () => {
  const result = await piscina.run({ a: 1, b: 2 });
  console.log(`Piscina Result: ${result}`); // 3
})();
yamadashyyamadashy

スレッドプールの自動管理

Piscinaはスレッドプールの管理も自動でやってくれる。
例えば、以下のコードでは、piscina.run()を複数回呼び出して、それぞれのタスクを並列で実行している。
Piscinaが自動でスレッドプールを管理してくれるので、スレッドの作成や終了を気にする必要がない。

main-piscina.ts
import Piscina from 'piscina';
import path from 'path';

// Piscinaインスタンスを作成
const piscina = new Piscina({
  filename: path.resolve(__dirname, 'task.ts'), // タスクファイルを指定
});

// 複数のタスクを並列で実行
(async () => {
  const tasks = [
    { a: 1, b: 2 },
    { a: 3, b: 4 },
    { a: 5, b: 6 },
  ];

  const results = await Promise.all(
    tasks.map((task) => piscina.run(task))
  );

  console.log(`Piscina Results: ${results}`); // [3, 7, 11]
})();

実際にPiscinaを使っている例

https://github.com/yamadashy/repomix/blob/main/src/core/file/fileCollect.ts

スレッド数の制御も簡単

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'task.ts'), // タスクファイルを指定
  maxThreads: 4, // 最大4つのスレッドを使用
});;