async/awaitの並列・直列実行を計測しながら理解する - JavaScript

7 min read読了の目安(約6400字 2

はじめに

本記事は async/await 構文を使った非同期処理を、並列・直列を織り交ぜる際にどう記述すれば良いのかの理解を深めるのを目的としています。

諸注意

メソッドチェーンを使ったシンプルな記述方法が出来るように、既存の型に拡張メソッドを追加する方法をとっています、その為このまま利用出来るかはプロジェクトのガイドラインに適うかによります。
使わない方法を取るなら一度従来の方法で記述してみる事を推奨します、かなり読み辛くなるかと思います。

前知識

イメージとしては電流です、直列実行は電池の直列繋ぎ、並列実行は並列繋ぎをイメージすると分かりやすいかと思います。

直列実行

serial.png

直列実行は、主に処理結果の伝搬が必要な時に使います。
Promiseのthen()メソッドは次の処理を受け取り新たなPromiseを返すため、メソッドチェーンとして記述が出来ます。

const p = asyncHoge()
  .then(fooFromHoge) // 同期処理も可
  .then(asyncPiyoFromFoo);
console.log(await p);

並列実行

parallel.png

並列実行は、伝搬が必要ない処理を同時に実行する時に使います。
主に使われるのはPromise.all()メソッドで、これに複数のプロミスが含まれる iterable(配列など) が渡され、awaitにより解決済み結果を配列として受け取ります。

const p = Promise.all([
  asyncHoge(),
  foo(),
  asyncPiyo(),
]);
console.log(await p);

直列と並列の混在実行

serial_parallel.png

混在パターンはややこしい構造になります、直列は並列分岐する事もあり、並列は各要素をマージする事もあり、内部の各要素も再帰的な構造になります。

// コードは複雑になるため、拡張メソッドを使った方法を後述していきます。

計測コード

コードを書くにあたり、下記の拡張メソッドを実装しています。

Function.prototype.as_async = async function(...args) { return this(...args); };

Promise.prototype.map = async function(fmap) {
  const f = async x => fmap(await x);
  return [].concat(await this).map(f);
};
Promise.prototype.as_promise_all = async function() {
  return Promise.all([].concat(await this));
};

Array.prototype.as_async = async function() { return this; };

これにより要所要所での awaitawait Promise.all([...]) が取り除かれ、メソッドチェーンのみで直列と並列の混在のような複雑な構造も記述出来るようになります。

計測にあたり、非同期関数には実行時に任意のdelay値を設定しています。

// 実行する関数 ---
// ----------------
async function sleep(ms) {
  const p = new Promise(resolve => setTimeout(resolve, ms));
  await p;
}

async function asyncN(v) {
  await sleep(v*100);
  return v;
}
async function asyncx2(v) {
  await sleep(v*200);
  return v*2;
}
async function asyncx2inv(v) {
  await sleep((10-v)*200);
  return v*2;
}

function N(v) { return v; }
function x2(v) { return v*2; }

function sum(a) { return a.reduce((sum, elem) => sum + elem, 0); }
function mul(a) { return a.reduce((sum, elem) => sum * elem, 1); }

また、計測用のベンチマーク関数を用意しています。

// ベンチ用 ---
// ------------
async function bench(f) {
  const start = new Date();
  console.log('result:', await f());
  console.log('elapsed:', (new Date()).getTime() - start.getTime(), 'ms');
}

それでは直列、並列、直列と並列の混在の各処理を書いていきます。

直列

並列との比較をしやすいように、非同期処理の配列を用いて計測していきます、なので厳密には関数が各一つの並列処理を直列に繋いだものとなります。

//                        100 ms,    200 ms,    300 ms
const src_ary = () => [asyncN(1), asyncN(2), asyncN(3)];
// 4300 ms
await bench(async () =>
  src_ary().as_async().as_promise_all() //  100 ms,  200 ms,  300 ms
    .map(x2).as_promise_all()
    .map(asyncx2inv).as_promise_all()   // 1600 ms, 1200 ms,  800 ms
    .map(asyncx2).as_promise_all()      //  800 ms, 1600 ms, 2400 ms
    .map(x2).as_promise_all()
);

各並列処理の結果を待つ必要があるため、計測時間は各並列処理のそれぞれの最大値を加算したものになります、上記では 300 + 1600 + 2400 となるので4300 msの時間がかかっています。

並列

並列は、 Promise.all() による評価待ちを最後に一度だけ行います。

// 3500 ms
await bench(async () =>
  src_ary().as_async() //  100 ms,  200 ms,  300 ms
    .map(x2)
    .map(asyncx2inv)   // 1600 ms, 1200 ms,  800 ms
    .map(asyncx2)      //  800 ms, 1600 ms, 2400 ms
    .map(x2)
    .as_promise_all()
);

次々に各処理を実行出来るため、計測時間はそれぞれの合計時間の最大値になります、上記では max(2500, 3000, 3500) となるので3500 msの時間がかかっています。

直列と並列の混在

最後に、冒頭にあった直列と並列の混在実行を実装してみます。

// 4600 ms
await bench(async () =>
  // a1_1()
  asyncN(2)        // 200 ms
    // a1_2()
    .then(asyncx2) // 400 ms
    .then(x => // 4
      [
        [
          // a2A()
          asyncN(x) // 400 ms
            .then(y =>
              [
                // a2AA
                asyncN(y),        // 400 ms

                // a2AB_1
                asyncN(y)         // 400 ms
                  // a2AB_2
                  .then(asyncx2), // 800 ms
              ]
                .as_async()
                .as_promise_all()
                  // a3_1
                  .then(sum) // [4, 8] -> 12
                  // a3_2
                  .then(asyncx2)  // 2400 ms
            ),

          // a2B_1
          asyncN(x + 1) // 500 ms
            // a2B_2
            // -> [1, 2, 3, 4] 
            .then(y => {
              let a = [];
              for (let i = 1; i < y; i++) {
                a.push(i);
              }
              return a;
            })
            // a2B_3
            .map(y => y*y)
              .as_promise_all()
            .then(za => // [1, 4, 9, 16]
              [
                // a2BA_1
                sum.as_async(za) // -> 30
                  /// a2BA_2
                  .then(asyncx2inv), // 0 ms

                // a2BB
                mul.as_async(za), // -> 576
              ]
                .as_async()
                .as_promise_all()
            ),
        ]
          .as_async()
          .as_promise_all()
          // a5
          // NOTE: 直列と並列で `[ a3_2(), [a2BA_2(), a2BB()] ]` となるので
	  //       flattenして渡す
          .then(a => sum(a.flat())), // [24, [60, 576]].flat() -> 660

        // a2C
        asyncN(x + 2) // 600 ms
          .then(y =>
            [
              // a2CA_1
              asyncN(y)         // 600 ms
                // a2CA_2
                .then(asyncx2), // 1200 ms

              // a2CB
              asyncN(y),        // 600 ms
            ]
              .as_async()
              .as_promise_all()
              // a4
              .then(mul) // [12, 6] -> 72
              ,
          )
      ]
        .as_async()
        .as_promise_all(),
    )
    // a6
    .then(sum) // [660, 72] -> 732
);

こちらの計測時間は次のようになります。

serial_parallel_time.png

上記を参考に計算すると、

200 + 400 +
  max(
    max(
      400 + max(400, 400 + 800) + 0 + 2400,
      500 + 0 + 0 + max(0 + 0, 0)
    ) + 0,
    600 + max(600 + 1200, 600) + 0
  ) + 0

となり、4600 msの時間がかかっています。

また、ボトルネックやマージンがある事も分かります、ボトルネックとなるa3_2()に相当する処理が2400 msかかっているため、ここを短くすれば全体の実行時間が短くなると分かります、逆に言えばa2CA_2()に相当する処理には1200 msかかっていますが、ここをいくら短くしても全体の実行時間は変わりません。
また、a2A()から始まる直列の処理4000 msが変えられないのなら、例えばa4()に相当する処理には1600 msの余裕がある事になります。

全文コード

長くなったので外部リンクですが、掲載コードも含んだ全文と実行結果はこちら(Wandbox)で確認出来ます。