🧐

async/awaitの並列・直列実行を計測しながら理解する - JavaScript

2021/02/10に公開
2

はじめに

本記事は async/await 構文を使った非同期処理を、並列・直列を織り交ぜる際にどう記述すれば良いのかの理解を深めるのを目的としています。

諸注意

メソッドチェーンを使ったシンプルな記述方法が出来るように、既存の型に拡張メソッドを追加する方法をとっています、その為このまま利用出来るかはプロジェクトのガイドラインに適うかによります。
使わない方法を取るなら一度従来の方法で記述してみる事を推奨します、かなり読み辛くなるかと思います。

前知識

イメージとしては電流です、直列実行は電池の直列繋ぎ、並列実行は並列繋ぎをイメージすると分かりやすいかと思います。

直列実行

serial.png

直列実行は、主に処理結果の伝搬が必要な時に使います。
Promiseのthen()メソッドは次の処理を受け取り新たなPromiseを返すため、メソッドチェーンとして記述が出来ます。

const p = asyncHoge()
  .then(fooFromHoge) // 同期処理も可
  .then(asyncPiyoFromFoo);
console.log(await p);

並列実行

parallel.png

並列実行は、伝搬が必要ない処理を同時に実行する時に使います。
主に使われるのはPromise.all()メソッドで、これに複数のプロミスが含まれる iterable(配列など) が渡され、awaitにより解決済み結果を配列として受け取ります。

const p = Promise.all([
  asyncHoge(),
  foo(),
  asyncPiyo(),
]);
console.log(await p);

直列と並列の混在実行

serial_parallel.png

混在パターンはややこしい構造になります、直列は並列分岐する事もあり、並列は各要素をマージする事もあり、内部の各要素も再帰的な構造になります。

// コードは複雑になるため、拡張メソッドを使った方法を後述していきます。

計測コード

コードを書くにあたり、下記の拡張メソッドを実装しています。

Function.prototype.as_async = async function(...args) { return this(...args); };

Promise.prototype.map = async function(fmap) {
  const f = async x => fmap(await x);
  return [].concat(await this).map(f);
};
Promise.prototype.as_promise_all = async function() {
  return Promise.all([].concat(await this));
};

Array.prototype.as_async = async function() { return this; };

これにより要所要所での awaitawait Promise.all([...]) が取り除かれ、メソッドチェーンのみで直列と並列の混在のような複雑な構造も記述出来るようになります。

計測にあたり、非同期関数には実行時に任意のdelay値を設定しています。

// 実行する関数 ---
// ----------------
async function sleep(ms) {
  const p = new Promise(resolve => setTimeout(resolve, ms));
  await p;
}

async function asyncN(v) {
  await sleep(v*100);
  return v;
}
async function asyncx2(v) {
  await sleep(v*200);
  return v*2;
}
async function asyncx2inv(v) {
  await sleep((10-v)*200);
  return v*2;
}

function N(v) { return v; }
function x2(v) { return v*2; }

function sum(a) { return a.reduce((sum, elem) => sum + elem, 0); }
function mul(a) { return a.reduce((sum, elem) => sum * elem, 1); }

また、計測用のベンチマーク関数を用意しています。

// ベンチ用 ---
// ------------
async function bench(f) {
  const start = new Date();
  console.log('result:', await f());
  console.log('elapsed:', (new Date()).getTime() - start.getTime(), 'ms');
}

それでは直列、並列、直列と並列の混在の各処理を書いていきます。

直列

並列との比較をしやすいように、非同期処理の配列を用いて計測していきます、なので厳密には関数が各一つの並列処理を直列に繋いだものとなります。

//                        100 ms,    200 ms,    300 ms
const src_ary = () => [asyncN(1), asyncN(2), asyncN(3)];
// 4300 ms
await bench(async () =>
  src_ary().as_async().as_promise_all() //  100 ms,  200 ms,  300 ms
    .map(x2).as_promise_all()
    .map(asyncx2inv).as_promise_all()   // 1600 ms, 1200 ms,  800 ms
    .map(asyncx2).as_promise_all()      //  800 ms, 1600 ms, 2400 ms
    .map(x2).as_promise_all()
);

各並列処理の結果を待つ必要があるため、計測時間は各並列処理のそれぞれの最大値を加算したものになります、上記では 300 + 1600 + 2400 となるので4300 msの時間がかかっています。

並列

並列は、 Promise.all() による評価待ちを最後に一度だけ行います。

// 3500 ms
await bench(async () =>
  src_ary().as_async() //  100 ms,  200 ms,  300 ms
    .map(x2)
    .map(asyncx2inv)   // 1600 ms, 1200 ms,  800 ms
    .map(asyncx2)      //  800 ms, 1600 ms, 2400 ms
    .map(x2)
    .as_promise_all()
);

次々に各処理を実行出来るため、計測時間はそれぞれの合計時間の最大値になります、上記では max(2500, 3000, 3500) となるので3500 msの時間がかかっています。

直列と並列の混在

最後に、冒頭にあった直列と並列の混在実行を実装してみます。

// 4600 ms
await bench(async () =>
  // a1_1()
  asyncN(2)        // 200 ms
    // a1_2()
    .then(asyncx2) // 400 ms
    .then(x => // 4
      [
        [
          // a2A()
          asyncN(x) // 400 ms
            .then(y =>
              [
                // a2AA
                asyncN(y),        // 400 ms

                // a2AB_1
                asyncN(y)         // 400 ms
                  // a2AB_2
                  .then(asyncx2), // 800 ms
              ]
                .as_async()
                .as_promise_all()
                  // a3_1
                  .then(sum) // [4, 8] -> 12
                  // a3_2
                  .then(asyncx2)  // 2400 ms
            ),

          // a2B_1
          asyncN(x + 1) // 500 ms
            // a2B_2
            // -> [1, 2, 3, 4] 
            .then(y => {
              let a = [];
              for (let i = 1; i < y; i++) {
                a.push(i);
              }
              return a;
            })
            // a2B_3
            .map(y => y*y)
              .as_promise_all()
            .then(za => // [1, 4, 9, 16]
              [
                // a2BA_1
                sum.as_async(za) // -> 30
                  /// a2BA_2
                  .then(asyncx2inv), // 0 ms

                // a2BB
                mul.as_async(za), // -> 576
              ]
                .as_async()
                .as_promise_all()
            ),
        ]
          .as_async()
          .as_promise_all()
          // a5
          // NOTE: 直列と並列で `[ a3_2(), [a2BA_2(), a2BB()] ]` となるので
	  //       flattenして渡す
          .then(a => sum(a.flat())), // [24, [60, 576]].flat() -> 660

        // a2C
        asyncN(x + 2) // 600 ms
          .then(y =>
            [
              // a2CA_1
              asyncN(y)         // 600 ms
                // a2CA_2
                .then(asyncx2), // 1200 ms

              // a2CB
              asyncN(y),        // 600 ms
            ]
              .as_async()
              .as_promise_all()
              // a4
              .then(mul) // [12, 6] -> 72
              ,
          )
      ]
        .as_async()
        .as_promise_all(),
    )
    // a6
    .then(sum) // [660, 72] -> 732
);

こちらの計測時間は次のようになります。

serial_parallel_time.png

上記を参考に計算すると、

200 + 400 +
  max(
    max(
      400 + max(400, 400 + 800) + 0 + 2400,
      500 + 0 + 0 + max(0 + 0, 0)
    ) + 0,
    600 + max(600 + 1200, 600) + 0
  ) + 0

となり、4600 msの時間がかかっています。

また、ボトルネックやマージンがある事も分かります、ボトルネックとなるa3_2()に相当する処理が2400 msかかっているため、ここを短くすれば全体の実行時間が短くなると分かります、逆に言えばa2CA_2()に相当する処理には1200 msかかっていますが、ここをいくら短くしても全体の実行時間は変わりません。
また、a2A()から始まる直列の処理4000 msが変えられないのなら、例えばa4()に相当する処理には1600 msの余裕がある事になります。

全文コード

長くなったので外部リンクですが、掲載コードも含んだ全文と実行結果はこちら(Wandbox)で確認出来ます。

Discussion

rithmetyrithmety

then を含めたコールバックをできるだけ避けた場合下記のようになりますかね…
読みやすさには個人差があるとは思いますが
なぜ async/await が追加されたのかが分かる気がします

await bench(async () => {
    const a1_1 = await asyncN(2)
    const a1_2 = await asyncx2(a1_1)

    const a2a = async () => {
        const a2AA = await asyncN(a1_2)
        const r = await Promise.all([
            asyncN(a2AA),
            (async () => asyncx2(await asyncN(a2AA)))(),
        ])
        return asyncx2(sum(r))
    }

    const a2b = async () => {
        const a2b_1 = await asyncN(a1_2 + 1)
        const a = []
        for (let i = 1; i < a2b_1; i++) {
            a.push(i)
        }
        const a2 = await Promise.all(a.map(y => y * y))
        const r = await Promise.all([
            asyncx2inv(sum(a2)),
            mul(a2),
        ])
        return sum(r.flat())
    }

    const a2c = async () => {
        const y = await asyncN(a1_2 + 2)
        const r = await Promise.all([
            (async () => asyncx2(await asyncN(y)))(),
            asyncN(y),
        ])
        return mul(r)
    }

    const r = await Promise.all([a2a(), a2b(), a2c()])
    return sum(r)
})
wordiwordi

うおおありがとうございます。
一部ですが a5() 部分が省かれてました、加算なので結果は変わらないのですがコメントで返信しておきます。

読みやすさは僕も慣れだとは思います、決まりきった構文部分は本文にあるような構造定義から自動生成出来れば楽になるのに。
あとはソースコードから構造定義への逆変換も欲しいですね、例えばグラフ構造ならばC言語からGraphvizのライブラリを使ってdotファイルを生成してdot.exeでグラフ画像を作成するみたいな。

以下が修正版です、 /**/ が変更点です。

await bench(async () => {
    const a1_1 = await asyncN(2)
    const a1_2 = await asyncx2(a1_1)

    const a2a = async () => {
        const a2AA = await asyncN(a1_2)
        const r = await Promise.all([
            asyncN(a2AA),
            (async () => asyncx2(await asyncN(a2AA)))(),
        ])
        return asyncx2(sum(r))
    }

    const a2b = async () => {
        const a2b_1 = await asyncN(a1_2 + 1)
        const a = []
        for (let i = 1; i < a2b_1; i++) {
            a.push(i)
        }
        const a2 = await Promise.all(a.map(y => y * y))
        const r = await Promise.all([
            asyncx2inv(sum(a2)),
            mul(a2),
        ])
/**/    return r;
    }
    
/**/const a5 = async () => {
/**/    const r = await Promise.all([
/**/        a2a(),
/**/        a2b()
/**/    ]);
/**/    return sum(r.flat());
/**/}

    const a2c = async () => {
        const y = await asyncN(a1_2 + 2)
        const r = await Promise.all([
            (async () => asyncx2(await asyncN(y)))(),
            asyncN(y),
        ])
        return mul(r)
    }

/**/const r = await Promise.all([a5(), a2c()])
    return sum(r)
})