Closed49

ハンズオンNode.jsを進める【第4章 マルチプロセス、マルチスレッド】

ハガユウキハガユウキ

マルチコアのシステムを用いる場合、Node.jsのようなシングルスレッドのアプリケーションをそのまま動かすだけでは、マシンリソースを有効に活用できない。
Node.jsではマルチコアシステム上でアプリケーションを効率的に動かすための仕組みとして、clusterモジュールによるマルチプロセス化の機能が提供されている。
また、Node.js v10以降では、worker_threadsモジュールがコアのAPIとして新たに提供されるようになり、マルチスレッドプログラミングも可能になった

ハガユウキハガユウキ

プロセスとスレッド

  • コンピュータは同時に複数のプロセスを起動でき、それぞれのプロセスの内部では1つ異常のスレッドによって処理が行われている。
  • プロセス同士は互いに独立しているが、同一プロセス内の複数のスレッドはメモリ空間などのリソースを共有している。通信のためのチャンネルを介した「プロセス間通信(Inter Process Communication, IPC)」と比べ、多くのリソースを共有するスレッド同士の通信は比較的容易で効率的である。
  • マルチコアシステムにおいては、複数のプロセスも、同一プロセス内の複数のスレッドも、別々のコアを使って並列に実行できる。しかし、シングルスレッドなNode.jsのプロセスを1つだけ起動した場合、それはマルチコアシステムにおいても1コア上でしか実行されない。 Node.jsのプログラムはシングルスレッドでもイベントループにより並行に動作するが、1コアしか使わなければ並列に動作することはない。
ハガユウキハガユウキ

メモリとメモリ空間の違い

メモリ空間はプログラムやデータが配置される抽象的な領域を指し、メモリはその実際の物理デバイスを指す

ハガユウキハガユウキ

1つのCPUコアは同時に1つの処理しか実際に実行されている状態に保てない。
そのため、並列処理を実現するにはマルチコアが必要である。
並行処理ならシングルコアでも可能である。
https://wa3.i-3-i.info/word18078.html

ハガユウキハガユウキ

コンビニでいう、1人の店員が客の弁当を温めている間に別の客の対応を行うのが並行で、2人の店員がそれぞれの客の対応を同時に行うのが並列である。
(並行と並列は互いに排他的ではない(被る部分がある)。コンビニ店員が2人の時でも1人の店員が弁当を温めている間に次の客を対応するでしょう)

ハガユウキハガユウキ

スレッドとイベントループはこの並列、並行という観点からも異なる。
複数のスレッドは別々のコア上で並列に実行できるため、マルチスレッドは、シングルコアシステムでは並行に動作するが、マルチコアシステムでは並行かつ並列に動作する。
(マルチスレッドはコア数に応じて柔軟に並列にしたり並行にしたりできる)

一方イベントループはシングルスレッドであることから、シングルコアシステムでもマルチコアシステムでも単一のCPUコア上で動作し、並列には動作しない。

マルチコアシステムでも1つのコアしか使えないとあっては、アプリケーションのスケーラビリティに支障をきたす。このため、Node.jsでWebアプリケーションなどパフォーマンスが要求されるアプリケーションを開発する場合、マルチコアシステム上ではこれをマルチプロセスで動かすことが基本となる。

ハガユウキハガユウキ

このマックのcpuコアの数を調べてみたら、クアッドコアだった
つまり、1つのcpuに対して4つのコアがある
(nodeで確認した時はなぜか8個だった)

ハガユウキハガユウキ

clursterモジュールによるマルチプロセス化

Node.jsアプリケーションをマルチプロセスで動かす場合、clusterモジュールを使います。

ハガユウキハガユウキ

Webアプリケーションのマルチプロセス化

マルチプロセス化対象のWebアプリケーションを実装する。

ハガユウキハガユウキ

平均レイテンシは、処理にかかる時間
loadtestのコンカレンシーは同時に接続する台数みたいな感じ。

ハガユウキハガユウキ

現状の実装だと、1リクエストで2.8秒ほど時間がかかっている
このアプリケーションをマルチプロセスで動かしてみる。

ハガユウキハガユウキ

マルチプロセスでHTTPリクエストを捌く場合、
親プロセスをマスタープロセス、子プロセスをワーカープロセスとして捉える
マスタープロセスの目的は、コア数に応じたワーカープロセスを作成すること。
ワーカープロセスがどのモジュールをを実行するかを指定しなかった場合、ワーカープロセスがフォークされたモジュールを先頭から再実行する

別々のワーカ間に分散されたリクエストを処理するロードバランサが cluster モジュール内に組み込まれているそう。

https://postd.cc/setting-up-a-node-js-cluster/

ハガユウキハガユウキ

clusterモジュールでは、マスタープロセスがどのワーカープロセスにリクエストを処理させるかを、ロードバランシングしているそう。
ソケットはプロセス間で共有できる。
クライアントからのリクエストをマスタープロセスがアクセプトしたら、接続済みソケットをマスタからワーカに受け渡す。そうすることで,接続した後のクライアントとの送受信はワーカに任せることが実現できる。
http://blog.nodejs.jp/2011/11/cluster.html
https://kakts-tec.hatenablog.com/entry/2017/01/11/023735

https://e-words.jp/w/ロードバランシング.html

ハガユウキハガユウキ

プロセスのフォーク

forkとは、プロセスが自身の複製を作成して新たなプロセスとして起動すること。 元のプロセスをparent process、新たに生成されたプロセスをchild processと呼ぶ
それぞれ別のPIC(Process ID)が割り当てられ区別される
https://e-words.jp/w/フォーク.html

ハガユウキハガユウキ

通常すでに使用中のポート(たとえば3000とか)は他のところから使えない。しかし、cluster.fork()によって、フォークされたサブプロセス同士はポートを共有できる

ハガユウキハガユウキ

マルチプロセスにしたら、
平均レイテンシが0.9秒になった。すごい

↓ マルチプロセス前

Completed requests:  1084
Total errors:        10
Total time:          10.006 s
Mean latency:        2820.5 ms
Effective rps:       108

↓マルチプロセス後

Completed requests:  3972
Total errors:        0
Total time:          10.01 s
Mean latency:        944.9 ms
Effective rps:       397

処理したリクエストの数も増えとるし。シングルプロセスで動かす場合よりパフォーマンスが良くなっている。

↓サーバー側の実行結果
3000~はプロセスid。

node multi-process.js
メインプロセス 3123
/Users/yuuki_haga/repos/node/node-practice/chapter_4/src/multi-process
{ cpuCount: 8 }
サブプロセス 3137
サブプロセス 3138
サブプロセス 3139
サブプロセス 3140
サブプロセス 3141
サブプロセス 3142
サブプロセス 3143
サブプロセス 3144
ハガユウキハガユウキ

IPC(プロセス間通信)

clusterモジュールによってフォークされたプロセス同士は、IPC(プロセス間通信)チャンネルを介して通信できる。先ほど

↓ web-app.js

"use strict"

const http = require("http")
const fibonacci = require("./fibonacci")
const pid = process.pid

// サブプロセスが起動すると、このファイルを上から順に実行していく
// つまり、サブプロセスにmessageイベントが登録されている
// IPCでメッセージを受信して指定されたポート番号でWebサーバーを起動
// IPCでのメッセージ受信はprocess.on("message", ハンドラ)で、メッセージ送信はprocess.send()で行います。
process.on("message", port => {
  console.log(pid, `ポート${port}でWebサーバーを起動します`);
  http.createServer((req, res) => {
    // リクエストがきた際の処理
    const url = new URL(req.url, `http://${req.headers.host}`);
    const pathname = url.pathname;
    const n = Number(pathname.slice(1))

    if (Number.isNaN(n)) {
      // Number.isNaNで数値かどうかを判定し、数値でなかった場合は無視
      return res.end()
    }

    const response = fibonacci.fibonacci(n)
    // 結果をIPCで送信
    process.send({ pid, response })
    res.end(response.toString())
  }).listen(port)
})

↓ multi-process.js

"use strict"

const { fork, setupMaster } = require("cluster")
const cpuCount = require("os").cpus().length

console.log("メインプロセス", process.pid);

// サブプロセスが実行するファイルの指定
setupMaster({ exec: `${__dirname}/web-app`})

// CPUコアの数だけプロセスをフォーク
// 並列度の上限はコアの数までなので、コア数以上にプロセスをフォークすることに意味がない(逆に同一コア上でのプロセス間のコンテキストスイッチによるコストを発生させてしまう)
for (let i = 0; i < cpuCount; i++) {
  const sub = fork()
  console.log("サブプロセス", sub.process.pid);

  // マスタープロセスが、IPCでサブプロセスにポート番号を送信
  sub.send(3000)
  // IPCで受信したメッセージをハンドリング
  sub.on("message", ({ pid, response }) => {
    console.log(process.pid, `${pid}${response}を返します`)
  })
}

実行すると、メインプロセスからサブプロセスにメッセージが送信され、Webサーバーを起動することを確認できる。

メインプロセス 16175
サブプロセス 16188
サブプロセス 16189
サブプロセス 16190
サブプロセス 16191
サブプロセス 16192
サブプロセス 16193
サブプロセス 16194
サブプロセス 16195
16188 ポート3000でWebサーバーを起動します
16190 ポート3000でWebサーバーを起動します
16191 ポート3000でWebサーバーを起動します
16193 ポート3000でWebサーバーを起動します
16189 ポート3000でWebサーバーを起動します
16192 ポート3000でWebサーバーを起動します
16194 ポート3000でWebサーバーを起動します
16195 ポート3000でWebサーバーを起動します

起動したWebサーバーに対してhttp://localhost:3000/10のようなURLでHTTP理稀有ストを送ると以下のようなログが確認できる。

16175 16188が55を返します

これでサブプロセスからメインプロセスへのメッセージ送信も確認できた。

ハガユウキハガユウキ

リスニングソケット

リスニングソケットとは、クライアントからの接続を待ち受けるためのソケットである。

ハガユウキハガユウキ

メッセージのシリアライズ方法の指定

デフォルトでは、IPCはプロセス間でやり取りするメッセージをJSON形式の文字列にシリアライズする
この結果、メッセージの内容によっては送信した通りの値を受信側で取得できない場合がある。

たとえばDateのインスタンスをメッセージとして送信すると、受信側では2020 ~ のような文字列として取得される(Dateインスタンスが欲しいのに、文字列として取得できちゃう)

↓マスタープロセス

  // IPCで受信したメッセージをハンドリング
  sub.on("message", ({ pid, response, loggedAt }) => {
    console.log(process.pid, `${pid}${response}を返します`)
    console.log("loggedAt", loggedAt);
  })

↓ ワーカープロセス

    const loggedAt = new Date()

    // 結果をIPCで送信
    process.send({ pid, response, loggedAt })

↓ 実行結果(dateインスタンスが返されてなくて、文字列が返されちゃっている。パースはちゃんとしているけど、Dateインタンスを正しくJSONパースできないことが分かった)

18897 1891055を返します
loggedAt 2023-09-19T07:31:43.315Z
ハガユウキハガユウキ

また、JSONは循環参照に対応していないので、内部に循環参照を含むオブジェクトをJSON.stringifyするとエラーが発生する。したがって、デフォルトではIPCで循環参照を含むオブジェクトをやり取りできない。

ハガユウキハガユウキ

シリアライズはこのデフォルトの方法のほかに、構造化クローンアルゴリズムというアルゴリズムに基づく方法も利用できる。

これを利用する場合は、setupMaster()メソッドでserializationにadvancedを指定する。
(advanced以外に指定可能な値はjsonで、これがデフォルトになっている)

ハガユウキハガユウキ
// サブプロセスが実行するファイルの指定
setupMaster({
  exec: `${__dirname}/web-app`,
  //  IPCでやりとりするメッセージをJSON形式にシリアライズするわけではなく、構造化クローンアルゴリズムというアルゴリズムでシリアライズするようにする
  serialization: "advanced",
})
ハガユウキハガユウキ

worker_threadsモジュールによるマルチスレッド化

複数のスレッドは多くのリソースを共有しているので、通信が頻繁に発生したり、やり取りするメッセージが大きかったりする場合は、マルチスレッドの方が向いている。
マルチプロセスを使うなら、各プロセスが独立して動作し通信を必要としないような場合が適している。

Node.jsでマルチスレッドを実現するためには、worker_threadsモジュールを使う。
Web標準にはマルチスレッド化のための機能として、Webワーカーがありましたが、worker_threadsモジュールは、Node.jsでWebワーカーと同様の機能を提供するものである。

ハガユウキハガユウキ

マルチスレッドで処理することを想定したコード

"use strict"

const { Worker, threadId } = require("worker_threads")

console.log("メインスレッド", threadId);

// CPUコアの数だけスレッドを起動
const cpuCount = require("os").cpus().length

for(let i = 0; i < cpuCount; i++) {
  // サブスレッドで実行するファイルのパスを指定してWorkerをnew
  // Workerインスタンスを生成すると、指定したファイルパスのファイルが別スレッドで実行される
  // このWorkerはcluster.Workerとは別物
  const worker = new Worker(`${__dirname}/web-app.js`)
  console.log("サブスレッド", worker.threadId);
}
ハガユウキハガユウキ

このコードには問題がある。clusterモジュールの場合と異なり、worker_threadsが生成したスレッド同士はポートを共有できない。したがって、このファイルを実行すると次のように実行されて終了する

node multi-thread.js
メインスレッド 0
サブスレッド 1
サブスレッド 2
サブスレッド 3
サブスレッド 4
サブスレッド 5
サブスレッド 6
サブスレッド 7
サブスレッド 8
ハガユウキハガユウキ

Webアプリケーションそのものの並列化は、まさにそのようなユースケースを想定して提供されている、clusterモジュールによるマルチプロセス化に任せるべきである。
マルチスレッド化は、アプリケーション内部で現れるCPU負荷の高い処理を並列化することで、スループット(単位時間当たりの処理能力やデータ転送量)をあげたりメインスレッドでの処理の進行を妨げないようにしたりするのに有用である。

ハガユウキハガユウキ

今回のWebアプリケーションの例で言えば、フィボナッチ数の計算処理はマルチスレッド化の良いターゲットである。

ハガユウキハガユウキ

↓ web-app.js

"use strict"

const http = require("http")
const { Worker } = require("worker_threads")

http.createServer((req, res) => {
  // リクエストがきた際の処理
  const url = new URL(req.url, `http://${req.headers.host}`);
  const pathname = url.pathname;
  const n = Number(pathname.slice(1))

  if (Number.isNaN(n)) {
    // Number.isNaNで数値かどうかを判定し、数値でなかった場合は無視
    return res.end()
  }

  // コンストラクタの第二引数でサブスレッドに値を渡しつつ、サブスレッドを生成
  // サブスレッドからworker_threads.parentPort.postMessage()で送信されたメッセージは、Workerインスタンスの
  // messageイベントで取得できる
  // フィボナッチ数の計算は別スレッドで行う。Webアプリケーションを動すしているスレッドとは、別のスレッドである。
  new Worker(`${__dirname}/fibonacci.js`, { workerData: n })
    .on("message", result => res.end(result.toString()))
}).listen(8000)

↓ fibonacci.js

"use strict"

// 指定された位置のフィボナッチ数を返す
function fibonacci(n) {
  // nが1以下の場合は、それ以外の場合は直前の2つのフィボナッチ数の和を返す
  return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2)
}

const { workerData, parentPort } = require("worker_threads")

// フィボナッチ数の計算結果をメインスレッドに送信
// workerDataでメインスレッドから入力を受け取り、その値に対応するフィボナッチ数を計算して
// parentPort.postMessage()でメインスレッドに返す。
parentPort.postMessage(fibonacci(workerData))

module.exports = {
  fibonacci,
}
ハガユウキハガユウキ

↓ 実行結果(シングルプロセス、マルチスレッド)

Completed requests:  576
Total errors:        7
Total time:          10.012 s
Mean latency:        4868.6 ms
Effective rps:       58

↓ 実行結果(シングルプロセス、シングルスレッド)

Completed requests:  1084
Total errors:        10
Total time:          10.006 s
Mean latency:        2820.5 ms
Effective rps:       108

CPU負荷の高い処理をマルチスレッド化したが、平均処理速度がめちゃくちゃ上がっているし、10秒間で捌けるリクエスト数もだいぶ下がった。

これは、Webアプリケーションがリクエストのたびに新しいスレッドを生成することによるオーバーヘッドが、マルチスレッド化によるメリットを上回ってしまうためである。マルチスレッド化するのは良いけど、実際に処理が高速化していないとなんの意味もないから、ちゃんと負荷テストした方が良いね

ハガユウキハガユウキ

スレッドプールの実装

  • 今回の例のようにマルチスレッドでの処理が継続的に発生する場合、生成したスレッドをプールして使い回さなければマルチスレッドの恩恵を受けられない。
  • Node.jsのコアのAPIではスレッドプールの機能が提供されていないため、独自に実装する。
  • 現状のfibonacci.jsは、起動時にworkerDataを介してメインスレッドから値を受け取り、計算を一度だけ実行したらすぐに処理を終えてしまう書き方になっている。スレッドをプールして使いまわせるようにするには、メインスレッドからのメッセージの受信を継続して待機して、受信のたびに計算を実行する必要がある。

スレッド間のメッセージは、messageイベントでハンドリングできる(おそらく)。

https://qiita.com/suin/items/8fb7f77dd0a994b6f524

ハガユウキハガユウキ

↓ fibonacci.js

const { parentPort } = require("worker_threads")

// messageイベントの監視により、メインスレッドからのメッセージの受信を待機
// 受信したらフィボナッチ数を計算して結果をメインスレッドに送信
parentPort.on("message", n => parentPort.postMessage(fibonacci(n)))

このファイルの実行スレッドをプールすれば、メッセージを繰り返し送信して使いまわせる。

ハガユウキハガユウキ

スレッドをプールの実装コード

↓ fibonacci.js

"use strict"

// 指定された位置のフィボナッチ数を返す
function fibonacci(n) {
  // nが1以下の場合は、それ以外の場合は直前の2つのフィボナッチ数の和を返す
  return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2)
}

const { parentPort } = require("worker_threads")

// messageイベントの監視により、メインスレッドからのメッセージの受信を待機
// 受信したらフィボナッチ数を計算して結果をメインスレッドに送信
// parentPortオブジェクトには、onメソッドが生えており、
// 'message'イベントを処理するイベントハンドラーを登録することで、メインスレッドからのデータを受信できるようになります:
parentPort.on("message", n => parentPort.postMessage(fibonacci(n)))

↓ thread-pool.js

"use strict"

const { Worker } = require("worker_threads")

class ThreadPool {
  // 空きスレッド、キューを初期化

  // インスタンスのメンバ
  availableWorkers = []
  queue = []

  // sizeは生成するスレッドの数
  // Workerコンストラクタの第一引数(サブスレッドで実行するファイルパス)と第二引数(オプション)
  // は利用元から指定できるようにしておく。
  // リクエストを受け付けた時、空いているスレッドがあればそれを使って処理をするが、なければリクエストをキューに積む。
  // それぞれのスレッドは、割り当てられた処理が完了したら、キューからリクエストを(あれば)取り出して処理する
  constructor(size, filePath, options) {
    // 引数で指定された通りにスレッドを生成してプール
    for(let i = 0; i < size; i++) {
      // この別スレッドの生成時にメッセージイベントを登録していた
      // Workerインスタンスはされ続けている限り、GCで消されたりしないはず
      this.availableWorkers.push(new Worker(filePath, options))
    }
  }

  // 外部からの処理要求を受け付けるメソッド(このメソッドは非同期APIだね)
  executeInThread(arg) {
    return new Promise(resolve => {
      const request = { resolve, arg }
      // 空きスレッドがあればリクエストを処理しなければキューにつむ
      const worker = this.availableWorkers.pop()
      // jsでは#をつけるとプライベートになるそう
      // TSのprivateの方がまだわかりやすいな。#ってなんやねんってなりそう
      worker ? this.#process(worker, request) : this.queue.push(request)
    })
  }

  // 実際にスレッドで処理を実行するprivateメソッド
  // resolve, argはスレッド呼び出し時のパラメータ
  #process(worker, { resolve, arg }) {
    // 一回限りのリスナを登録している
    worker.once("message", result => {
      // リクエスト元に結果を返す
      // このresultがフィボナッチ数。
      resolve(result)

      // キューに積まれたリクエストがあれば処理をして、なければ空きスレッドに戻す
      const request = this.queue.shift()
      request
        ? this.#process(worker, request)
        : this.availableWorkers.push(worker)
    })
    // workerにargを返す
    // filePathを実行する予定のワーカーに対して、メッセージを送る
    //
    worker.postMessage(arg)
  }
}

module.exports = {
  ThreadPool,
}

↓ web-app.js

"use strict"

const http = require("http")
const { Worker } = require("worker_threads")
const cpuCount = require("os").cpus().length
const { ThreadPool } = require("./thread-pool")

// CPUコア数と同じサイズのスレッドプールを作成
const threadPool = new ThreadPool(cpuCount, `${__dirname}/fibonacci.js`)

// リクエストを受け付ける処理自体は非同期的な処理(async)だから、前の処理が後続のリクエストが捌けないとかはなさそう
http.createServer(async (req, res) => {
  // リクエストがきた際の処理
  const url = new URL(req.url, `http://${req.headers.host}`);
  const pathname = url.pathname;
  const n = Number(pathname.slice(1))

  if (Number.isNaN(n)) {
    // Number.isNaNで数値かどうかを判定し、数値でなかった場合は無視
    return res.end()
  }

  // ここでawaitされるのか
  const result = await threadPool.executeInThread(n)
  res.end(result.toString())

}).listen(3000)

ハガユウキハガユウキ

負荷テスト実行結果

Completed requests:  4162
Total errors:        98
Total time:          10.003 s
Mean latency:        908.9 ms
Effective rps:       416
ハガユウキハガユウキ

スレッドプールしたおかげで、10秒間で前よりかなりリクエストを捌けるようになった(前は576)。あとリクエストの処理時間も0.9秒なので、だいぶ早い(前は4.8秒)

ハガユウキハガユウキ

clusterモジュールでマルチプロセス化した時と近いところまでパフォーマンスが改善された。

Node.jsはイベントループとノンブロッキングI/Oにより、I/O負荷の高いアプリケーションではシングルスレッドで効率的に並行性を実現できる。worker_threadsモジュールによるマルチスレッド化は、I/Oの並行処理ではイベントループほど効率的に機能しないので、利用すべきではない。イベントループが苦手とするCPU負荷の高い処理では、worker_threadsモジュールを利用したマルチスレッドでの処理が有効である。

ハガユウキハガユウキ

スレッド間通信とIPCの違い

ハガユウキハガユウキ
  • IPCではプロセス間のメッセージ送信にsendメソッドが使われる
    • メッセージの受け取りには、messageイベントでリスナを登録しておく
  • スレッド間通信では、postMessageというメソッドが使われ、Workerのインスタンス化時のworkerDataによる値の受け渡しも可能である。
    • workerも同じくmessageイベントで受け取る
  • IPCではデフォルトではメッセージをJSON形式にシリアライズするが、スレッド間通信ではpostMessage, workerDataともデフォルトで構造化クローンアルゴリズムを使う
  • IPCはclusterモジュール、スレッド間通信はworker_threadsモジュールを使う
  • スレッド間では値のやり取りをコピーではなく、直接別スレッドに値を渡すこともできる
    • スレッド間ではプロセス間とは異なりメモリを共有できるためである。
ハガユウキハガユウキ

スレッド間での値の転送

  • 値をコピーしないで他スレッドに渡す方法の1つに、転送(transfer)と呼ばれるものがある
  • postMessage()やWorkderインスタンス化時に転送対象のオブジェクトを指定すると、そのオブジェクトはコピーされることなく他スレッドに渡され、それに伴い所有権も譲渡される。すなわち、元のスレッドではそのオブジェクトを利用できなくなる。
  • 転送では値のコピーによるオーバーヘッドがないため、特に大きなオブジェクトを渡す際はパフォーマンス面でメリットがある。
  • 転送対象に指定できるオブジェクトは、転送可能オブジェクト(transferable object)と呼ばれ、次のようなものが含まれる。
    • ArrayBuffer: 固定長のバイナリデータのバッファを表すオブジェクト
    • MessagePort: スレッド間通信のためのポートを表すオブジェクト
    • FileHandle: ファイル記述子のラッパーオブジェクト
ハガユウキハガユウキ

workerDataでメインスレッドから値を受け取った値をそのままpostMessageで返すサブスレッドを実装する。この時、workerData.transferの値に応じて、転送を使うかどうか決定する。
postMessage()で転送を使う場合、第二引数に転送対象オブジェクトを配列で指定する

↓ maybe-transfer.js

"use strict"

const { parentPort, workerData } = require("worker_threads")

// メインスレッドに返す
parentPort.postMessage(
  workerData.buffer,
  // postMessage()の第二引数に転送対象オブジェクトを指定
  workerData.transfer ? [workerData.buffer] : []
)

↓ main.js

"use strict"

const perf_hooks = require("perf_hooks")
const { Worker } = require("worker_threads")

function useMaybeTransfer(transfer) {
  // 1GBのArrayBufferを生成
  const buffer = new ArrayBuffer(1024 * 1024 * 1024)
  // 現在時刻を記録
  const start = perf_hooks.performance.now()
  new Worker(
    "./maybe-transfer.js",
    {
      workerData: { buffer, transfer },
      // transferListプロパティに転送対象オブジェクトを指定
      transferList: transfer ? [buffer] : []
    }
  ).on("message", () => {
    // サブスレッドから値が戻ってくるまでにかかった時間を出力
    console.log(perf_hooks.performance.now() - start)
  })

  // サブスレッドに渡した値がどう見えるか確認
  console.log(buffer);
}

// 転送を利用する場合
useMaybeTransfer(true)

// 転送を利用しない場合(値のコピー)
useMaybeTransfer(false)

↓ 実行結果

node main.js
ArrayBuffer { (detached), byteLength: 0 }
ArrayBuffer {
  [Uint8Contents]: <00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ... 1073741724 more bytes>,
  byteLength: 1073741824
}
1174.1023473739624
3604.235155105591

転送を使うことでパフォーマンスが上がることが確認できた。

ハガユウキハガユウキ

スレッド間での値の共有

  • 値をコピーしないで他スレッドに渡すもう一つの方法は、SharedArrayBufferというオブジェクトを利用してスレッド間で値を共有するというものである
  • Node.jsがworker_threadsモジュールによって生成するスレッドは、非常に独立性が高い。スレッドはそれぞれ独自のイベントループを持ち、グローバルを含め変数はスレッド間で共有されない。また、スレッド間で値をやり取りする際も、これまでに紹介した方法では複数スレッドが同じ値を共有することはない。値をコピーした場合はそれぞれのスレッドで個別の値を持つことになるし、転送した場合は転送元ではその値にアクセスできなくなる。
    • 従来のプログラミング言語ではスレッド間で変数を共有している。そのため、処理の途中で別のスレッドから変数の値を書き換えられるのを防ぐために、最初にロックを取得して、更新が完了したらロックを解放するような処理を入れる必要がある。Node.jsのマルチスレッドの場合、値が共有されないので、他のスレッドを気にせず自由に値を更新できる
    • しかし、処理の要件によっては、スレッド間で同じ値を共有したい場合もある。そのような要件に対応するための仕組みが、SharedArrayBufferを利用した値の共有である。SharedArrayBufferは固定長の共有可能なバイナリデータのバッファである。
    • SharedArrayBufferそのものはバイナリデータなので、これをプログラム上で操作する際はビューと呼ばれるものでラップする必要がある。ビューにはTypedArrayが使われる。TypedArrayはUint8ArrayやInt32Arrayなど、決まった型(Uint8Arrayなら符号なし8ビット整数, Int32Arrayなら符号付32ビット整数)の値が入る配列です。
ハガユウキハガユウキ

値を共有できても、スレッドセーフにやらないと意味がない。
waitやnotifyなど、スレッドセーフにやりたいなら、Atomicsというグローバルオブジェクトが提供するメソッドを使う。

ハガユウキハガユウキ

この記事地味に参考になる
やっぱCPU(のコア)の実行単位がスレッドか
https://hogetech.info/linux/kernel/cpu

ハガユウキハガユウキ

てなると、「プロセスを実行する」というのは厳密に言うと、「プロセスに存在するスレッドをCPUコアが実行する」ということか。プロセスはOSから見たプログラムの実行単位。スレッドはCPUコアからみたプログラムの実行単位

このスクラップは2023/09/20にクローズされました