🧵

【TypeScript/C#】 軽量スレッドライブラリ

に公開

俗に軽量スレッドとかコルーチンとか言われるヤツを操作するライブラリです。スレッド(糸)より細かいのでファイバー(繊維)と呼ばれる事も多いです。技術的には普通の非同期と変わらないというか、ライブラリ内部では普通に async/await を使っています。

スレッドと違って軽量スレッド/コルーチン/ファイバーに明確な定義はない(ハズ)ので、ファイバーがスレッドを使っても問題ない。

ライブラリは端的に言えば「便利な Promise.raceTask.WhenAny ラッパー」で、キューを中断・再開したり並列実行数の動的な制御等が可能になっています。

AI に喰わせるためのドキュメントやウェブページを一括ダウンロード&処理する等々、100、1,000、10,000 の非同期処理を並列で実行したいけど単純に並列実行すると負荷が高い、それ以前に直列で実行しないと 429 Too Many Requests になる、みたいな時に使えます。

つかいかた

ファクトリーメソッドが二つあります。

※ サンプルとしてダウンロード処理を並列で行っていますが、クラウド API には大きなファイルを分割ダウンロードする機能が標準で備わっていたりするので、まずはそちらを試した方が良いでしょう。

Fibers.ForEach

TypeScript 版と C# 翻訳版は言語仕様上の差異のみで根本は変わりません。

import { Fibers } from 'ts-fibers';

const fibers = Fibers.forEach(
  4,     // 同時実行数
  urls,  // 配列

  // 👇 配列の要素に対して非同期処理を行う関数(遅延実行)
  async (url) => {
    return await downloadAsync(url);
  }
);

// 👇 終了したジョブから順に列挙
for await (const downloadedData of fibers) {
  console.log(`Download completed: ${downloadedData.title}`);
}

C# 移植版

await using var fibers = Fibers.ForEach(
    concurrency: 4,
    urls,
    async (url) =>
    {
        return await DownloadAsync(url);
    });

await foreach (var downloadedData in fibers)
{
    Console.WriteLine($"Download completed: {downloadedData.title}");
}

Fibers.For

配列ではなくインデックスに対して非同期処理を行います。

// 例えば C# の全てのコンパイルエラー(CS0001~CS9999)情報のダウンロードを試みる等
await foreach (var x in Fibers.For(4, 1, 9999, 1, async...))
{
    //...
}

TS 版

const fibers = Fibers.for(
  5,
  0, 310, 1,
  async (index) => {
    return await downloadAsync(`https://...${index}.bin`);
  }
);

元の情報を保持する

Fibers は終了したタスクから順に列挙しますが、終了順ではなく元の順序や配列要素が後続の処理で必要な場合もあると思います。

情報の保持のためにいちいち型を定義するのは面倒なので、ファイバー非同期関数からは匿名型でも何でも返せるようになっています。

const fibers = Fibers.for(concurrency, startIndex, endIndex, step,
  async (index) => {
    const value = await fooAsync(index);

    // 👇 インデックスと結果を組み合わせて返す
    return { index, value };
  });

// 👇 ファイバー関数に「適切な順序で結果を格納する」処理を含めなくて良くなる
for await (const x of fibers) {
  results[x.index] = x.result;
}

C#(同上)

await using var fibers = Fibers.for(
    ...,
    async (index) =>
    {
        var value = await FooAsync(index);
        return (index, value);
    }
)

大量のウェブページを収集&後処理した際のログ

TypeScript 版、C# 移植版どちらも言語機能のイテレーターを使っているので、CPU/メモリ効率共に良い感じで処理出来てるんじゃないでしょうか。

  • TS 初心者なので意味のあるログか不明
  • C# 移植版は「TS 版が便利だったから移植した」なので特にログとかはありません

Concurrency = 1

1:50:37.341 (h:mm:ss.mmm)

# 実行直後
Memory Usage:
  RSS         : 64.09 MB
  Heap Total  : 30.70 MB
  Heap Used   : 16.53 MB
  External    : 3.68 MB
  Array Buffs : 0.18 MB

# 終了直前
Memory Usage:
  RSS         : 252.32 MB
  Heap Total  : 30.45 MB
  Heap Used   : 23.06 MB
  External    : 6.77 MB
  Array Buffs : 3.09 MB

Concurrency = 3

x2.6 高速です。

42:54.500 (m:ss.mmm)

# 実行直後
Memory Usage:
  RSS         : 64.09 MB
  Heap Total  : 31.45 MB
  Heap Used   : 13.85 MB
  External    : 3.67 MB
  Array Buffs : 0.17 MB

# 終了直前
Memory Usage:
  RSS         : 210.70 MB
  Heap Total  : 32.20 MB
  Heap Used   : 23.86 MB
  External    : 6.54 MB
  Array Buffs : 2.91 MB

FiberScheduler

C# 移植版にはファイバーを使った非同期処理スケジューラーがあります。

好きなタイミングでジョブを投入して同時実行数を動的に変更する等々が可能です。

FiberScheduler.Default.Schedule(...);

// Default を止めて優先的に処理を行うスケジューラー(画面黄色のログ)
FiberScheduler.Priority.Schedule(...);

その他

fibers.promise.finally(...) fibers.AsTask().ContinueWith(...) 等と組み合わせてファイバーをバックグラウンド実行できます。

👇 詳細

TypeScript 版

https://github.com/sator-imaging/ts-fibers

C# 移植版

https://github.com/sator-imaging/Unity-Fundamentals/blob/main/Runtime/Threading/Fibers.cs

おまけ

FUnit.Run//:funit:include ... を使って Unity のテストを GitHub で実行できるようになりました 🎉

👇 実行結果

https://github.com/sator-imaging/Unity-Fundamentals/actions/runs/18935460730

テスト用アセンブリ

テスト用の .asmdef でディスコンになった Stadia を指定することにより、Unity で許されない最新の C# 構文を使用してもエラーが出ない状態にできます。

Tests~ 等の Unity で非表示になるフォルダー名にする手もありますが、AI にテストを書かせる際に「Terminal で開く」等の Unity エディター拡張が使えなくなるのが地味に不便。

👇 FUnit 専用のテストエクスプローラーも完備

ついつい全部作っちゃう悪癖何とかしたい

おわりに

C# 移植版は ts-fibers のテストを C# + FUnit 向けに翻訳させて、そのテストをパスしたから大丈夫だろうという代物です。

そして ts-fibers のテストは AI が書いています。つまり、何か問題が起きた場合は全て AI が悪いという事になります。

以上です。お疲れ様でした。

Discussion