C++でジョブシステムを作ってみる(1)
ジョブシステム (Job System)
ゲームエンジンの勉強をしているとジョブシステムの話がよく出てくるので、実際にC++20で実装してみました。マルチスレッドに慣れていないため、正しいアプローチである確信はありません。重要なミスがあったら教えていただけると助かります。
コードは標準ライブラリにある機能は素直に使い、シンプルな実装を目指します。また、この記事の全てのコードはパブリックドメインとします。
これから、3つのステップでジョブシステムを実装していきます。
- ジョブを並列実行する機能
- ジョブ同士に依存関係を設定する機能
- 大きなジョブを分割して並列化する機能
この記事ではステップ1の最も基本的なジョブシステムを実装することを目標にします。次回 はステップ2とステップ3を実装します。
実装に入る前に雰囲気を伝えるため、各ステップにおけるインターフェイスを示します。
// スレッド数4としてジョブシステムを作成
JobSystem jobSystem(4);
// ジョブをスケジュール
// 空いているスレッドで実行される
jobSystem.schedule([]() {
doWork();
});
// すべてのジョブの完了を待機する
jobSystem.waitForAll();
// ジョブAをスケジュール
// ジョブハンドルを取得しておく
JobHandle handleA = jobSystem.schedule([]() {
doWork();
});
// ジョブBをスケジュール
// ジョブAのハンドルを渡すと、ジョブBはジョブAが終わるまで待機する
JobHandle handleB = jobSystem.schedule([]() {
doWork();
}, {handleA});
// 処理を4つのジョブに分割してスケジュールする
// ラムダ式には`jobIndex`が渡される
std::vector<int> data(100);
jobSystem.schedule(4, [](uint32_t jobIndex) {
for(int i = 0; i < 25; i++){
data[jobIndex * 25 + i] = compute();
}
});
1. ジョブを並列実行する機能
まずは最も単純なジョブシステムを実装します。以下のタイムラインのようなイメージで、タスクが複数スレッドに分散され、並列処理されるようにします。
Thread 11344 | DDDDDDDDDDIIIII |
Thread 21508 |AAAAAAAAA FFFFF JJJJJJJ|
Thread 28908 | BBBBBBBEEEEEHHHHHHH |
Thread 46992 | CCCCCCCCCGGGGGGG |
1.1 基本メンバー変数の準備
ここでは最低限ジョブを実行するために必要な変数を考えます。
std::queue<std::function<void()>>
ジョブキュー まずはジョブキューです。ジョブは std::function<void()>
としています。ジョブは色々なスレッドからプッシュ・ポップされるため、ジョブキューがスレッドセーフであることが重要です。よって、排他制御するためのミューテックス (std::mutex
) も追加しておきます。
#include <queue>
#include <mutex>
#include <functional>
class JobSystem {
private:
std::queue<std::function<void()>> jobQueue;
std::mutex queueMutex;
};
std::thread
ワーカースレッド 次にワーカースレッドです。ワーカースレッドは、処理すべきジョブを実行しつづけ、ジョブがない場合は待機します。この待機を実現するためには条件変数 (std::condition_variable
) も追加しておく必要があります。
#include <thread>
#include <vector>
#include <condition_variable>
class JobSystem {
// ...
std::vector<std::thread> workers;
std::condition_variable condition;
};
1.2 基本メンバー関数の実装
次にメンバー関数の実装に入っていきます。
コンストラクタ(ワーカースレッドの作成)
まずはコンストラクタで必要なスレッド数を受け取り、ワーカースレッドを作成します。ワーカースレッドでは、次に説明する workerThreadFunction()
を実行させておきます。
class JobSystem {
public:
explicit JobSystem(size_t threadCount) {
for (size_t i = 0; i < threadCount; ++i) {
workers.emplace_back([this] { this->workerThreadFunction(); });
}
}
// ...
};
ちなみに、ここではコンストラクタを使っていますが、シングルトンのような形にして void init()
などの関数で明示的に初期化する方法でもいいと思います。
workerThreadFunction()
workerThreadFunction()
は、処理すべきジョブが来るまで待機し、来たらジョブを取得して実行します。std::unique_lock
を使うことで、ジョブキューをスレッドセーフに操作し、条件変数と連携して待機を可能にします。
待機から起きたら、ジョブキューからジョブをポップし、実際にジョブを実行します。
// ...コンストラクタ
private:
void workerThreadFunction() {
// 無限ループで待機と実行を繰り返す
while (true) {
std::function<void()> job; // ジョブの入れ物を用意
{
// ジョブキューにジョブが追加されるまで待機する
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return !jobQueue.empty(); });
// ジョブキューからジョブを取得する
job = std::move(jobQueue.front());
jobQueue.pop();
} // ロックを解放
// ジョブを実行する
job();
}
}
// ...メンバー変数
ロックはジョブの実行前にスコープを抜けて解放されていることに注意してください。ジョブの実行中までロックしてしまうと、他のスレッドで並列実行できなくなってしまいます。
schedule()
次にジョブを追加する関数 schedule()
を実装します。
ジョブキューにジョブをプッシュするため、まずは std::lock_guard
でロックしておきます。先ほど、ワーカースレッドでは std::unique_lock
を使っていましたが、今回は条件変数による待機の必要がないため、std::lock_guard
で十分です。
その後、ジョブキューにジョブをプッシュします。
最後に、条件変数の notify_one()
を呼ぶことで、この条件変数で待機しているワーカースレッドのうち、ひとつを起こします。こうすることで、ワーカースレッド側がジョブの追加を検知し、実行を始めることができます。
// ...コンストラクタ
void schedule(const std::function<void()>& job) {
{
std::lock_guard<std::mutex> lock(queueMutex);
// ジョブをキューにプッシュ
jobQueue.push(job);
}
// 待機中のワーカースレッドをひとつ起こす
condition.notify_one();
}
// ...
1.3 動作確認
これで、ひとまずジョブを並列実行させることができるはずです。試してみましょう。
#include <chrono>
#include <iostream>
void busyWait(std::chrono::nanoseconds duration) {
auto start_time = std::chrono::high_resolution_clock::now();
while (std::chrono::high_resolution_clock::now() - start_time < duration) {
// CPU時間を消費するためのアクティブな待機
// ループの中で何もしない
}
}
int main()
{
JobSystem jobSystem(4);
for (int i = 0; i < 10; ++i) {
// ジョブをスケジュール
jobSystem.schedule([]() {
// スレッドIDを出力してみる
std::cout << "Executed: " << std::this_thread::get_id() << std::endl;
busyWait(std::chrono::milliseconds(1000));
});
// 少し待機してから次のジョブをスケジュールする
busyWait(std::chrono::milliseconds(500));
}
}
適当な環境で実際に実行してみると、以下の出力が得られました。分かりづらいですが、複数スレッドで実行されていることは確認できます。
Executed: 19328
Executed: 23400
Executed: 11836
Executed: 27348
Executed: 19328
Executed: 23400
Executed: 11836
Executed: 27348
Executed: 19328
Executed: 23400
これを実際に実行してみると、待機や終了処理ができていないので正常に終了できません。
1.4 全てのジョブの待機
全てのジョブを待機するメンバー関数 waitForAll()
を実装します。
待機処理はいくつか実装方法が考えられると思います。今回は、ジョブキューに追加され、まだ完了していないジョブの数を記録しておき、それが 0
になったらすべてのジョブが完了したと判断することにします。
ジョブの数も複数スレッドから操作されるため、スレッドセーフな std::atomic
を使うことにします。
さらに、待機や通知のための完了フラグを追加しておきます。(C++20未満の場合は、ミューテックスと条件変数で代用できます。)
// ...その他のメンバー変数
std::atomic<int> jobCounter = 0;
std::atomic<bool> completeFlag = false;
インクリメント
では、schedule()
を修正し、ジョブがプッシュされたらカウンターをインクリメントします。
void schedule(const std::function<void()>& job) {
{
std::lock_guard<std::mutex> lock(queueMutex);
+ jobCounter.fetch_add(1); // インクリメント
+ completeFlag.store(false); // 未完了にする
jobQueue.push(job);
}
condition.notify_one();
}
デクリメント
次に、workerThreadFunction()
を修正し、ジョブが完了したらカウンターをデクリメントします。さらに、デクリメントした結果が 0
になるとき、完了フラグの notify_all
を使って全ジョブが終了したことを通知しておきます。
std::atomic
の fetch_sub()
は操作の直前の値が返ることに注意してください。デクリメント結果が 0
になるのは、返り値が 1
のときです。
void workerThreadFunction() {
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return !jobQueue.empty(); });
job = std::move(jobQueue.front());
jobQueue.pop();
}
job();
+ // デクリメント
+ if(jobCounter.fetch_sub(1) == 1) {
+ // カウントが0になったら完了フラグを立てて通知
+ completeFlag.store(true);
+ completeFlag.notify_all();
+ }
}
}
waitForAll
は、完了フラグが立つまで待機するだけで実装終わりです。
void waitForAll() const {
// trueになるまで待機する
completeFlag.wait(false);
}
1.5 終了処理
最後にジョブシステムの終了処理を実装します。
ワーカースレッドの終わらせ方
デストラクタの前に、まだ考えるべきことがあります。それはワーカースレッドの終わらせ方です。ワーカースレッドはジョブがなくなったとしても、無限ループで待機状態に入るだけなので、勝手に処理が終了することはありません。なので、明示的にループから抜ける処理が必要です。
具体的には、停止フラグを追加して、停止フラグが立った場合も待機から起きられるようにします。その後、停止フラグが立っていればループから抜けます。
+ std::atomic<bool> stopFlag = false;
void workerThreadFunction() {
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queueMutex);
- condition.wait(lock, [this] { return !jobQueue.empty(); });
+ condition.wait(lock, [this] {
+ // 停止フラグが立った場合も起きる
+ return !jobQueue.empty() || stopFlag;
+ });
+ // 停止フラグが立っていて、ジョブキューが空なら抜ける
+ if(stopFlag && jobQueue.empty()) {
+ break;
+ }
job = std::move(jobQueue.front());
jobQueue.pop();
}
job();
if(jobCounter.fetch_sub(1) == 1) {
completionCondition.notify_all();
}
}
}
ここでは停止フラグが立っていて、さらにジョブキューが空なら抜けるようにしました。残っているジョブを無視して即座に停止させる場合は、空かどうかの判定はいりません。
デストラクタの実装
それではデストラクタを実装します。停止フラグを立てて、ワーカースレッドを起こします。最後に、join()
で終了するのを待機します。
~JobSystem() {
// 全てのワーカースレッドに停止信号を送る
stopFlag.store(true);
// 全てのワーカースレッドを起こす
condition.notify_all();
// 全てのワーカースレッドが終了するのを待つ
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
以上でステップ1の単純なジョブシステムの実装が完了しました。正常に終了できるようになっているはずです。
1.6 動作の可視化
タイムライン可視化
ジョブシステムの実装ではありませんが、動作確認のためにある程度分かりやすい出力ができるように整備します。実行されたジョブのタイムスタンプを記録しておき、スレッドごとにタイムラインとして可視化するコードを紹介します。
using time_point = std::chrono::high_resolution_clock::time_point;
time_point now() {
return std::chrono::high_resolution_clock::now();
}
int duration(time_point start, time_point end) {
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
}
struct JobData {
int name;
time_point start;
time_point end;
};
// タイムラインバーを作成する関数
std::string createTimeline(const std::vector<JobData>& jobData,
time_point globalStart, int globalDuration) {
// タイムラインを空白で初期化
std::string timeline;
for (const auto& data : jobData) {
timeline = std::string(globalDuration, ' ');
}
// ジョブ実行中の時刻にインデックスを挿入
for (const auto& data : jobData) {
auto begin = timeline.begin();
int start = duration(globalStart, data.start);
int end = duration(globalStart, data.end);
std::fill(begin + start, begin + end, data.name);
}
return timeline;
}
ランダムな長さのジョブをスケジュール
では、ランダムな長さのジョブをスケジュールして、動作を見てみます。
タイムスタンプの記録などは本題ではないので説明は省略させていただきます。
#include <random>
#include <iomanip>
#include <sstream>
#include <unordered_map>
int generateRandomInt(int minNum, int maxNum) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distribution(minNum, maxNum);
return distribution(gen);
}
std::mutex dataMutex;
int main() {
// ジョブシステムの作成
unsigned int threadCount = 4;
JobSystem jobSystem(threadCount);
// ジョブデータの格納先
std::unordered_map<std::thread::id, std::vector<JobData>> jobData;
// 開始時間を記録
auto globalStart = now();
// いくつかのジョブをスケジュール
for (int i = 0; i < 20; ++i) {
// ジョブをスケジュール
jobSystem.schedule([i, &jobData]() {
auto thisId = std::this_thread::get_id();
// 開始時間を記録
JobData data;
data.name = 'A' + i;
data.start = now();
// ランダムな時間だけ待機する
int sleepTime = generateRandomInt(5, 10);
busyWait(std::chrono::milliseconds(sleepTime));
// 終了時間を記録
data.end = now();
// ジョブデータを保存
std::lock_guard<std::mutex> lock(dataMutex);
if (!jobData.contains(thisId)) {
jobData[thisId] = {};
}
jobData[thisId].push_back(data);
});
// 少し待機してから次のジョブをスケジュールする
busyWait(std::chrono::milliseconds(2));
}
// 全てのジョブが完了するのを待つ
jobSystem.waitForAll();
// 終了時間を記録
auto globalEnd = now();
int globalDuration = duration(globalStart, globalEnd);
std::cout << "Total duration: " << globalDuration << " ms" << std::endl;
// タイムラインを表示
for (const auto& [id, data] : jobData) {
// スレッドIDを空白埋めして5桁にする
std::ostringstream threadIdStream;
threadIdStream << std::setw(5) << std::setfill(' ') << id;
std::string threadIdStr = threadIdStream.str();
// タイムラインを表示
std::string timeline = createTimeline(data, globalStart, globalDuration);
std::cout << "Thread " << threadIdStr
<< " |" << timeline << "| ("
<< data.size() << " jobs)" << std::endl;
}
}
実行結果
実際に実行すると、以下のようなタイムラインが表示されます。ここでは20個のジョブが上手く分散されていることが分かります。
Total duration: 46 ms
Thread 28692 | CCCCCCEEEEE IIIIIIIIMMMMMMMMM RRRRRRRRR | (5 jobs)
Thread 22056 | DDDDDDDDDHHHHHHHHLLLLLLLLPPPPPP TTTTTTTT| (5 jobs)
Thread 13920 | BBBBBBBBFFFFFFFFFFKKKKK NNNNN QQQQQQQQQ | (5 jobs)
Thread 6608 |AAAAAAAAAA GGGGGGJJJJJJJJJJOOOOOOOOSSSSSSS | (5 jobs)
以上でステップ1の最も基本的なジョブシステムが完成しました。うまく動いているように見えますが、見落としている点はまだまだあるかもしれません。もし気が付いたことがあれば教えていただけるとありがたいです。
続きはこちらです。
Discussion