🙆‍♀️

Promise.allSettledよりもPromise.allを使ったほうがよかった話

2022/10/16に公開
2

背景

バッチ処理で複数ユーザーにメール配信する場面があったのですが、Promise.all()Promise.allSettledの挙動の理解に時間がかかったので、メール配信することを想定して記事にまとめてみました。

実現したいこと

  • 複数ユーザーにメール配信する処理おいて、一部のユーザーにエラーが起きても、エラーが起きていないユーザーにはメール配信をする
  • エラーが起きたとき、配信ができなかったユーザーを特定してエラー内容を通知する

Promise.allとPromise.allSettledの違い

前提として、Promise.allとPromise.allSettledの違いを整理します。Promise.allとPromise.allSettledは両方とも、Promiseオブジェクトの配列を引数として受け取り、新しいPromiseオブジェクトの配列を返します。

Promise.allは、エラーが発生したら拒否(reject)されて、最初に発生したrejectの値を返します。

Promise.all() メソッドは入力としてプロミスの集合の反復可能オブジェクトを取り、入力したプロミスの集合の結果の配列に解決される単一の Promise を返します。この返却されたプロミスは、入力したプロミスがすべて解決されるか、入力した反復可能オブジェクトにプロミスが含まれていない場合に解決されます。入力したプロミスのいずれかが拒否されるか、プロミス以外のものがエラーを発生させると直ちに拒否され、最初に拒否されたメッセージまたはエラーをもって拒否されます。
公式ドキュメントより引用

一方で、Promise.allSettledはエラーが起きても、必ず成功のオブジェクトを返します。

Promise.allSettled() メソッドは、与えられたすべてのプロミスが履行されたか拒否された後に、それぞれのプロミスの結果を記述した配列オブジェクトで解決されるプロミスを返します。
公式ドキュメントより引用

Promise.allSettledの返り値は配列ですが、成功と失敗かどうかで配列の各要素は決まります。成功の場合は{status:"fulfilled",value:結果の値}で、失敗の場合は{status:"rejected",value:結果の値(エラーの内容)}というオブジェクトが返されます。
このことから、Promise.allSettledではエラーが起きても必ず実行されるのでエラーをキャッチする必要はないと考えまています。

実装

上記の内容を踏まえて、実装していきます。

準備

まず、メール送信する関数は仮想で作成します。メールアドレスが取得できなかった(emailがnull)場合、エラーが起こるようにします。

function sendMail(email: string | null): Promise<"SUCCESS" | "REJECTED"> {
  console.log(email);
  return new Promise((resolve, reject) => {
    if (email) {
      resolve("SUCCESS");
    } else {
      reject(new Error("REJECTED"));
    }
  });
}

Promise.allSettledでの実装

エラーが起きてもすべての処理は実行させたいので、今回実現したいことから考えるとPromise.allSettledを使用するほうが適切かと思います。したがって、まずはPromise.allSettledで実装します。

Promise.allSettled(
  [
    { id: 1, email: "test1@test.com" },
    { id: 2, email: null },
    { id: 3, email: "test2@test.com" },
  ].map(async (user) => {
    const results = await sendMail(user.email);
    //sendMail()内で出力されたログ
    //test1@test.com
    //null
    //test2@test.com
    return results;
  })
).then((results) => {
  //メール配信結果のログ
  console.log(results);
  //0: {status: 'fulfilled', value: 'SUCCESS'}
  //1: {status: 'rejected', reason: Error: REJECTED at http://localhost:3000/index.js:8:20 at new Promise (<anonymous>) at …}
  //2: {status: 'fulfilled', value: 'SUCCESS'}

  //エラーが起きた時の通知
  results.forEach((result) => {
    if (result.status === "rejected") console.log(result.reason);
    //Error: REJECTED
  });
});

Promise.allでの実装

前述の通り、Promise.allではエラーが発生したら拒否されるのですが、sendMail()でエラーをキャッチすればPromise.allでもすべて処理を実行することができます。

Promise.all(
  [
    { id: 1, email: "test1@test.com" },
    { id: 2, email: null },
    { id: 3, email: "test2@test.com" },
  ].map(async (user) => {
    return await sendMail(user.email).catch((error) => {
    //sendMail()内で出力されたログ
    //test1@test.com
    //null
    //test2@test.com
    
      //エラーが起きたときのログ
      console.log(`REJECTED_USER_${user.id}`);
      //REJECTED_USER_2
      console.log(error);
      //Error: REJECTED
      throw error;
    });
  })
)
  .then((result) => {
    //メール配信結果のログ(エラーが起きていない場合)
    console.log(result);
  })
  .catch((error) => {
    //メール配信結果のログ(エラーが起きた場合)
    console.log(error);
    //Error: REJECTED
  });

まとめ

最初、Promise.allSettledを使用したほうがいいと考えていました。しかし、Promise.allSettledで拒否されたときに{status:"rejected",value:結果の値(エラーの内容)}というオブジェクトが返されるので、メール配信されなかったユーザーを特定できないことが問題でした。
Promise.allでもsendMailでエラーをキャッチすれば、すべての処理を実行できることに気付いたので今回のケースではPromise.allを使用したほうがいいという結論に至りました。
もし、他にもいい実装方法があればぜひ教えてください。
また、誤った表記があればご指摘いただけると幸いです。

GitHubで編集を提案

Discussion

nap5nap5

もし、他にもいい実装方法があればぜひ教えてください。

いい実装かどうかは判断をゆだねますが、ちょっとデモを書いてみました。エラーが起きた場合に対する本件でいうところのメール再送はいずれかの場合でも可能なんじゃないかとおもいます。FakeエラーでstoryIDが偶数の場合にエラー発生するようにしております。

共通ロジック

import { expect, test } from "vitest";
import { cluster, parallel, sleep } from "radash";
import { default as fetch } from "cross-fetch";
import { Err, Ok, Result, err, ok } from 'neverthrow'

const demoData = {
  by: "bpierre",
  descendants: 48,
  id: 37361050,
  kids: [
    37362222, 37363734, 37362079, 37362429, 37366919, 37366920, 37363171,
    37361240, 37361524, 37361316, 37364299, 37363233, 37361613, 37364963,
    37364330, 37361551, 37365637, 37362068, 37363830, 37361140, 37364871,
    37364746, 37363775,
  ],
  score: 156,
  time: 1693659009,
  title: "Semantic Zoom",
  type: "story",
  url: "https://alexanderobenauer.com/labnotes/038/",
};

type StoryData = typeof demoData;

const fetchStory = async (storyId: number): Promise<Result<StoryData, Error>> => {
  try {
    if (storyId % 2 === 0) return err(new Error("Something went wrong...", { cause: { storyId } }))
    const response = await fetch(
      `https://hacker-news.firebaseio.com/v0/item/${storyId}.json?print=pretty`
    );
    const json = await response.json();
    return ok(json as StoryData);
  } catch (error) {
    return err(error as Error)
  }
};

サマリレポートで結果をKeepする場合

const convolveResultWithSummary = async <T, E>(promises: Promise<Result<T, E>>[]) => {
  const results = await Promise.all(promises);
  const errors: E[] = [];
  const values: T[] = [];

  results.forEach(result => {
    result.match((d) => values.push(d), (e) => errors.push(e));
  });

  return {
    summary: {
      err: err(errors),
      ok: ok(values)
    }
  }
}

const doFetchParallelWithSummary = <T = unknown, D = unknown, E = Error>(
  input: ReadonlyArray<T>,
  fn: (d: T) => Promise<Result<D, E>>,
  maximumTaskNumber: number = 7,
) => {
  const pList = cluster(input, maximumTaskNumber).map(params => params.map(fn))
  return parallel(maximumTaskNumber, pList, convolveResultWithSummary);
};

test("Neverthrow doFetchParallelWithSummary", async () => {
  const storyIds = [
    37367013, 37361947, 37361409, 37363362, 37358063, 37359193, 37364290,
    37366341, 37361053, 37356119, 37359310, 37362132, 37359250, 37364388,
    37349906, 37362740, 37365485, 37366678, 37364124, 37365468, 37361094,
    37364624, 37361050, 37362151,
  ];
  const results = await doFetchParallelWithSummary(storyIds, fetchStory, 13)
  const summaryData = results.map(d => d.summary)
  const okData = Result.combine(summaryData.map(d => d.ok))._unsafeUnwrap().flat()
  const errData = Result.combineWithAllErrors(summaryData.map(d => d.err))._unsafeUnwrapErr().flat()
  const report = {
    summary: {
      okCount: okData.length,
      errCount: errData.length,
    },
    detail: {
      okData,
      errData
    }
  }
  expect(report.summary).toStrictEqual({
    okCount: 10,
    errCount: 14,
  })
  console.log(report)
});

Fail Firstでエラーが一件でもある場合

const convolveResult = async <T, E>(promises: Promise<Result<T, E>>[]): Promise<Result<T[], E[]>> => {
  const results = await Promise.all(promises);
  const errors: E[] = [];
  const values: T[] = [];

  results.forEach(result => {
    result.match((d) => values.push(d), (e) => errors.push(e));
  });

  if (errors.length > 0) {
    return err(errors);
  } else {
    return ok(values);
  }
}

const doFetchParallel = <T = unknown, D = unknown, E = Error>(
  input: ReadonlyArray<T>,
  fn: (d: T) => Promise<Result<D, E>>,
  maximumTaskNumber: number = 7,
) => {
  const pList = cluster(input, maximumTaskNumber).map(params => params.map(fn))
  return parallel(maximumTaskNumber, pList, convolveResult);
};

test("Neverthrow doFetchParallel", async () => {
  const storyIds = [
    37367013, 37361947, 37361409, 37363362, 37358063, 37359193, 37364290,
    37366341, 37361053, 37356119, 37359310, 37362132, 37359250, 37364388,
    37349906, 37362740, 37365485, 37366678, 37364124, 37365468, 37361094,
    37364624, 37361050, 37362151,
  ];
  const results = await doFetchParallel(storyIds, fetchStory, 9)
  const result = Result.combineWithAllErrors(results)
  if (result.isErr()) return console.log(result.error.flat())
  const data = result.value.flat()
  console.log(data)
  expect(data.length).toStrictEqual(10);
});

サマリレポートで結果をKeepしつつ並列リクエスト間隔を間引く場合


const convolveResultWithSummary = async <T, E>(promises: Promise<Result<T, E>>[]) => {
  const results = await Promise.all(promises);
  const errors: E[] = [];
  const values: T[] = [];

  results.forEach(result => {
    result.match((d) => values.push(d), (e) => errors.push(e));
  });

  return {
    summary: {
      err: err(errors),
      ok: ok(values)
    }
  }
}

const getRandomDelay = (baseDelay: number, variance: number = 1000): number => {
  return baseDelay + Math.floor(Math.random() * variance);
}

const doFetchParallelWithSummaryUsingInterval = async <T = unknown, D = unknown, E = Error>(
  input: ReadonlyArray<T>,
  fn: (d: T) => Promise<Result<D, E>>,
  maximumTaskNumber: number = 7,
  intervalMillSeconds: number = 700,
) => {
  const batches = cluster(input, maximumTaskNumber);
  let allResults: {
    summary: {
      err: Err<never, E[]>;
      ok: Ok<D[], never>;
    }
  }[] = [];

  for (const batch of batches) {
    console.log(`Starting requests at ${new Date().toISOString()}`);
    const batchResults = await convolveResultWithSummary(batch.map(fn));
    console.log(`Finished requests at ${new Date().toISOString()}`);
    allResults = allResults.concat(batchResults);
    await sleep(getRandomDelay(intervalMillSeconds));
  }

  return allResults;
};

test("Neverthrow doFetchParallelWithSummaryUsingInterval", async () => {
  const storyIds = [
    37367013, 37361947, 37361409, 37363362, 37358063, 37359193, 37364290,
    37366341, 37361053, 37356119, 37359310, 37362132, 37359250, 37364388,
    37349906, 37362740, 37365485, 37366678, 37364124, 37365468, 37361094,
    37364624, 37361050, 37362151,
  ];
  const results = await doFetchParallelWithSummaryUsingInterval(storyIds, fetchStory, 13, 3_200)
  const summaryData = results.map(d => d.summary)
  const okData = Result.combine(summaryData.map(d => d.ok))._unsafeUnwrap().flat()
  const errData = Result.combineWithAllErrors(summaryData.map(d => d.err))._unsafeUnwrapErr().flat()
  const report = {
    summary: {
      okCount: okData.length,
      errCount: errData.length,
    },
    detail: {
      okData,
      errData
    }
  }
  expect(report.summary).toStrictEqual({
    okCount: 10,
    errCount: 14,
  })
  console.log(report)
});

demo code.

https://codesandbox.io/p/sandbox/quirky-mclaren-2nds66?file=/src/index.test.ts:1,1

簡単ですが、以上です。

masatotezukamasatotezuka

返信遅くなりすみません!
コメントありがとうございます!!