🕌

Typescriptで実装する!高効率なジョブキューによる大量アイテム並列処理

2023/04/20に公開

こんにちは、株式会社エムアイ・ラボ のエンジニアです。

弊社の業務では大量のデータを外部と通信することが多く、少しでも効率良く処理したいです。
今回はそんなときに利用できる、大量のデータを高効率で捌くためのJob queueを使った実装方法をご紹介します。
弊社のメイン開発言語であるTypescriptを使用します。

なぜJob queue?

例えばAWSのDynamoDBに100万件のデータを書き込むとします。
一番手っ取り早い実装方法は以下のようになります。

const promises = [];
const arr = []; // 100万件のデータが入った配列
arr.forEach(data => {
  promises.push(/** DynamoDBへの書き込み処理 **/);
});
await Promise.all(promises); // 同期する

しかしこの実装では以下の問題が発生します。

  • 可能な限り最速で書き込み処理を投げるため、スループット制限に引っかかりリクエストがスロットリングされる
  • 上記と同じ理由で無駄な通信が発生する
  • システムの負荷が高くなり、処理速度が低下する
  • 処理のスケジューリング・実行状況の監視・エラーのハンドリングなど、複雑な処理管理を入れづらい

上記の問題をそのまま解決出来るのがJob queueです。
その特徴を以下に記します。

  • 一度に大量の並行処理を投げるのではなく、コントロールされた形で非同期処理を実行することで処理の管理と制御を容易に出来る
  • 適切なスロットル制御をかけて実行できるのでシステムの負荷を一定に保って安定した処理を実行できる
  • 分散システムで処理を複数のノードに分散して実行出来る。その場合はノードの故障やネットワークの遅延などの問題が発生する可能性があるが、そのハンドリングも容易である

このような非同期で実行するタスクを管理し、効率的かつ堅牢に実行できるのがJob queueです。

実装方法

今回はバッチ処理を想定したロジックを組みました。
あらかじめ処理をしたい配列が決まっているパターンです。

全体の流れ

[処理する配列を準備] →
[配列内を全てエンキュー] →
[A: ジョブをデキューして処理を開始] →
[B: ジョブを処理中配列に入れる] →
[C: 処理が終了したら処理中配列からタスクを除く] →
[A→B→Cをキューと処理中配列が空になるまで繰り返す] →
[キューと処理中配列が空になったら終了]

ソースコード

/**
 * キューのクラス
 */
class Queue<T> {
  private items: T[] = [];

  public enqueue(item: T): void {
    this.items.push(item);
  }

  public dequeue(): T | undefined {
    return this.items.shift();
  }

  public isEmpty(): boolean {
    return this.items.length === 0;
  }

  public length(): number {
    return this.items.length;
  }
}

/**
 * 実際に処理をするクラス
 */
class ProcessClass {

  // 処理を呼び出す
  public async processLotsOfData() {
    // 処理したいデータが入った配列を作成
    const arr: string[] = [];
    for (let i = 0; i < 1000; i++) {
      arr.push(`process_${i}`);
    }
    await this.processQueue(arr);
    console.log('処理終了');
  }

  /**
   * キューにタスクを入れて、それを実行する
   * @param tasks
   * @private
   */
  private async processQueue<T>(tasks: T[]) {
    const queue = new Queue<T>();

    // キューに全件入れる
    for (let i = 0; i < tasks.length; i++) {
      queue.enqueue(tasks[i]);
    }

    // キューを同時並行で処理する
    await this.processQueueConcurrently(queue, 100);
  }

  private async processQueueConcurrently<T>(
    queue: Queue<T>,
    maxConcurrentTasks: number
  ) {
    const runningTasks: Promise<void>[] = [];
    while (!queue.isEmpty() || runningTasks.length > 0) {
      while (!queue.isEmpty() && runningTasks.length < maxConcurrentTasks) {
        const item = queue.dequeue();
        if (item !== undefined) {
          const task = this.processItem(item);
          runningTasks.push(task);
        }
      }
      await Promise.race(runningTasks);
      runningTasks.shift();

      // 500個処理したら進捗を出力
      if (queue.length() % 500 === 0 && !queue.isEmpty()) {
        // process.stdout.write('\r' + queue.length()); // Node.jsの場合
        console.log(queue.length()); // ブラウザでの実行の場合
      }
    }
  }

  /**
   * 実際の処理
   * @param item
   * @private
   */
  private async processItem<T>(item: T) {
    // 実際の処理したい内容を書く
    // エラーハンドリングなどもここに書く
    console.log(item);
  }

}

// 処理の実行
const main = async () => {
  const process = new ProcessClass();
  process.processLotsOfData();
}
main();

Typescriptのplaygroundなどに上記のコードをそのまま貼り付けて動くと思いますが、processItem()には実際にやりたい処理を書いてください。
上記の場合では最大の並行処理数は100ですが、余裕がある場合は上げられます。

※追記(2023/04/17)

弊社CEOから汎用的に使いやすいコードを提供してもらいましたので追記します。
上記のコードとは以下の点で改善されています。

  • 戻り値が同期出来る
  • 戻り値の順番がリクエスト順と同じ
  static async processQueueConcurrently<T>(
    taskFns: (() => Promise<T>)[],
    maxConcurrentTasks: number = 1,
  ): Promise<T[]> {
    return this._processQueueConcurrently(
      taskFns, maxConcurrentTasks, taskFns.length
    ).then((res) => res
      .sort((a, b) => { return a.index < b.index ? -1 : 1; })
      .map(r => r.result)
    );
  }
  
  private static async _processQueueConcurrently<T>(
    taskFns: (() => Promise<T>)[],
    maxConcurrentTasks: number,
    taskCount: number
  ): Promise<{ index: number, result: T }[]> {
    const promises: Promise<{ index: number, result: T }[]>[] = [];
    while (taskFns.length > 0 && promises.length < maxConcurrentTasks) {
      const index = (taskCount - taskFns.length);
      promises.push(taskFns.shift()().then((result: T) => ({ index, result })).then((result) => {
        return this._processQueueConcurrently(taskFns, 1, taskCount).then(results => {
          results.push(result);
          return results.flat(1);
        });
      }));
    }
    return Promise.all(promises).then(
      (res) => {
        return res.flat(1);
      });
  }

使い方

// 行いたい非同期処理
const proc = (_a: number, _b: string): Promise<string> => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(_b);
    }, _a)
  });
};

// 非同期処理を配列化
// ポイント! () => (args) => Promise.reoslve(xxx) の形式の配列を作成
const queue: (() => Promise<string>)[] = [];
for (let i = 0; i < 1000; i++) {
  queue.push(() => proc(i, `${i}`))
}

// 並行処理を実行
// 全ての完了も同期可能
return this.processQueueConcurrently<string>( // さきほどのコードを呼び出す
  queue, 20
).then((res) => {
  console.log(res.sort());
});

活用例

Job queueが有効活用出来る例を以下に挙げていきます。
バッチ処理以外で使用する場合は上記のソースコードを少し作り変える必要があります。
どのように作り変えるかは、申し訳ございませんが今回は言及しません。

バッチ処理

一定数の処理をまとめて実行するバッチ処理に適しています。
大量のデータを処理するとき、タスクをキューに投げて非同期で処理することでパフォーマンスを向上させられます。

ワーカープール

ワーカープールは複数のワーカーがジョブキューからタスクを取得して処理を実行するシステムで、非同期処理を分散して実行するために使用出来ます。
Job queueはこのワーカープールと一緒に使用されることが多いです。

メッセージングシステム

メッセージングシステムにおいて、メッセージを受信したら特定の処理を実行するように設定する場合があります。
その場合、ジョブキューを使用して、メッセージを受信したタイミングでタスクをキューに投げて非同期で処理できます。

まとめ

今回は並行処理を制御しつつ効率的に実行したいときに利用できるロジックを紹介しました。
大量のデータを捌く際に活用してください。


採用情報

エムアイ・ラボでは一緒に働くメンバーを募集しています。
ぜひお気軽にご連絡ください!

Wantedlyアカウントをお持ちでない方はTwitterのDMからでも大丈夫です。
お待ちしております。

https://www.wantedly.com/companies/milab-inc

Discussion