Kysely + Cloudflare D1 でトランザクションを実行するヘルパー関数を書いた話
TL;DR
- D1 の batch はトランザクション[2]
- kysely-d1 を改造してヘルパー関数を作ると batch を型付きで呼べてよい
- batch と beginTransaction/commit/rollback では、できることに差がある
前提条件
趣味プロジェクトで kysely を採用していたので業務でも引き続き使っています。Prisma はコードサイズが大きいので。
実装
db.batch() の型は batch<T = unknown>(statements: D1PreparedStatement[]): Promise<D1Result<T>[]>;
となっているので、 D1PreparedStatement[]
を作って渡せばいいことがわかります。
D1PreparedStatement
は db.prepare(sql).bind(...params)
で作れるので、kysely-d1 の Compilable
の .compile()
の結果を渡します。
あとは全体を map() で回せばよいです。
返ってくる D1Result を kysely の QueryResult に変換する部分は kysely-d1 から export されていないので、実装を取り出してきて使います。
Promise.all() の型定義を参考に、引数 queryes の型から戻り値の型を作るようにすれば完成です。
import { Compilable, QueryResult } from 'kysely'
import { D1Result } from '@cloudflare/workers-types'
// cc0
/** Compilable から Output の型を取り出す型関数 */
type QueryOutput<Q> = Q extends Compilable<infer O> ? O : never
/** D1 の db.batch() を kysely で使うヘルパー関数 */
export const batch = async <Q extends readonly Compilable[]>(
db: D1Database,
queries: Q
): Promise<{ [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }> => {
// 一個もクエリがないと死ぬので
if (queries.length === 0) return [] as { [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }
// D1 の db.batch() を呼ぶ
const results = await db.batch(
queries
.map((query) => query.compile())
.map(({ sql, parameters }) => db.prepare(sql).bind(...parameters))
)
// 一個でもエラーがあれば例外を上げる
const error = results.find((result) => result.error)
if (error) throw new Error(error.error)
// D1Result を kysely の QueryResult に変換する
return results.map(
(result): QueryResult<unknown> => transformResult(result)
) as { [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }
}
/*!
* MIT License
*
* Copyright (c) 2022 Aiden Wallis
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/** D1Result を kysely の QueryResult にマップする。
* https://github.com/aidenwallis/kysely-d1/blob/main/src/index.ts#L98
* executeQuery からの複写
*/
const transformResult = <O>(d1Result: D1Result): QueryResult<O> => {
const { error, meta, results } = d1Result
if (error) throw new Error(error)
const numberAffectedRows = meta.changes > 0 ? BigInt(meta.changes) : undefined
return {
insertId:
meta.last_row_id === undefined || meta.last_row_id === null
? undefined
: BigInt(meta.last_row_id),
rows: (results as O[]) || [],
numAffectedRows: numberAffectedRows,
// deprecated in kysely >= 0.23, keep for backward compatibility.
numUpdatedOrDeletedRows: numberAffectedRows,
}
}
使用方法
batch に渡すクエリはいつも通りクエリビルダで組み立てて、await や .execute() を付けずに配列に詰める。
戻り値にはきちんと型が付く。
// db: Kysely<Database>
// role_ids: number[]
const [_, __, { rows: roles }] = await batch(c.env.DB, [
db.deleteFrom('role_user').where('user_id', '=', user_id),
db
.insertInto('role_user')
.values(role_ids.map((roleId) => ({ user_id, role_id }))),
db
.selectFrom('role_user')
.select('role_id')
.where('user_id', '=', user_id)
.distinct(),
] as const)
// roles: number[]
db.batch(statements) の制約
D1 の提供する db.batch()
では、begin transaction / commit / rollback 命令でできていたことが一部実現できません。できること・できないことを以下に書きます
できること
- クエリの途中でエラーが起きたらロールバックする[3]
- ロックがかかる
できないこと
- トランザクション分離レベル[4][5]を指定できない
- db.batch() に該当するオプションを受け取る引数がない
- トランザクション中のクエリの結果を利用した処理ができない
- 結果を次のクエリに渡せない
- サブクエリで工夫するしかない
- アプリケーションコードで判断を入れてロールバックもできない
- 結果を次のクエリに渡せない
また、D1 ではなく Kysely の制約に由来するものとして、プラグイン一覧を取り出せないのでこのヘルパー関数の結果には kyselyPlugin.transformResult()
が適用されていません。自前で呼ぶ必要があります。
余談
Drizzle の対応
Drizzle ORM は 0.29.0 で batch をサポートしている
記事を書いている途中に見つけた、この記事の Drizzle 版(先行研究)kysely-d1 について
- vitest にかけたらエラー吐かれました。esm版も提供してください
- #25 がマージされたのでそのうち出るはず
D1 のストアドプロシージャ機能について(未実装)
公式ブログのこの記事が言うには、
- D1 でトランザクションAPIを提供できない理由はコードの実行場所とDBのプライマリが離れることでスケールしなくなるため
- ストアドプロシージャの要点は、DB内で実行されることではなくデータのすぐそばで実行されることである
- なので、D1のプライマリ・インスタンスと同じ場所に worker を置き、そこでコードが実行されるようにすれば同じこと
- コードの実行場所とDBのプライマリが離れないので、D1 プライマリ横 worker では
DB.transaction( ... )
API を呼べるようにしてよい
これを読むと Cloudflare Durable Objects がまさに同じアプローチで実装されていることに気づく。
(ところでこの記事から1年半経ってるんですが D1 ストアドプロシージャ機能の実装まだですか)
-
公式が "Batched statements are SQL transactions." と言っている https://developers.cloudflare.com/d1/build-with-d1/d1-client-api/#batch-statements ↩︎
-
miniflare にもロールバックするかどうかのテスト項目がある https://github.com/cloudflare/workers-sdk/blob/95a1fa1c065d4745a6c0a82af5982313d1315bf4/packages/miniflare/test/plugins/d1/suite.ts#L74 ↩︎
-
"SQLiteにおける分離レベルはANSI/ISO SQL標準の表記とは異なっており..." https://qiita.com/Yuki_312/items/7a7dff204e67af0c613a#分離レベル ↩︎
Discussion