TypeORM + PostgreSQL でリクエストごとのコネクションプールを実現した話
おはようございます。dinii の whatasoda です。
本記事では、TypeORM を使用した PostgreSQL への接続において、pg-pool
ライブラリの connectionTimeoutMillis
設定に起因するコネクションプールの管理問題に焦点を当てます。特に、大量のクエリを同時に処理する際に生じるタイムアウトエラーの原因とその影響について解説し、この問題に対する実用的な解決策を提案します。
なお、この記事内には実際に dinii で利用している実装を一部変更したものを載せていますが、これらのコードを利用する際は自己責任でお願い致します。
TL;DR
-
pg-pool
のconnectionTimeoutMillis
はコネクションプールの開放待ち時間にも適用され、望まないエラーを引き起こすことがある - TypeORM では DataSource インスタンス一つあたり一つのコネクションプールしか作成されない(≒ 1 Node.js プロセスあたり 1 コネクションプール)
-
pg-pool
のPool#connect
をラップしてcls-hooked
やrxjs
と組み合わせることでリクエストごとにいい感じのコネクションプールを実現できる
課題
TypeORM は PostgreSQL への接続に pg-pool
というライブラリを使用します。pg-pool
は Node.js で実装されたデータベースへのコネクションプール管理ライブラリです。 pg-pool
にはデータベース接続試行のタイムアウト時間として connectionTimeoutMillis
という設定項目がありますが、ここで設定した値はコネクションプールの開放待ちのタイムアウト時間としても利用されます。しかし、この設定が大量のクエリを一度に処理する際に問題を引き起こすことがあります。
例えば connectionTimeoutMillis
を 5000 に、プールの最大数を 10 に設定し、1秒かかるクエリを100個並行して実行すると、前半 50 クエリは成功しますが、後半 50 クエリは Error: timeout exceeded when trying to connect
というエラーによって失敗します。このエラーはコネクションプール開放の待ち時間が設定されたタイムアウト秒数を超えてしまうために発生するエラーです。
このように一斉に複数のクエリが走る例としてはまず Promise.all
を使うパターンがあります。 dinii ではレジ締めのオペレーションのなかで必要になる情報を参照する際に 卓・来店客・注文・会計 などの情報を時間軸ベースでクエリする必要があり、 複数のクエリを Promise.all
を使って並行に実行しています。
他の例として targets.map(async (target) => { ... })
のような処理があります。これは要は N+1 になっているので決して良い実装とは言えませんが、そもそものクエリが重たいので分割したかったり、実装としてのシンプルさを優先して意図的にこのように書くことがあるかもしれません。 dinii では多くの店舗をもつ業態における集計系の処理で一部このようなパターンを使っています (BigQuery を使うようにして根本的に解決するような計画を進めていますがまだこのような実装が残ってしまっています)。
これらのケースで状況を改善するためにはいずれも並行処理を直列に寄せていくというアプローチが有効です。具体的には Promise.all
で並行実行するのではなく一つずつ await
するようにしたり、 targets を分割して段階的に処理するか完全に直列に reduce や for 文で処理するようにしたりという変更です。しかし、すべての箇所でそれを強制するのは変更の範囲が広すぎるためあまり現実的ではありませんし、パフォーマンスに作用する変数が増えてしまうことでパフォーマンスの悪化を招く可能性もあります。
そもそも、コネクションプールの待ち時間が発生すること自体は異常な状態ではなく、時間さえかければ処理を完了できるもののはずです。大量のデータを扱うことの多い、非同期的に処理できればいいものであればエラーにならずにずっと待っていてほしいものです。
TypeORM では DataSource インスタンス一つあたり pg-pool
のコネクションプールが一つのみ設けられるようになっています(ref1, ref2)(replication の設定がある場合は replica ごとに pool が作られます)。 DataSource インスタンスは無闇にたくさん作るようなものではなく、基本的にはデータベースのインスタンスに対応させる形で作るものだと思います。そのため、一つのデータベースのみに着目するのであれば、一つの Node.js プロセスにはただ一つのコネクションプールが存在し、同じ Node.js プロセスで実行されている複数のリクエストはそのコネクションプールを共有してクエリを実行することになります。すると、あるリクエストが大量のクエリが発行した場合に同じ Node.js プロセスで実行されている他のリクエストでエラーが発生したり、パフォーマンスに影響を及ぼす可能性があります。(dinii では Cloud Run に NestJS アプリケーションをデプロイしているため、ここでいう Node.js プロセスは Cloud Run の 1 インスタンスと言い換えることができます。)
続く2つのセクションではこの課題を「独自のプールの実装」と「リクエスト毎のプールの適用」の2段階に分けて解決していきます。
解決 Step 1: 独自のプールの実装
この問題に対処するため、まず pg-pool
****をラップする方法を検討しました。直接 pg-pool
内部の実装を変更することはメンテナンス性に懸念があるため、外側から問題を解決できないかというアプローチです。
外側からのアプローチであっても中で何が行われているのかを知らないことには何もできません。まずは pg-pool
の実装を紐解いてみました。(このあたり)
- 新たなコネクションを獲得するためには必ず
Pool#connect
を呼び出している- 問題の
Error: timeout exceeded when trying to connect
はこのメソッドから発行されている
- 問題の
-
Pool#connect
はコネクションを獲得するとPoolClient
インスタンスを返す -
PoolClient
インスタンスは自身に割り当てられた処理を完了するとPoolClient#release
を呼び出す-
PoolClient#release
は自身を発行したPool
インスタンスのrelease
イベントを発火する(Pool
クラスは EventEmitter を継承している)
-
-
release
イベントからは開放されたPoolClient
インスタンスそのものを参照することができる
以上の挙動を元に、以下の方針で独自のコネクションプールを実装してみます。
-
Pool#connect
をラップしてオリジナルの処理を遅延実行できるようにする - 最大並行実行数を調整できるキューを rxjs を用いて実装し、遅延実行のタイミングや最大実行数を制御する
- 適切に最大実行数を制御するために「接続 → クエリ実行 → 開放」 まででキューのタスクの完了とする
(シーケンス図で書いたため、実際には並行・並列で実行されていても直列で書かれている箇所があります)
Queue の実装
import { createPromise, sleep } from "dinii-self-js-lib/promise";
import { PoolClient } from "pg";
import { Subject, Subscription, defer, mergeMap, tap } from "rxjs";
import { getHookInContext, getTransactionalContext } from "typeorm-transactional/dist/common";
const DEFAULT_QUEUE_SIZE = 20;
const DEFAULT_NO_ACTIVITY_TIMEOUT = 10000;
export type PgConnectionQueue = ReturnType<typeof createPgConnectionQueue>;
export type PgConnectionQueueProps = {
name: string;
concurrency: number | "default";
noActivityTimeout: number | "default";
};
export type PgPoolReleaseCallback = (err: Error | undefined, client: PoolClient) => void;
export type PgPoolLike = {
on(event: "release", listener: PgPoolReleaseCallback): unknown;
};
export class ConnectionQueueNoActivityTimeoutError extends Error {
constructor() {
super("ConnectionQueue timeout: no activity occurred for a while");
}
}
const clientToReleaseNotifierMap = new Map<PoolClient, { notify: () => void }>();
const handlerAttachedPoolSet = new WeakSet<PgPoolLike>();
const waitForClientRelease = (client: PoolClient) => {
// NOTE: 要は Promise.withResolvers()
const { promise, resolve } = createPromise<void>();
clientToReleaseNotifierMap.set(client, { notify: resolve });
return promise;
};
const handleReleaseClient = (client: PoolClient) => {
const notifyRelease = clientToReleaseNotifierMap.get(client)?.notify;
if (notifyRelease) {
notifyRelease();
clientToReleaseNotifierMap.delete(client);
return true;
}
return false;
};
// NOTE: Pool の release イベントを利用して client が開放されるのを検知する
const attachReleaseHandler = (pgPool: PgPoolLike) => {
if (!handlerAttachedPoolSet.has(pgPool)) {
pgPool.on("release", (_err, client) => {
if (!handleReleaseClient(client)) {
// NOTE: connect 完了直後に同期的に release が呼ばれるケースに対応するため、
// 一度対象がみつからなければ Promise.resolve を挟んで再度実行する
void Promise.resolve().then(() => {
handleReleaseClient(client);
});
}
});
handlerAttachedPoolSet.add(pgPool);
}
};
type QueueItem = {
isInTransaction: boolean;
rejectConnect: (reason: unknown) => void;
connect: () => Promise<PoolClient | null>;
};
export const createPgConnectionQueue = (props: PgConnectionQueueProps) => {
let subscription: "uninitialized" | Subscription | "closed" = "uninitialized";
const queue$ = new Subject<QueueItem>();
const connections$ = defer(() => {
let waitingCount = 0;
let processingCount = 0;
let transactionCount = 0;
let error: Error | null = null;
let noActivityTimer: ReturnType<typeof setTimeout> | null = null;
const noActivityTimerPromise = createPromise<void>();
let hasSentWarningLog = false;
const concurrency = props.concurrency === "default" ? DEFAULT_QUEUE_SIZE : props.concurrency;
const noActivityTimeout =
props.noActivityTimeout === "default" ? DEFAULT_NO_ACTIVITY_TIMEOUT : props.noActivityTimeout;
const setNoActivityTimerIfNeeded = () => {
// NOTE: 処理に詰まってしまった場合のために、エラーをセットして後続の処理をキャンセルできるようにしておきたいが、
// すべてでタイムアウトを設定するとただ処理が長いだけでエラーになってしまう。
// そのため、最大同時実行数に到達していてかつ後続の処理がある場合のみチェックするようにする。
if (processingCount === concurrency && waitingCount > 0) {
noActivityTimer =
// NOTE: 重複して timeout が設定されてしまうことを回避するため、既存のタイマーがあればそちらをつかう
noActivityTimer ??
setTimeout(() => {
error = new ConnectionQueueNoActivityTimeoutError();
noActivityTimerPromise.resolve();
}, noActivityTimeout);
}
};
return queue$.pipe(
tap(() => {
waitingCount++;
if (!error) {
setNoActivityTimerIfNeeded();
}
}),
// NOTE: concurrency 以上に接続しようとすると他の処理が完了するまで待機する(mergeMap の機能)
mergeMap(async ({ isInTransaction, connect, rejectConnect }) => {
waitingCount--;
if (error) {
rejectConnect(error);
return;
}
processingCount++;
if (isInTransaction) {
transactionCount++;
// NOTE: トランザクションの場合1つのコネクションが release されるまでの時間がトランザクションではない場合と
// 比較して長い傾向にある。そのため noActivityTimeout になる可能性が高く、 ConnectionQueue のパラメーターの
// 調整が必要になるため、一つのリクエストで複数のトランザクションが並行して走っている場合に気がつけるようにしておく
if (transactionCount > 1 && !hasSentWarningLog) {
hasSentWarningLog = true;
logger.warn(
`Multiple transactions are processing concurrently in a single request: ${props.name}`,
);
}
}
setNoActivityTimerIfNeeded();
// NOTE: タイムアウトした場合は後続の処理を reject していく必要があるため同時実行数を減らしたい。
// そのため timeout の promise とメインの処理とで race しておく
await Promise.race([
noActivityTimerPromise.promise,
(async () => {
try {
const client = await connect();
if (!client) return;
await waitForClientRelease(client);
} catch {
// no-op
}
})(),
]);
// NOTE: 一つでも resolve されれば後続のタスクは処理できるのでタイマーをリセットする
if (noActivityTimer) {
clearTimeout(noActivityTimer);
noActivityTimer = null;
}
if (isInTransaction) {
transactionCount--;
}
processingCount--;
}, concurrency),
);
});
const closeQueue = () => {
if (subscription instanceof Subscription) {
subscription.unsubscribe();
}
subscription = "closed";
};
const enqueue = (pool: PgPoolLike, connect: () => Promise<PoolClient | null>) => {
if (subscription === "closed") {
const { name } = props;
logger.warn("Trying to connect to database after connection pool is closed", { name });
return connect() as Promise<PoolClient>;
}
if (subscription === "uninitialized") {
subscription = connections$.subscribe();
}
attachReleaseHandler(pool);
const { promise, resolve, reject } = createPromise<PoolClient>();
queue$.next({
// NOTE: dinii では typeorm-transactional を利用しているため、本来はそれに適した実装をおいています
isInTransaction: checkIsInTransaction(),
rejectConnect: reject,
connect: () => {
const result = connect();
result.then(
(client) => {
if (client) {
resolve(client);
} else {
reject(new Error("[unreachable] pg-pool client is null"));
}
},
(error) => reject(error),
);
return result;
},
});
return promise;
};
return { closeQueue, enqueue };
};
dinii ではモノレポを採用しており、 dinii-self-js-lib というのはそのモノレポ内で管理しているもので、全プロダクトのソースコードから利用できる JavaScript の一般的な実装をまとめています。モノレポ内でのパッケージの解決には tamashii というツールを自作して行っています。こちらについてはまたどこかで記事を出せればと考えています。
patch の実装
import { noop } from "dinii-self-js-lib/noop";
import { createPromise } from "dinii-self-js-lib/promise";
import type { Pool, PoolClient } from "pg";
import {
getPgConnectionQueueContext,
unwrapOutOfConnectionQueue,
} from "./wrap-in-connection-queue";
const patchedPrototypeSet = new WeakSet();
export type PgPoolConnectCallback = (
err: Error | undefined,
client: PoolClient | undefined,
done: (release?: any) => void,
) => void;
type AsyncConnect = ReturnType<typeof phony>["asyncConnect"];
type CallbackConnect = ReturnType<typeof phony>["callbackConnect"];
const phony = (PoolClass: typeof Pool) => ({
asyncConnect: () => PoolClass.prototype.connect(),
callbackConnect: (callback: PgPoolConnectCallback) => PoolClass.prototype.connect(callback),
});
export const patchPgPoolWithConnectionQueue = (PoolClass: typeof Pool) => {
const prototype = PoolClass.prototype;
if (patchedPrototypeSet.has(prototype)) return;
// NOTE: overload ごとに分けて実装を定義することで型的な安心感を得ている
const asyncConnect = PoolClass.prototype.connect as AsyncConnect;
const callbackConnect = PoolClass.prototype.connect as CallbackConnect;
const asyncConnect_: AsyncConnect = function (this: Pool) {
const context = getPgConnectionQueueContext();
const queue = context?.queue ?? null;
if (!queue) {
return asyncConnect.apply(this);
}
return queue.enqueue(this, () => asyncConnect.apply(this));
};
const callbackConnect_: CallbackConnect = function (this: Pool, callback: PgPoolConnectCallback) {
const context = getPgConnectionQueueContext();
const queue = context?.queue ?? null;
if (!queue) {
return callbackConnect.apply(this, [callback]);
}
let rejected = false;
const connect = () => {
const { resolve, reject, promise } = createPromise<PoolClient | null>();
const wrappedCallback: PgPoolConnectCallback = (...args) => {
const [error, client] = args;
if (error) {
rejected = true;
reject(error);
} else {
resolve(client ?? null);
}
return callback(...args);
};
callbackConnect.apply(this, [wrappedCallback]);
return promise;
};
return void queue.enqueue(this, connect).catch((error) => {
if (!rejected) {
callback(error, undefined, noop);
}
});
};
const connect_ = function connect_(this: Pool, callback?: PgPoolConnectCallback) {
return callback ? callbackConnect_.apply(this, [callback]) : asyncConnect_.apply(this);
} as AsyncConnect & CallbackConnect;
prototype.connect = connect_;
};
実行中の処理が詰まってしまった場合への対策
なにか予期しない不具合が発生して処理が詰まってしまった場合、ただ単に処理をキューにのせるだけでは実行もされずエラーにもならないゾンビクエリがずっと残り続けるような状態になってしまいます。コネクションプールの開放待ちはエラーにしたくないですが、このようなケースでは適切にエラーを返してあげたいものです。
上記の実装では、以下の2つを満たしたときに待機中の処理に対してエラーを返すような仕組みが実装されています。
- 実行中の処理の数が最大同時並行数に達している状態で後続の待機中の処理がある
- 処理中のタスクが一つも完了しないまま一定時間が経過した
トランザクションについての考慮
トランザクションとして実行する場合、一連の処理はすべて一つのコネクションを使って行われます。そのため、トランザクション内でいくらクエリが並行に発行されても直列で実行されるようになっています。
今回実装した独自のコネクションプールは Pool#connect
を呼び出すものに対して適用されるため、トランザクション内部の処理の制御は行わず、トランザクション自体の同時並行実行数を制御するような動きになります。
1トランザクションあたりに要する時間と1クエリあたりに要する時間とを比べると、大抵の場合は1トランザクションあたりに要する時間のほうが長くなるはずです。よって、先述の処理が詰まってしまった場合への対策で実装した仕組みに引っかかって後続の処理がエラーを返してしまう蓋然性は少し高くなってしまいます。そこで、1リクエストに複数のトランザクションが並行して実行されている場合にはログを出力して気がつけるようにしています。
解決 Step 2: リクエスト毎のプールの適用
Step 1 によって独自のコネクションプールを設けることには成功しましたが、一つのコネクションプールを同一インスタンス内で処理するすべてのリクエストで共有している問題も解決するためにはもう少し手を加える必要があります。
Step 1 では Pool#connect
の処理の外側で独自のコネクションプールを実現しているので、あとはリクエストごとに新しいプールを作成して利用できるようにすれば良さそうです。これは cls-hooked
を利用することで簡単に実現できました。
import { createNamespace } from "cls-hooked";
import { PgConnectionQueue, PgConnectionQueueProps, createPgConnectionQueue } from "./queue";
enum PgConnectionQueueContextKey {
Queue = "queue",
Props = "props",
}
export type PgConnectionQueueContextValue = {
queue: PgConnectionQueue;
props: PgConnectionQueueProps;
};
const clsContext = createNamespace("dinii:connection-queue");
const getActiveClsContext = () => (clsContext.active ? clsContext : null);
const setPgConnectionQueueContext = (value: PgConnectionQueueContextValue | null) => {
const context = getActiveClsContext();
context?.set(PgConnectionQueueContextKey.Queue, value?.queue ?? null);
context?.set(PgConnectionQueueContextKey.Props, value?.props ?? null);
};
export const getPgConnectionQueueContext = (): PgConnectionQueueContextValue | null => {
const context = getActiveClsContext();
const queue = (context?.get(PgConnectionQueueContextKey.Queue) ??
null) as PgConnectionQueue | null;
const props = (context?.get(PgConnectionQueueContextKey.Props) ??
null) as PgConnectionQueueProps | null;
return queue && props ? { queue, props } : null;
};
export const wrapInConnectionQueue = <Func extends (this: any, ...args: any[]) => ReturnType<Func>>(
fn: Func,
props: PgConnectionQueueProps,
): Func => {
const isAsyncFn = (fn as any)[Symbol.toStringTag] === "AsyncFunction";
return function withConnectionQueue(this: any, ...args: any[]): ReturnType<Func> {
return clsContext.runAndReturn(() => {
const queue = createPgConnectionQueue(props);
setPgConnectionQueueContext({ queue, props });
const result = fn.apply(this, args);
if (!isPromise(result)) {
return result;
}
if (isAsyncFn) {
return result as ReturnType<Func>;
}
// NOTE: Promise を返すが async function として定義されていないものについてもカバーするためには runPromise が必要
return clsContext.runPromise(() => result) as ReturnType<Func>;
});
} as Func;
};
export const unwrapOutOfConnectionQueue = <
Func extends (this: any, ...args: any[]) => ReturnType<Func>,
>(
fn: Func,
): Func => {
const isAsyncFn = (fn as any)[Symbol.toStringTag] === "AsyncFunction";
return function withoutConnectionQueue(this: any, ...args: any[]): ReturnType<Func> {
const curr = getPgConnectionQueueContext();
if (!curr) {
return fn.apply(this, args);
}
return clsContext.runAndReturn(() => {
setPgConnectionQueueContext(null);
const result = fn.apply(this, args);
if (!isPromise(result)) {
return result;
}
if (isAsyncFn) {
return result as ReturnType<Func>;
}
// NOTE: Promise を返すが async function として定義されていないものについてもカバーするためには runPromise が必要
return clsContext.runPromise(() => result) as ReturnType<Func>;
});
} as Func;
};
これで、独自のコネクションプールでラップされたリクエストであればリクエストごとに最大コネクション数を制御できるようになりました。しかし、 pg-pool
側のプールサイズによる制限は引き続き受けることになるため大きめの値の設定はしますが、この状態でもし独自のコネクションプールの設定漏れが発生してしまうと結局元の問題と同じことが発生する可能性があります。変更前よりもプールのサイズが大きいため、むしろ過剰にプールを消費してしまうという新しい問題も発生してしまうかもしれません。
dinii では NestJS を利用しているため、この課題に対する対策としてすべてのリクエストをラップする interceptor を定義しました。middleware でも良かったのですが、トランザクションの並行実行に関するログなどにリクエストの情報含めたかったため、それらの情報へのアクセスが容易な interceptor を利用しています。
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from "@nestjs/common";
import { wrapObservableWithConnectionQueue } from "./run-with-connection-queue";
@Injectable()
export class ConnectionQueueInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler<any>) {
return wrapObservableWithConnectionQueue(() => next.handle(), {
name: this.getName(context),
concurrency: "default",
noActivityTimeout: "default",
});
}
private getName(context: ExecutionContext) {
...
}
}
今度こそ、リクエストごとのコネクションプールを設けることができましたが、プールのサイズを個別に調整したいケースのために decorator も定義しました。この decorator を利用しているリクエストでは interceptor と decorator で重複してコネクションプールが作成されますが、コネクションプールの作成自体はほとんどコストがかからない処理であり、より内側でラップしたコネクションプールが利用されるようになっている(React の context などと同じイメージ)ため問題ありません。
import { unwrapOutOfConnectionQueue, wrapInConnectionQueue } from "./wrap-in-connection-queue";
export const PG_CONNECTION_QUEUE_WATERMARK = "dinii:pg-connection-queue-watermark";
export type ConnectionQueueConfig =
| {
disabled?: never;
concurrency: number | "default";
noActivityTimeout?: number | "default";
}
| {
disabled: true;
concurrency?: never;
noActivityTimeout?: never;
};
// eslint-disable-next-line func-style
export function ConnectionQueue(config: ConnectionQueueConfig): MethodDecorator & ClassDecorator {
const decorateMethod: MethodDecorator = (
target: unknown,
methodName: string | symbol,
descriptor: TypedPropertyDescriptor<any>,
) => {
const { concurrency = "default", noActivityTimeout = "default", disabled = false } = config;
const className = (target?.constructor?.name ?? null) as string | null;
const original = descriptor.value;
descriptor.value = disabled
? unwrapOutOfConnectionQueue(original)
: wrapInConnectionQueue(original, {
name: `${className}/${methodName.toString()}`,
concurrency,
noActivityTimeout,
overwrite: false,
});
Reflect.getMetadataKeys(original).forEach((previousMetadataKey) => {
const previousMetadata = Reflect.getMetadata(previousMetadataKey, original);
Reflect.defineMetadata(previousMetadataKey, previousMetadata, descriptor.value);
});
Reflect.defineProperty(descriptor.value, "name", {
value: original.name,
writable: false,
});
};
const decorateClass: ClassDecorator = (constructor) => {
for (const key of Reflect.ownKeys(constructor.prototype)) {
if (key === "constructor") continue;
const descriptor = Reflect.getOwnPropertyDescriptor(constructor.prototype, key);
if (!descriptor || typeof descriptor.value !== "function") continue;
decorateMethod(constructor.prototype, key, descriptor);
Reflect.defineProperty(constructor.prototype, key, descriptor);
}
Reflect.defineMetadata(PG_CONNECTION_QUEUE_WATERMARK, true, constructor);
};
return function (
target: any,
methodName: string | symbol,
descriptor: TypedPropertyDescriptor<any>,
) {
return typeof target === "function"
? decorateClass(target)
: decorateMethod(target, methodName, descriptor);
} as MethodDecorator & ClassDecorator;
}
まとめ
本記事では、TypeORM と PostgreSQL の接続における pg-pool
の connectionTimeoutMillis
設定が引き起こすコネクションプール管理の問題を解説しました。リクエストごとに新しいプールを作成して利用することで、インスタンス内のリクエスト間での干渉を防ぎ、システムの安定性の向上を期待できます。
おわりに
dinii では飲食をもっと楽しく面白くするために一緒に働いてくれる仲間を募集しています。今回紹介したような基盤寄りの実装ができるエンジニア向けのポジションもあり、まさに自分はそのポジションで活動しています。詳細はこちらの記事をご覧ください!
Discussion