💼

C++でジョブシステムを作ってみる(2)

2023/11/09に公開

はじめに

この記事ではC++でジョブを並列に実行するためのジョブシステムを実装していきます。

前回 は下記のステップ1まで実装しました。

  1. ジョブを並列実行する機能
  2. ジョブ間の依存関係を設定する機能
  3. 大きなジョブを分割して並列化する機能

前回の記事はこちら ↓
https://zenn.dev/nishiki/articles/01ff1417f0b85f

今回はステップ2ステップ3を実装してみます。前回と同様、この記事の全てのコードはパブリックドメインとします。

2. ジョブ間の依存関係を設定する機能

ステップ2ではジョブ間の依存関係を設定する機能を実装します。

例えば、ゲームエンジンであれば、物理演算をしている途中でレンダリングを始めてはいけません。また、スケルタルアニメーションを計算している途中にコリジョン判定をしてはいけません。このような場合には、ジョブ間に依存関係を設定する機能が必要になります。

今回の実装では、下のようにハンドルを介してジョブ間の依存関係を表現することにします。

サンプルコード
// ジョブAをスケジュール
// ジョブハンドルを取得しておく
JobHandle handleA = jobSystem.schedule([]() {
    doWork();
});

// ジョブBをスケジュール
// ジョブAのハンドルを渡すと、ジョブBはジョブAが終わるまで待機する
JobHandle handleB = jobSystem.schedule([]() {
    doWork();
}, {handleA});

実行してみると、下のようにジョブAが終わってからジョブBが開始されるようになります。

Thread 10032 |    AAAAAA  BBBBBB  |
Thread 17636 |      AAAAAA  BBBBBB|
Thread 21128 |AAAAAA      BBBBBB  |
Thread 25696 |  AAAAAA    BBBBBB  |

2.1 特定ジョブを待機する機能

前回は全ジョブの完了を待つ waitForAll() というメンバー関数を実装しました。しかし、依存関係を設定するには、特定ジョブの完了を待つ機能が必要です。

そのための機能を提供してくれるのが、std::packaged_taskstd::shared_future です。以下のように、ジョブが完了するまで待機することができます。今回は利用しませんが、戻り値がある場合は std::shared_future::get() で取得することも可能です。

サンプルコード
// 別スレッドで実行する関数を登録
std::packaged_task<void()> task([](){ ... });

// 待機するためのハンドルを取得
std::shared_future<void> future = task.get_future().share();

// 別スレッドで実行
std::thread thread(std::move(task));

// 待機
future.wait();

これまでジョブキューは std::function を保持していましたが、別スレッドで完了を待つために std::packaged_task に変更します。

+ #include <future>

class JobSystem {
    // ...
    
-   std::queue<std::function<void()>> jobQueue;
+   std::queue<std::packaged_task<void()>> jobQueue;

    void workerThreadFunction() {
        while (true) {
-           std::function<void()> job;
+           std::packaged_task<void()> job;

            // ...
        }
    }
};

次に schedule() を修正し、std::shared_futureジョブハンドルとして返すことで、登録したジョブを待機できるようになります。

+ using JobHandle = std::shared_future<void>;

class JobSystem {
    // ...
    
+   // JobHandleを返すように変更
+   JobHandle schedule(const std::function<void()>& job) {
-   void schedule(const std::function<void()>& job) {

+       // packaged_task と shared_future を作成
+       std::packaged_task<void()> task(job);
+       auto future = task.get_future().share();
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            jobCounter.fetch_add(1);
            completeFlag.store(false);

-           jobQueue.push(job);
+           jobQueue.push(std::move(task));
        }
	
        condition.notify_one();
+       return future;
    }
    
    // ...
};

これで、以下のように特定ジョブを待つことができるようになりました。

サンプルコード
JobHandle handle = jobSystem.schedule([]() {
    doWork();
});

// 別の処理

handle.wait();

2.2 依存ジョブを待機してから実行する機能

特定ジョブを待機できるようになったので、これを使って依存ジョブを待つ機能を実装します。

まず、依存ジョブを受け取れる schedule() のオーバーロードを追加します。複数の依存ジョブにも対応できるように、std::vector としておきます。

    // 依存なしの schedule 関数
    JobHandle schedule(const std::function<void()>& job) { ... }

    // 依存を持つジョブをスケジュールする
    JobHandle schedule(const std::function<void()>& job, 
                       const std::vector<JobHandle>& dependencies) {
        // TODO: 実装
    }

実装はシンプルです。dependencies が完了するまで待機してから job を実行するようなラッパー的なラムダを作成し、それをジョブとして追加します。

    JobHandle schedule(const std::function<void()>& job, 
                       const std::vector<JobHandle>& dependencies) {
        auto wrapper = [job, dependencies]() {
            // 全ての依存ジョブが終わるまで待機する
            for (auto& dep : dependencies) {
                if (dep.valid()) {
                    dep.wait();
                }
            }

            job();
        };
        return schedule(wrapper);
    }

2.3 動作確認

依存関係を処理できるようになったので、動作確認をしてみます。前回の動作確認コードのうち、ジョブをスケジュールする部分を書き換えます。

void func(std::unordered_map<std::thread::id, std::vector<JobData>>& jobData, 
          int name) {
    JobData data;
    data.name = name;
    data.start = now();
    
    int sleepTime = generateRandomInt(5, 10);
    busyWait(std::chrono::milliseconds(sleepTime));
    
    data.end = now();

    auto thisId = std::this_thread::get_id();
    std::lock_guard<std::mutex> lock(dataMutex);
    if (!jobData.contains(thisId)) {
        jobData[thisId] = {};
    }
    jobData[thisId].push_back(data);
}

int main() {
    // ジョブシステムの作成...
    // ジョブデータの格納先...
    // 開始時間を記録...

    // ジョブAをスケジュールする
    std::vector<JobHandle> jobAHandles;
    for (int i = 0; i < 4; ++i) {
	auto handle = jobSystem.schedule([&jobData]() { 
	    func(jobData, 'A'); 
        });

        // ハンドルを保持しておく
        jobAHandles.push_back(handle);

        busyWait(std::chrono::milliseconds(2));
    }

    // ジョブBをスケジュールする
    for (int i = 0; i < 4; ++i) {
        // ジョブAを待機するためハンドルを渡す
        jobSystem.schedule([&jobData]() { 
	    func(jobData, 'B'); 
        }, jobAHandles);

        busyWait(std::chrono::milliseconds(2));
    }

    // 全てのジョブが完了するのを待つ...
    // 終了時間を記録...
    // タイムラインを表示...
}

実際に実行してみると、以下のように同期がとれたタイムラインが表示されます。

Total duration: 20 ms
Thread 15544 |AAAAAA        BBBBBB| (2 jobs)
Thread 45912 |    AAAAAA  BBBBBB  | (2 jobs)
Thread 59396 |      AAAAAABBBBBB  | (2 jobs)
Thread 61240 |  AAAAAA    BBBBBB  | (2 jobs)

2.4 複雑な依存関係の問題

簡単な依存関係を処理できていることは確認できましたが、一般的に複雑な依存関係を処理するのは難しい問題です。

不適切な順序

例えば、こんなコードを考えてみます。

サンプルコード
JobHandle handleB = jobSystem.schedule(); // 先にBをスケジュール
JobHandle handleA = jobSystem.schedule(); // 後にAをスケジュール
handleB.setDepends(handleA); // BはAが終わってから実行

今回のジョブシステムはこう書けないようなインターフェイスにしていますが、上記のようなコードを書いたとき、ジョブキューは以下のような状態になります。

// <- こっちが先頭
| JobB | JobA | .... | .... |

ワーカースレッドはジョブBの実行をはじめて、ジョブAの完了を待ちます。しかし、ジョブAはジョブBのあとに実行されるため、スレッドが永遠に待機してしまいます。いわゆるデッドロックです。

これは、今回の実装のようにインターフェイスによる制限を加えることで回避できますが、正しい順序でスケジュールしなければならない点が不便な場合もあると思います。

単純なキューを利用するのではなく、優先度グラフを扱えるデータ構造を利用することで、改善できるかもしれません。

また、デッドロックを解消する方法として、タイムアウトを導入する手もあるかもしれません。std::condition_variable::wait_for() を使えば、待機をタイムアウトで抜けてジョブを後回しにするなど、対処することができます。ただゲームエンジンには適していなさそうですね。

依存の循環

順序の問題と似ていますが、複数のジョブが互いに依存し、循環が生まれてしまう場合もデッドロックになります。

JobHandle handleA = jobSystem.schedule();
JobHandle handleB = jobSystem.schedule();
handleA.setDepends(handleB); // AはBが終わってから実行
handleB.setDepends(handleA); // BはAが終わってから実行

この場合はジョブシステムどうこうというよりは、そもそも計算できるのかという話のような気もします。ひとつのジョブにまとめたり、ジョブをさらに細分化したりすることで循環を回避できるかもしれません。また、ゲームエンジンであれば片方は前フレームの情報を利用することでも解決できそうです。

効率性

3つのジョブA、B、Cがあり、BはAに依存し、Cは独立しているとします。

JobHandle handleA = jobSystem.schedule();
JobHandle handleB = jobSystem.schedule();
JobHandle handleC = jobSystem.schedule();
handleA.setDepends(handleB); // BはAが終わってから実行

単純なキューを使っている以上、ジョブキューにA、B、Cと順番に追加したとき、Bの前にCを実行することはできません

Thread 15544 |AAAAAA         BBB CCC    |
Thread 45912 |    AAAAAA     BBBBBBBB CC|
Thread 59396 |      AAAAAAAAABBBBB  CCC |
Thread 61240 |  AAAAAA       BBBBBB  CCC|

上のタイムラインを見ると、BがAを待機している時間が無駄になっています。

Thread 15544 |AAAAAA CCC CC  BBB     |
Thread 45912 |    AAAAAA CCC BBBBBBBB|
Thread 59396 |      AAAAAAAAABBBBB   |
Thread 61240 |  AAAAAA  CCC  BBBBBB  |

このように、独立しているCを待機中に実行できれば、より効率的にスレッドを活用できます。つまり、単純に追加された順にジョブを実行するのではなく、依存関係を考慮して最適なジョブを取得できるようなデータ構造にすると、より効率的な実装ができるかもしれません。

3. 大きなジョブを分割して並列化する機能

次に、大きなジョブを分割して並列化する機能を実装してみます。

ゲームでは大量のゲームオブジェクトが存在します。例えば、全ゲームオブジェクトの物理演算を一つのジョブとしてスケジュールしてしまうと、マルチスレッドを全く活用できません。このような場合、ジョブを分割する機能があると便利です。

サンプルコード
// 処理を 8 つのジョブに分割してスケジュールする
// ラムダ式には jobIndex が渡される
std::vector<int> data(1000);
jobSystem.schedule(8, [](uint32_t jobIndex) {
    for(int i = 0; i < 125; i++){
        uint32_t globalIndex = jobIndex * 125 + i;
        data[globalIndex] = compute(globalIndex);
    }
});

上のコードでは、1000個のデータを処理しますが、ジョブを8つに分割することで、高速に並列処理できるようになります。

ラムダには jobIndex が渡されるようにしておくことで、グローバルインデックスを計算できるようにしておきます。

3.1 実装

実装は単純で、まずは jobCount を受け取る schedule() のオーバーロードを追加します。forループのインデックスをキャプチャし、ジョブに引数として渡すようなラッパーラムダを作成し、スケジュールするだけです。

class JobSystem {
    // ...
    
    std::vector<JobHandle> schedule(uint32_t jobCount, 
                                    const std::function<void(uint32_t)>& job) {
        std::vector<JobHandle> handles(jobCount);
        for (uint32_t jobIndex = 0; jobIndex < jobCount; jobIndex++) {
            auto wrapper = [job, jobIndex]() { job(jobIndex); };
            handles[jobIndex] = schedule(wrapper);
        }
        return handles;
    }
};

ちなみに、schedule() 側で二重ループまで書いてしまって、ジョブ本体はループなしでフラットに書かせるような実装も可能です。こうすると、GPGPUと同じ感覚で使えます。その代わり、ジョブごとに一度だけ実行したい処理などは書きにくくなるので、今回の実装例とどちらがいいかは使い方によります。

3.2 動作確認

動作確認では1000万個のデータで、インデックスの二乗を計算するジョブで試してみます。


int main() {
    // ジョブシステムの作成...

    std::vector<uint32_t> resultData(10'000'000);

    // 開始時間を記録...
    
    uint32_t jobCount = 8;
    uint32_t size = resultData.size() / jobCount;
    jobSystem.schedule(jobCount, [size, &jobData, &resultData](uint32_t jobIndex) {
        auto thisId = std::this_thread::get_id();

        // 開始時間を記録
        JobData data;
        data.name = 'A';
        data.start = now();

        // 簡単な計算
        for (uint32_t index = 0; index < size; index++) {
            uint32_t globalIndex = size * jobIndex + index;
            resultData[globalIndex] = globalIndex * globalIndex;
        }

        // 終了時間を記録
        data.end = now();

        // ジョブデータを保存
        std::lock_guard<std::mutex> lock(dataMutex);
        if (!jobData.contains(thisId)) {
            jobData[thisId] = {};
        }
        jobData[thisId].push_back(data);
    });

    // 全てのジョブが完了するのを待つ...
    // 終了時間を記録...
    // タイムラインを表示...
}

実際に実行してみると、以下のようになります。

Total duration: 17 ms
Thread  6076 |AAAAAAAAAAAA     | (2 jobs)
Thread 57200 |AAAAAAAAAAAAA    | (2 jobs)
Thread 27832 |AAAAAAAAAAAAAAAA | (2 jobs)
Thread 16416 |AAAAAAAAAAAAAAAAA| (2 jobs)

並列に実行できていることが確認できます。ジョブがインデックスの二乗計算になっているので、数字が大きい部分ほど実行に時間がかかっていそうですね。

私の環境では、シングルスレッドと比べると5倍以上高速になりました。

おわり

C++でジョブシステムの実装を試してみました。実際に利用するにはまだまだ改善が必要かもしれませんが、全体的な流れは勉強できた気がしています。

間違っている点改善点おすすめのデータ構造おすすめの資料などがあればぜひコメントいただけると嬉しいです。

ここまで読んでいただき、ありがとうございました!

Discussion