🚥

JavaScriptの非同期処理をじっくり理解する (4) AbortSignal, Event, Async Context

36 min read 7

対象読者と目的

  • 非同期処理の実装方法は知っているが、仕組みを詳しく知らないのでベストプラクティスがわからないときがある
  • 実行順序の保証がよくわからないので自信をもってデプロイできない変更がある
  • より詳しい仕組みを理解することでより計画的な実装をできるようになりたい

という動機で書かれた記事です。同様の課題を抱える人を対象読者として想定しています。

目次

  1. 実行モデルとタスクキュー
  2. Promise
  3. async/await
  4. AbortSignal, Event, Async Context
  5. WHATWG Streams / Node.js Streams (執筆中)
  6. 未定

中止処理

並行処理ではしばしば実行中の処理を中止したい場合があります。

古典的なキャンセル処理

Webブラウザ/Node.jsともに、 setTimeout の中止が可能です。

const timeout = setTimeout(() => {
  console.log("It takes longer to load...");
}, 1000);
// setTimeoutしていた処理をキャンセルする
window.addEventListener("load", () => clearTimeout(timeout));

XMLHttpRequestはfetchが浸透される前によく使われていたWebブラウザ用の通信APIです。XMLHttpRequestもrequestオブジェクトに対して中止が可能です。

const req = new XMLHttpRequest();
req.addEventListener("load", () => { ... });
req.open("GET", "https://example.com/endpoint1.json");
req.send();

button.onclick = () => {
  // endpoint1.jsonへのリクエストをキャンセルする
  req.abort();
};

jQueryのajaxが返すjqXHRオブジェクトもXMLHttpRequestの一種であるため、abortが可能です。

const req = $.ajax("https://example.com/endpoint1.json");
req.abort();

AbortController / AbortSignal

この種の中止処理を統一したAPIで行うのがAbortController / AbortSignalです。Webブラウザにおける挙動はDOM仕様の一部として定義されていますNode.jsにも互換と思われる機能が提供されています

AbortController / AbortSignal自体はシンプルなメッセージキューです。通知用のAbortControllerと受信用のAbortSignalに分けられています。

// 通知用のAbortControllerと受信用のAbortSignalはペアで作られる
const controller = new AbortController(); // 通知用のハンドル
const { signal } = controller; // 受信用のハンドル

通知側ができるのは abort() だけです。

// 中止を通知する
controller.abort();

通知の結果は aborted フラグと abort イベントで受け取ることができます。

// すでに中止されているかどうかはabortedフラグで受け取れる
if (signal.abort) throw new Error("Aborted: ...");
// 将来の中止イベントはabortイベントで受け取れる
signal.addEventListener("abort", () => { ... });

核となるAPIはこれだけです。この他にabort済みのAbortSignalを生成する AbortSignal.abort() が定義されていますが、いずれにせよAPIは非常にシンプルだと言えます。

注意点として、AbortSignalはEventTargetの一種であるため、すでにabortしたsignalにabortイベントを後から登録してもイベントハンドラは呼ばれません。Promiseとは違うので注意が必要です。

AbortSignalを使えるAPI (Webブラウザ)

非同期APIを中断可能にするにはAbortSignal型のオプションを受け取るようにします (名前は signal とすることが推奨されています)。Webブラウザでは以下のようなAPIがAbortSignalを受け取ります。

// DOMで定義されているイベント全般
// abortするとイベントリスナが削除される
button.addEventListener("click", () => { ... }, { signal });

// fetch API
// abortすると通信を中断し、Promiseを `new DOMException("AbortError")` でrejectする
const resp = await fetch("https://example.com/endpoint1.json", { signal });

fetch APIに限らず、Promiseを返すAPIがabortされたときは new DOMException(..., "AbortError") を返すことが推奨されています。

AbortSignalを使えるAPI (Node.js)

以下のようなAPIがAbortSignalをサポートしています。

AbortError

「操作が中止された」という結果は通常、Rejection (=非同期例外) で表現されます。DOM規格ではこのときAbortError という名前をもつ DOMException を返すことを推奨しています

// ブラウザ環境でのAbortErrorの投げ方
if (signal.aborted) {
  // 第二引数をAbortErrorにする
  throw new DOMException("The operation was aborted", "AbortError");
}

受け取り側は以下のようにしてAbortErrorを判別できます。

try {
  await asyncOperation({ signal });
} catch (e) {
  if (e.name === "AbortError") {
    // 中止されたときの処理
  } else {
    throw e;
  }
}

Node.jsには非公開APIの DOMException クラス [1] と、同じく非公開APIの AbortError クラスが存在します。これらは以下のように使い分けられます。

  • Web Crypto API および Web Streams API は AbortError という名前をもつ DOMException のインスタンスを投げます。
  • それ以外の (Node.js固有の) APIは AbortError クラスのインスタンスを投げます。

いずれも name プロパティに "AbortError" という値が入っているため、中止判定は同様のコードで行えます。

イベントハンドラを中止可能なPromiseにする

イベントハンドラをPromiseにするには以下のようなパターンが出てきます。

function waitForClick(button) {
  return new Promise((resolve) => {
    function handler() {
      resolve();
      button.removeEventListener("click", handler);
    }
    button.addEventListener("click", handler);
  });
}

addEventListenerはAbortSignalを受け取れますが、これを使うように上記のパターンを改良することもできます。

function waitForClick(button, options) {
  const { signal } = options ?? {};
  return new Promise((resolve) => {
    function handler() {
      resolve();
      button.removeEventListener("click", handler);
    }
    function abort() {
      reject(new DOMException("aborted", "AbortError"))
    }
    button.addEventListener("click", handler, { signal });
    signal.addEventListener("abort", abort);
    // すでにabortedな場合に備えてこの分岐も必要
    if (signal.aborted) abort();
  });
}

AbortSignalのネスティング

AbortSignalは、基本的に同じオブジェクト全ての関連する非同期処理に適用して使います。

しかし、キャンセル可能な一連の処理の一部だけを別途キャンセルしたい場合は、既存APIの組み合わせでAbortSignal間の親子関係を導入することもできます。

const childController = new AbortController();
const { signal: childSignal } = childController;
parentSignal.addEventListener("abort", () => childController.abort());

// 以下 childSignal を使った処理

上の例では childController をabortすれば一部だけをキャンセルできます。一方、呼び出し元で parentSignal がabortされた場合も、この部分がきちんとキャンセルされます。

例題: abort可能なsleepの実装

export interface SleepOptions {
    signal: AbortSignal;
}
export function sleep(timeout: number, options: Partial<SleepOptions> = {}): Promise<void> {
    const { signal } = options;
    return new Promise((resolve, reject) => {
        if (signal?.aborted) {
            reject(newAbortError());
            return;
        }
        function onTimeout() {
            resolve();
            signal?.removeEventListener("abort", onAbort);
        }
        function onAbort() {
            reject(newAbortError());
            clearTimeout(timeoutHandle);
        }
        const timeoutHandle = setTimeout(onTimeout, timeout);
        signal?.addEventListener("abort", onAbort);
    });
}
function newAbortError() {
    if (typeof DOMException === "function") {
        // browser
        return new DOMException("The operation was aborted", "AbortError");
    } else {
        // Node.js
        return Object.assign(new Error("The operation was aborted"), { name: "AbortError" });
    }
}

EventTarget / EventEmitter

EventTargetとEventEmitterはどちらもイベントリスナーの管理を行うための仕組みですが、二者の間に互換性はありません。

EventTarget EventEmitter
登録 addEventListener addListener
on
prependListener
1度だけ実行 {once: true} addOnceListener
prependOnceListener
削除 removeEventListener removeListener
off
発火 dispatchEvent(event) emit(type, ...args)
コールバック Function | EventListener Function
バブリング ✔️
デフォルト挙動を止める方法 preventDefault 常に止められる

EventTargetはDOMで定義されている仕様、つまりWebブラウザの機能です。またNode.jsにも (AbortSignalに合わせて) v14.5.0から導入されました

一方EventEmitterはNode.js独自機能です。同等機能を提供するnpmパッケージが多数存在するため、これらを使えばWebブラウザでもEventEmitterを使うことが可能です。

元々はEventTargetをユーザー定義クラスで継承することができなかったようですが、現在は継承可能です。そのためEventEmitterを使う意義は薄くなりつつあると考えられます。

ℹ️ 本稿ではNode.jsが提供するEventEmitterの仕様を解説します。互換ライブラリは動作が異なる可能性があります。

発火タイミング

  • Webブラウザがユーザー入力に起因して発行するイベントはuser interaction task sourceに紐付けられたタスクとしてエンキューされます。
  • EventTarget.prototype.dispatchEvent はイベントを同期的に処理します。
  • EventEmitter.prototype.emit はイベントを同期的に処理します。

イベントハンドラの実行順

EventTarget, EventEmitter ともに、イベントハンドラは登録された順番に実行されます。戻り値がPromiseであってもPromiseの終了は待機せずに次のハンドラの実行が開始されます。

const target = new EventTarget();
target.addEventListener("foo", async () => { console.log(1); await null; console.log(2); });
target.addEventListener("foo", async () => { console.log(3); await null; console.log(4); });
target.dispatchEvent(new CustomEvent("foo"));
// => 1, 3, 2, 4
const em = new EventEmitter();
em.on("foo", async () => { console.log(1); await null; console.log(2); });
em.on("foo", async () => { console.log(3); await null; console.log(4); });
em.emit("foo");
// => 1, 3, 2, 4

キャプチャとバブリング

(非同期処理とあんまり関係なくなってきていますが、せっかくなので説明します)

DOMのイベント処理では特定の要素だけではなく、その子孫要素からのイベントも受け取りたいというユースケースがあります。EventTargetはそのために「キャプチャとバブリング」という仕組みを実装しています。

EventTargetは3つのphaseを実装しています。UI Eventsの図にもありますが、3つのphaseとは

  1. capture phase: Window(祖先)からターゲット(子孫)に向かって順にイベントを発火させる。
  2. at-target phase: ターゲットにイベントを発火させる。
    • 実際にはcapture handlerを実行するat-target phaseとbubble handlerを実行するat-taret phaseの2つに分かれている。
  3. bubble phase: ターゲット(子孫)からWindow(祖先)に向かって順にイベントを発火させる。

です。 addEventListenercapture オプションによって発火条件が変化します。

capture=false
(デフォルト)
capture=true
capture phase ✔️
at-target phase (前半) ✔️
at-target phase (後半) ✔️
bubble phase ✔️

イベントハンドラ内でEventオブジェクトに対して stopPropagation() を呼ぶと、プロパゲーションが止まります。現在の要素に対するイベントハンドラは登録順が後であっても全て実行されますが、次の要素以降のイベントハンドラは呼ばれません。 stopPropagation() のかわりに stopImmediatePropagation() を使うと、現在の要素に対する後続のイベントハンドラの発火も止めることができます。

キャプチャ/バブリングを含めたイベントディスパッチの詳細はDOM §2.9 Dispatching Eventsに書かれています。キャプチャ/バブリングは同期的に (1つのタスクの中で) 行われます。

EventEmitterにはキャプチャ/バブリングの概念はありません。またNode.jsのEventTargetも同機能の必要がないため、形式的にAPIが実装されているのみです。

ハンドラの戻り値と副作用

EventTargetに対して発行されるイベントには、デフォルトの振舞いを持つものがあります。 (リンクのクリック、submitボタンのクリック、ドラッグ、スクロールなど)

イベントハンドラがEventオブジェクトの preventDefault() を呼ぶと、これらのデフォルトの振舞いがキャンセルされます。また実装依存の挙動として、イベントハンドラの戻り値を解釈してpreventDefaultを行うブラウザもあるとされています

Many implementations additionally interpret an event listener’s return value, such as the value false, to mean that the default action of cancelable events will be cancelled (though window.onerror handlers are cancelled by returning true).

したがってこれらのデフォルトの振舞いは、全てのイベントハンドラの処理が終わってからでないと発火しません。仮にイベントハンドラ自体が重い処理を行っていなくても、JavaScriptのタスクキューが掃かれるのを待つこと自体がオーバーヘッドであることに注意が必要です。特にスクロールなどのイベントではユーザー体験への影響があります

そのため、 preventDefault() を呼ばないことをイベントハンドラの登録時にあらかじめ宣言する仕組みが存在しています。

// passive handler
elem.addEventListener("wheel", () => { ... }, { passive: true });

passive: true つきで登録されたイベントハンドラは preventDefault() を呼ぶことができません。自身と祖先要素のハンドラが全てpassiveの場合、Webブラウザはタスクキューの処理を待たずにデフォルトの振舞いを実行することが可能です。

手動で発行したイベントの場合は、戻り値を見てデフォルト挙動を実装することができます。

if (eventTarget.dispatchEvent(new CustomEvent("foo", { cancelable: true }))) {
  // preventDefault() が呼ばれなかったら実行する
  someDefaultBehavior();
}

一方EventEmitterはイベントハンドラが存在するときに emit() がtrueを返すため、この戻り値を用いてデフォルト挙動が実装されることがあります。 preventDefault() のように動的にデフォルト挙動をキャンセルする仕組みはありません。

if (!eventEmitter.emit("foo")) {
  // イベントハンドラが存在しなかったら実行する
  someDefaultBehavior();
}

エラーハンドリング

EventTargetのイベントハンドラがエラーをthrowした場合は捕捉されなかった例外と同様の扱いで処理されます。イベントの処理は停止せず、後続のイベントハンドラはエラーがない場合と同様に呼び出されます。

  1. Call a user object's operation with listener's callback, "handleEvent", « event », and event's currentTarget attribute value. If this throws an exception, then:
// Web browser
const target = new EventTarget();
target.addEventListener("foo", () => { console.log("foo"); throw new Error("test"); });
target.addEventListener("foo", () => { console.log("bar") });
target.dispatchEvent(new CustomEvent("foo"));
console.log("baz");
// => foo, Uncaught Error: test, bar, baz

Node.jsのEventTargetは少し仕様が異なり、エラー処理はその場で行われるのではなく process.nextTick により遅延されます

// Node.js
const target = new EventTarget();
target.addEventListener("foo", () => { console.log("foo"); throw new Error("test"); });
target.addEventListener("foo", () => { console.log("bar") });
target.dispatchEvent(new Event("foo"));
console.log("baz");
// => foo, bar, baz, Uncaught Error: test

一方、EventEmitterはイベントハンドラの例外を捕捉せずそのままemit元に投げ返します。後続のイベントハンドラの実行も行われません。

const em = new EventEmitter();
em.on("foo", () => { console.log("foo"); throw new Error("test"); });
em.on("foo", () => { console.log("bar"); });
em.emit("foo");
console.log("baz");
// => foo, Uncaught Error: test
// (bar, bazは表示されない)

非同期エラーハンドリング

イベントハンドラがPromiseを返したときに特別な挙動が規定されていることがあります。まずEventEmitterに captureRejections: true を設定すると、Promiseがrejectされたときにerrorイベントを発生させるようになります。

const em = new EventEmitter({ captureRejections: true });

Promiseのrejectがハンドルされるようになるだけで、実行順序への影響はありません。

const em = new EventEmitter({ captureRejections: true });
em.on("foo", async () => { console.log(1); await null; console.log(2); });
em.on("foo", async () => { console.log(3); await null; console.log(4); });
em.emit("foo");
// => 1, 3, 2, 4

また、Webブラウザとは異なり、Node.jsのEventTargetはPromiseのrejectも例外と同様に扱います

Promiseへの変換

EventTarget / EventEmitterはコールバックベースのAPIのため、適切にコールバックを登録することでPromise化することができます。Node.jsにはそのためのユーティリティー関数 once, on が用意されています。

import events from "node:events";

// someEventNameの発火を1度だけ待つ
// イベントハンドラは必要なくなったら削除される
const eventArgs = await events.once(eventEmitter, "someEventName");
// someEventNameの発火を繰り返し処理する
// forループ脱出時にイベントハンドラは削除される
for await (const eventArgs of events.on(eventEmitter, "someEventName")) {
  // ...
}

これらのユーティリティー関数は指定したイベントだけではなく "error" イベントにもハンドラを登録し、Promiseをrejectするのに使います。

AsyncHooks, AsyncLocalStorage

第2回で言及したように、JavaScriptには非同期呼び出しのスタックや擬似的なスレッドのようなものがあるように見えますが、それを明示的に扱うための仕組みはありません。(第3回で説明した非同期スタックトレースはブラウザ側のヒューリスティックスとして実装されています)

そのため、非同期版のスレッドローカル変数のようなことをするには、基本的にはアプリケーションコードでcontext変数を明示的に伝搬する必要があります。しかしこれはパフォーマンスモニタリングなどミドルウェア用途でスレッドローカル変数のようなことを行いたいときには困りものです。

この問題を解決するためにNode.jsはAsyncHooksとAsyncLocalStorageという2つのAPIを提供しています。

  • AsyncHooksは非同期コールバック呼び出しを追跡するための低レベルAPIです。
  • AsyncLocalStorageはAsyncHooksをもとに非同期スレッドローカル変数を実現する高レベルAPIです。

AsyncLocalStorageの基本的な使い方

先に高級APIであるAsyncLocalStorageの基本的な使い方を説明します。

const async_hooks = require("async_hooks");
const asyncLocalStorage = new async_hooks.AsyncLocalStorage();

// runの中、およびそこから派生した非同期処理中では foo が参照される
asyncLocalStorage.run("foo", () => {
  setTimeout(() => {
    console.log("foo", asyncLocalStorage.getStore());
    // => foo foo
  }, 50);
});

// runの中、およびそこから派生した非同期処理中では bar が参照される
asyncLocalStorage.run("bar", () => {
  setTimeout(() => {
    console.log("bar", asyncLocalStorage.getStore());
    // => bar bar
  }, 50);
});

たとえば非同期HTTPサーバーであれば、リクエストハンドラをそれぞれ asyncLocalStorage.run で囲ってそれぞれ固有のオブジェクトを渡すようにすれば、リクエストローカル変数を実現できます。

AsyncHooksの構成

AsyncLocalStorageの理解を深めるために、ここからは低レベルAPIであるAsyncHooksを説明します。

AsyncHooksは非同期コールバックの呼び出しを観測できるオブザーバーAPIです。

import async_hooks from "node:async_hooks";
const asyncHook = async_hooks.createHook({
  // 非同期処理の呼び出しを観測する
  init(asyncId, type, triggerAsyncId, resource) { ... }
  before(asyncId) { ... }
  after(asyncId) { ... }
});
asyncHook.enable();

以下、テストのために以下のようなコード断片を使います。

const fs = require("fs");
const util = require("util");
const async_hooks = require("async_hooks");

// 通常のconsole.logは非同期に実行されるので、async_hooks内で使うと無限ループになってしまう。かわりに同期的に書き込む亜種を用意する。
// またstderrは先に初期化しておく。
const stderr = process.stderr.fd
const console = {
  log(s) {
    fs.writeSync(stderr, s + "\n");
  }
};

const asyncHook = async_hooks.createHook({
  init(asyncId, type, triggerAsyncId, resource) {
    console.log(`init(${asyncId}, ${type}, ${triggerAsyncId}, ${util.format(resource)}) ${async_hooks.executionAsyncId()}`);
  },
  before(asyncId) {
    console.log(`before(${asyncId}) ${async_hooks.executionAsyncId()}`);
  },
  after(asyncId) {
    console.log(`after(${asyncId}) ${async_hooks.executionAsyncId()}`);
  },
  destroy(asyncId) {
    console.log(`destroy(${asyncId}) ${async_hooks.executionAsyncId()}`);
  },
  promiseResolve(asyncId) {
    console.log(`promiseResolve(${asyncId}) ${async_hooks.executionAsyncId()}`);
  }
});
asyncHook.enable();

たとえば、以下のようにファイルを読み込んでみます。

fs.readFile("/proc/cpuinfo", (err, data) => {
  if (err) throw err;
});

すると、以下のようなログが得られます。

init(4, FSREQCALLBACK, 1, FSReqCallback {}) 1
before(4) 4
init(5, FSREQCALLBACK, 4, FSReqCallback {}) 4
after(4) 4
destroy(4) 0
before(5) 5
init(6, FSREQCALLBACK, 5, FSReqCallback {}) 5
after(5) 5
destroy(5) 0
before(6) 6
init(7, FSREQCALLBACK, 6, FSReqCallback {}) 6
after(6) 6
destroy(6) 0
before(7) 7
init(8, FSREQCALLBACK, 7, FSReqCallback {}) 7
after(7) 7
destroy(7) 0
before(8) 8
init(9, FSREQCALLBACK, 8, FSReqCallback {}) 8
after(8) 8
destroy(8) 0
before(9) 9
after(9) 9
destroy(9) 0

非同期リソースとAsync ID

Async Hooksで得られる情報のうち、中核となるのが非同期リソースの依存関係です。非同期リソースとはコールバックを呼び出す主体となるオブジェクトのことです (コールバックそのものが実質的に非同期リソースそのものとみなせる場合もあります)。

ネイティブ実装されている非同期リソースには以下のようなものがあります

type 生成方法
NONE 未使用 (ダミー値)
DIRHANDLE fs.opendir()
fs.opendirSync()
fs.promises.opendir()
DNSCHANNEL new dns.Resolver()
new dns.promises.Resolver()
ELDHISTOGRAM monitorEventLoopDelay()
FILEHANDLE fs.promises.open
FILEHANDLECLOSEREQ fileHandle.close()
FIXEDSIZEBLOBCOPY blob.arrayBuffer()
FSEVENTWRAP fs.watch()
fs.promises.watch()
FSREQCALLBACK fs.open()
fs.read()
fs.write()
他fs系のコールバックAPI
FSREQPROMISE fs.promises.open()
fileHandle.read()
fileHandle.write()
他fs.promises系のAPI
GETADDRINFOREQWRAP dns.lookup()
dns.promises.lookup()
GETNAMEINFOREQWRAP dns.lookupService()
dns.promises.lookupService()
HEAPSNAPSHOT v8.getHeapSnapshot()
HTTP2SESSION http2.createSecureServer()
http2.createServer()
http2.connect()
HTTP2STREAM http2関連
HTTP2PING http2関連
HTTP2SETTINGS http2関連
HTTPINCOMINGMESSAGE http.createServer()
new http.Server()
HTTPCLIENTREQUEST http.request()
new http.ClientRequest()
JSSTREAM new tls.TLSSocket()
http2.createSecureServer()
http2.createServer()
http2.connect()
JSUDPWRAP 内部テスト用
MESSAGEPORT new Worker()
PIPECONNECTWRAP socket.connect()
PIPESERVERWRAP net.createHandle()
net.createServerHandle()
PIPEWRAP childProcess.spawn()
net.createHandle()
socket.connect()
PROCESSWRAP child_process.spawn()
new child_process.ChildProcess()
PROMISE 未使用 (非ネイティブ実装を参照)
QUERYWRAP resolver.resolve()
resolver.resolve6()
他dns系のコールバックAPI
SHUTDOWNWRAP Socketのclose
Http2Streamのclose
SIGNALWRAP process.on(signalName)
STATWATCHER fs.watchFile()
STREAMPIPE http2stream.respondWithFD()
TCPCONNECTWRAP socket.connect()
TCPSERVERWRAP net.createHandle()
net.createServerHandle()
TCPWRAP net.createHandle()
socket.connect()
new tls.TLSSocket()
TTYWRAP new tty.ReadStream()
new tty.WriteStream()
UDPSENDWRAP udpSocket.send()
UDPWRAP dgram.createSocket()
new dgram.Socket()
SIGINTWATCHDOG プロセス立ち上げ時 (pre_execution)
WORKER new worker_threads.Worker()
WORKERHEAPSNAPSHOT worker.getHeapSnapshot()
WRITEWRAP ネイティブストリームへの書き込み?
ZLIB zlib.createDeflate()
zlib.createInflate()
他圧縮・伸長ストリームの作成API
CHECKPRIMEREQUEST crypto.checkPrime()
PBKDF2REQUEST crypto.pbkdf2()
subtle.deriveBits()
subtle.deriveKey()
subtle.importKey()
KEYPAIRGENREQUEST crypto.generateKeyPair()
subtle.generateKey()
KEYGENREQUEST crypto.generateKey()
subtle.generateKey()
KEYEXPORTREQUEST subtle.exportKey()
CIPHERREQUEST subtle.encrypt()
subtle.decrypt()
DERIVEBITSREQUEST subtle.deriveBits()
subtle.deriveKey()
HASHREQUEST subtle.digest()
RANDOMBYTESREQUEST crypto.randomFill()
crypto.randomBytes()
RANDOMPRIMEREQUEST crypto.generatePrime()
SCRYPTREQUEST crypto.scrypt()
subtle.deriveBits()
subtle.deriveKey()
subtle.importKey()
SIGNREQUEST subtle.sign()
subtle.verify()
TLSWRAP TLS関連全般
VERIFYREQUEST 未使用?
INSPECTORJSBINDING inspector.open()

ネイティブ実装ではない非同期リソースのうち、Node.jsで定義されているものとして以下のようなものがあります。

type 生成方法
PROMISE new Promise()
Timeout setTimeout()
Immediate setImmediate()
TickObject process.nextTick()
Microtask queueMicrotask()
AsyncLocalStorage new async_hooks.AsyncLocalStorage()
bound-anonymous-fn AsyncResource.bind()
QueuedRequest httpAgent.addRequest()

全ての非同期リソースにはユニークなAsync ID (number 型の非負整数) が割り振られます。特別なAsync IDとして以下があります。

  • -1 ... Async IDとして無効な値であることを表すためのダミー値
  • 0 ... ネイティブのイベントループのためのID
  • 1 ... Node.jsの起動処理とエントリポイント (最初のモジュールを読み込むタスク) のためのID

executionAsyncId

Node.js上で実行されるコードは全て、特定の非同期リソースに紐づいて実行されたとみなされます。現在実行中のコードがどの非同期リソースに紐づいて実行されるかを取得するには async_hooks.executionAsyncId() を使います。

console.log(async_hooks.executionAsyncId()); // => 1, 4, 6 など

async_hooks.executionAsyncId() が返す値は特定の非同期リソースに紐付いていますが、例外として以下の特別な値が存在します。

  • イベントループ自体は特別にexecutionAsyncId = 0で実行されます。 (AsyncHooks中で観測されることがある)
  • Node.jsの起動処理とエントリポイント (最初のモジュールを読み込むタスク) は特別にexecutionAsyncId = 1で実行されます。

非同期リソースに紐付く例を挙げます。たとえば、 fs APIのコールバックは FSReqCallback オブジェクトに紐付いて呼ばれます。

console.log(async_hooks.executionAsyncId()); // => 1
fs.open("/proc/cpuinfo", "r", (err, f) => {
  console.log(async_hooks.executionAsyncId()); // => FSReqCallbackのasyncId
});

setTimeout, setImmediate, process.nextTick, queueMicrotask のコールバックはそれぞれ Timeout, Immediate, TickObject, Microtask オブジェクトに紐付いて呼ばれます。

setTimeout(() => {
  console.log(async_hooks.executionAsyncId()); // => TimeoutのasyncId
}, 0);
setImmediate(() => {
  console.log(async_hooks.executionAsyncId()); // => ImmediateのasyncId
});
process.nextTick(() => {
  console.log(async_hooks.executionAsyncId()); // => TickObjectのasyncId
});
queueMicrotask(() => {
  console.log(async_hooks.executionAsyncId()); // => MicrotaskのasyncId
});

zlibライブラリの各種圧縮・伸長用クラスはTransformストリーム (詳しくは次回説明予定) として機能します。Transformストリームの読み取り側コールバック (dataイベントやreadableイベント) は当該ストリームに紐付いて呼ばれます。

const zlib = require("zlib");
const stream = zlib.createGzip();
stream.on("data", () => {
  console.log(async_hooks.executionAsyncId()); // => streamのasyncId
});
stream.write("foo");
stream.end();

Node.jsのTCPサーバーでは、サーバー用の非同期リソースと接続管理用の非同期リソースがどちらも「TCPオブジェクト」として管理されています。TCPサーバーの接続コールバックはTCPオブジェクト(サーバー)に紐付いて呼ばれます。得られたコネクションにデータが来たときの読み取りコールバック (dataイベントやreadableイベント) はTCPオブジェクト(ソケット)に紐付いて呼ばれます。

const net = require("net");
const server = net.createServer((conn) => {
  console.log(async_hooks.executionAsyncId()); // => serverのTCPオブジェクトのasyncId
  conn.on("data", (data) => {
    console.log(async_hooks.executionAsyncId()); // => connのTCPオブジェクトのasyncId
  });
}).listen(8080);

Promiseの扱いはやや特殊なので後述します。

executionAsyncResource

executionAsyncIdに紐づくリソースオブジェクトの実体を executionAsyncResource で取得できます。

fs.open("/proc/cpuinfo", () => {
  console.log(async_hooks.executionAsyncResource());
  // => FSReqCallback object
});

executionAsyncId = 0 と executionAsyncId = 1 にはそれぞれ新規オブジェクト ({}) が割り当てられます。

得られる非同期リソースオブジェクトは、公開APIの戻り値とは異なる実体の場合があるので注意が必要です。 executionAsyncResource() で得られたオブジェクトに対して、PromiseやHTTPサーバーとしての公開APIを呼び出すという使い方は基本的に想定されていません。では何に使うかというと、単なる情報の保管場所として使います。 (AsyncLocalStorage内で実際にそのように使われています)

triggerAsyncId

非同期リソースを点とすると、リソース間を線でつなぐのがtriggerAsyncIdです。これは各非同期リソースが作られた原因 (=親リソース) を指します。特別なtriggerAsyncIdとして以下が定義されています。

  • イベントループ自体 (asyncId = 0) の親リソースはないため、便宜上triggerAsyncId = 0のように扱われます。
  • Node.jsの起動処理とエントリポイント (asyncId = 1) の親はイベントループであり、 triggerAsyncId = 0 として扱われます。

多くの場合は「リソースが作成された時点でのexecutionAsyncId」がそのままtriggerAsyncIdになるため、デフォルトの挙動もこれに準じています。しかし明示的に上書きすることも可能で、Node.jsが提供する非同期リソースの中にも特別な挙動をするものがあります。たとえば、

  • TCPサーバーが接続を受理する処理はネイティブのイベントループ内 (executionAsyncId = 0) で実行されますが、これによって生成されるソケットのtriggerAsyncIdにはTCPサーバーのasyncIdが割り当てられます。
  • ソケットのcloseメソッドを用いてソケットを閉じたとき、closeイベントの通知はcloseの呼び出し元の文脈ではなくソケットの文脈で行われます。 (= nextTickによって生成されるTickObjectのtriggerAsyncIdはexecutionAsyncIdではなくソケットのasyncIdになる)

triggerAsyncIdによって、非同期リソースの集合は巨大な仮想的な木構造をなすことになります。もちろん、これらを実際に全てメモリに乗せるわけではなく、必要なくなった情報はできるだけ早く捨てる必要があります。

executionAsyncIdが指す非同期リソースのtriggerAsyncIdを、 async_hooks.triggerAsyncId() で取得できます。

基本の4フック

  • init は非同期リソースが作られたときに呼ばれます。
    • asyncIdtriggerAsyncId が両方含まれているため、非同期リソースの因果関係を追跡するのに必須です。
    • その他、 type パラメーターに非同期リソースの種別が、 resource パラメーターに非同期リソースの実体が入っています。
  • before, after は、ある非同期リソースに関連してコールバックが呼ばれるときに呼ばれます。
    • FSReqCallback のようなリソースでは高々1回のみ呼ばれます。
    • サーバーやソケットでは同じリソースに対して複数回コールバックが呼ばれることがあります。この場合は before / after も複数回呼ばれます。
    • executionAsyncId は対象の非同期リソースにセットされます (主コールバック内から見える値と同じです)
  • destroy は非同期リソースが不要になったときに呼ばれます。そのタイミングはリソースごとに異なります。
    • 通常 executionAsyncId は0の状態で呼ばれます。

Promiseの扱い

Promiseの扱いはやや特殊です。まず、パフォーマンス上の問題から、PromiseはasyncHookを登録しない限り追跡されません

// このコードはテスト用のフックを入れずに実行する
const async_hooks = require("async_hooks");

Promise.resolve(42).then(() => {
  // hookをかけずに呼び出すと0になる
  console.log(async_hooks.executionAsyncId());
});

またasyncHookが登録されている場合でも「initフックが存在するかどうか」「destroyフックが存在するかどうか」「promiseResolveフックが存在するかどうか」の条件に応じて内部的に登録される追跡処理が異なります。そのため、使わないフックは登録しない (undefinedのまま残しておく) ことでパフォーマンスに貢献する可能性があります。

asyncHookを登録した場合はPromiseは非同期リソースの一種として振舞います。このとき注意するべきこととして、thenコールバックのexecutionAsyncIdはthenをトリガーしたPromiseではなく、thenによって解決されるべきPromiseのasyncIdとして実行されます

Promise関連でエンキューされるマイクロタスクには以下の3種類があります。それぞれの結果はコード内のコメントを参照してください。

// Case 1: onFulfilled ハンドラ (PromiseFulfillReactionJob)
const promise1 = Promise.resolve(42);
const promise2 = promise1.then(() => {
  console.log(async_hooks.executionAsyncId()); // => promise2のasyncId
});

// Case2: onRejected ハンドラ (PromiseRejectReactionJob)
const promise3 = Promise.reject(new Error("test"));
const promise4 = promise3.catch(() => {
  console.log(async_hooks.executionAsyncId()); // => promise4のasyncId
});

// Case3: resolve関数に渡されたThenableの解決 (PromiseResolveThenableJob)
const promise5 = Promise.resolve({
  then(onFulfilled) {
    console.log(async_hooks.executionAsyncId()); // => promise5のasyncId
    onFulfilled(42);
  }
});

then のonFulfilledコールバックがPromiseやThenableを返した場合は、同じPromiseに対して複数のコールバックが紐付く (同じasyncIdに対してbefore/afterが複数回呼ばれる) こともあります。

PromiseのdestroyはGCで回収されたときに呼ばれます。

Promiseがresolveされたタイミングを受信するための promiseResolve フックが存在します。これは名前に反して、Promiseがfulfillまたはrejectされたときに呼ばれます (つまり1度だけ呼ばれます)。

Promiseの親リソース (triggerAsyncIdに入る値) は以下のルールで決定されます

たとえば以下のようになります。

const promise1 = (async function () {
  console.log(async_hooks.executionAsyncId()); // 呼び出し元と同じ
  await null;
  console.log(async_hooks.executionAsyncId()); // Promise.resolve(null).then(...) のasyncId
})();

// await null によって発生する Promise.resolve(null) はpromise1を親に持つ
// await null によって発生する Promise.resolve(null).then(continuation) はPromise.resolve(null) を親に持つ

FSReqPromise

fs.promises APIはC++で実装された非同期API (internalBinding("fs")) のラッパーとして機能しています。このC++ APIは (特定のフラグを有効化して呼び出すと) Promiseを返しますが、内部的にPromiseの状態管理をするために FSReqPromise オブジェクトが同時に生成されます。これはネイティブオブジェクトですが .promise 属性を持ち、ここに対象Promiseが格納されています。

カスタムリソース

多くのケースで非同期処理の関連付けはうまくいきますが、場合によってはうまくいかないこともあります。

たとえばやや人工的な例ですが、処理をまとめて実行するための以下のようなクラスを考えてみます。

class Batch {
  callbacks;
  // 100ms後にcallbackを実行する。
  // ただし、すでにエンキューされているコールバックがあればそれと同時に実行する。
  later(callback) {
    if (this.callbacks) {
      this.callbacks.push(callback);
    } else {
      this.callbacks = [callback];
      setTimeout(() => {
        const callbacks =  this.callbacks;
        this.callbacks = undefined;
        for (const callback of callbacks) {
          callback();
        }
      }, 100);
    }
  }
}

これを以下のように使ってみると、asyncLocalStorageが意図通りに伝搬されません。

const asyncLocalStorage = new async_hooks.AsyncLocalStorage();

const batch = new Batch();

asyncLocalStorage.run("foo", () => {
  batch.later(() => {
    console.log("Value 1 =", asyncLocalStorage.getStore());
    // => foo
  });
});
asyncLocalStorage.run("bar", () => {
  batch.later(() => {
    console.log("Value 2 =", asyncLocalStorage.getStore());
    // => foo
  });
});

AsyncResourceを使うとユーザー定義の非同期リソースを作ることができます。特にコールバックの非同期リソース化は AsyncResource.bind ヘルパー関数で手軽に実現できます。

class Batch {
  callbacks;
  later(callback) {
    if (this.callbacks) {
      this.callbacks.push(async_hooks.AsyncResource.bind(callback, "BatchCallback"));
    } else {
      this.callbacks = [async_hooks.AsyncResource.bind(callback, "BatchCallback")];
      setTimeout(() => {
        const callbacks =  this.callbacks;
        this.callbacks = undefined;
        for (const callback of callbacks) {
          callback();
        }
      }, 100);
    }
  }
}

こうすると呼び出し元のコンテキストが正しく保存されます。

new AsyncResource("MyResource") でAsyncResourceをインスタンス化すれば、非同期コンテキストのより複雑な制御も可能です。

AsyncLocalStorage

AsyncLocalStorageはAsyncHookのうち以下の機能を用いて実現されています。

  • init hook
  • executionAsyncResource
  • AsyncResource

大まかには以下のような仕組みです。

  • AsyncLocalStorageごとに新規Symbol (this.kResourceStore) を作成する。
    • 各非同期リソースに resource[kResourceStore] プロパティとしてデータを保存するために必要。
  • init hookを登録し、以下を行うようにする。 (ストアの継承処理)
    • resource[kResourceStore] = executionAsyncResource()[kResourceStore];
  • AsyncLocalStorage.prototype.run では以下の処理を行う。 (ストアのオーバーライド)
    • AsyncLocalStorage という名前のカスタム非同期リソース resource を作る。
    • resource[kResourceStore] を指定された値で置き換える。
    • resource の文脈の中に (runInAsyncScope を使って) 入り、その中でコールバックを実行する。

そのため、AsyncLocalStorageの値はtriggerAsyncIdではなく「作成時のexecutionAsyncId」から継承される仕組みになっています。

まとめ

  • AbortSignal
    • 中止可能な非同期APIを作るには、当該処理がAbortSignalのインスタンスを受け取るようにすればよい。
    • AbortSignalは中止通知を受け取る側のハンドルで、中止を通知する能力はAbortControllerとして分離されている。
  • Event
    • DOM由来のEventTargetと、Node.js由来のEventEmitterがあり、目的は同じだが互換性はない。
    • バブリングやデフォルト処理の中止方法、エラーが起きたときの挙動など、EventTargetとEventEmitterには振舞い上の細かい違いも多数存在する。
    • 近年はNode.jsでもEventTargetが使われる傾向にあり、EventEmitterの必要性は低下している。
  • Async Context
    • Node.jsは非同期処理の開始・終了などをフックできるAPI (Async Hooks) を提供している。これを使うと非同期処理同士の因果関係を追跡できる。
    • Async Hooksを利用した高級APIとしてAsyncLocalStorageがあり、非同期処理で擬似的なスレッドローカル変数を実現できる。

関連資料

更新履歴

  • 2021/10/24 公開。
  • 2021/11/13 DOMErrorとなっていた箇所をDOMExceptionにし、コンストラクタ呼び出しの引数位置を修正。DOMExceptionがNode.js 17から公開される旨の記述を追加。
脚注
  1. DOMExceptionは不安定版のNode.js 17でグローバル変数として公開されたため、安定版では次のNode.js 18から使えるようになりそうです。 ↩︎

Discussion

いつも素晴らしい記事をありがとうございます。補足をさせていただきます。

AbortController について

もともと TC39 で Cancelable Promise として進んでいましたがうまくいかず、DOM Standard の仕様として AbortController を定義することになったという経緯があります。jxck さんの記事が詳しいです。

https://blog.jxck.io/entries/2017-07-19/aborting-fetch.html

Async Context について

実は2020年の6, 7月頃に Node.js の async_hooks や Angular の zone.js を元にして ECMAScript に Async Context を入れようとする動きがありました。これは async_hooks における AsyncLocalStorage を提供する提案になっています。

https://github.com/legendecas/proposal-async-context

しかし Dynamic Scope に見えるという懸念や、そもそもスレッドローカル変数のような機能を言語仕様に入れたくないという指摘がされたことによって Stage 1 になることすらなく一年以上放置されています。

https://github.com/tc39/notes/blob/master/meetings/2020-06/june-3.md#async-context

https://github.com/tc39/notes/blob/master/meetings/2020-07/july-23.md#async-context-updates--for-stage-1

このような経緯から async_hooks や zone.js の先行きを不安視しています。zone.js は直接 addEventListener などをモンキーパッチしフックしているライブラリなので急に動かなくなる性質のあるものではないですが、一方で async_hooks は Node.js において Stability: 1 - Experimental として扱われている機能ということもあって個人的にはあまり依存しないほうが良いかなと思っています(AsyncLocalStorage は例外的に Stability: 2 - Stable として扱われているようです)。

このような経緯から async_hooks や zone.js の先行きを不安視しています。

これについてはすでに書かれているように AsyncLocalStorage がstableであることを念頭に置いています。async_hooksはAsyncLocalStorageを理解するためのベースとして必要だったので書いたものなので、そのまま置いておこうと思います。この種のcontext trackingはサーバーサイドのプラットフォームであるNode.jsにとっては必要なものなので、代替策 (Realm/VM Contextベースの実装が普及する、アプリケーションで明示的にcontext propagationをする慣習が普及する 等) が実用化されるまで少なくとも中期的には無くならないものだと思っています。

記事に対する指摘です。

fetch APIに限らず、Promiseを返すAPIがabortされたときは new DOMError("AbortError") を返すことが推奨されています。

DOMErrorDOMException を混同してしまっていると思われます。DOMError は非推奨扱いかつ Firefox で実装されていません。DOMException の方を使うのが正しいと思います。

https://developer.mozilla.org/ja/docs/Web/API/DOMError

Node.jsには非公開APIの DOMException クラスと、同じく非公開APIの AbortError クラスが存在します。

Node.js v17 から DOMException はグローバルに公開されるようになりました。一方で AbortError についてはどうやら忘れ去られていたみたいです(そもそも DOMException に寄せる方が正しそう?)。

https://github.com/nodejs/node/pull/39176#issuecomment-947973277

DOMErrorDOMException を混同してしまっていると思われます。

おおっと、ありがとうございます。new DOMException の使い方もおかしかったので一緒に直しました。

先ほど仕様に追加が入り、AbortController#abort メソッドに引数として reason を受け取れるようになりました。まだ実装はありません。

https://github.com/whatwg/dom/pull/1027

省略した場合は今まで通り name プロパティが "AbortError"DOMException になるようです。そして AbortSignal を受け取っていた非同期函数内でわざわざエラーオブジェクトを作る手間がなくなり、throw signal.reason とすることが出来るようになります。

ありがとうございます。これは嬉しいですね。

ログインするとコメントできます