📚

Kysely + Cloudflare D1 でトランザクションを実行するヘルパー関数を書いた話

2024/05/20に公開

TL;DR

  • D1 の batch はトランザクション[2]
  • kysely-d1 を改造してヘルパー関数を作ると batch を型付きで呼べてよい
  • batch と beginTransaction/commit/rollback では、できることに差がある

前提条件

  • Cloudflare Workers + D1 + Kysely (kysely-d1使用)
    • Hono も使っていますがこの記事の内容には関係ないです

趣味プロジェクトで kysely を採用していたので業務でも引き続き使っています。Prisma はコードサイズが大きいので。

実装

db.batch() の型は batch<T = unknown>(statements: D1PreparedStatement[]): Promise<D1Result<T>[]>; となっているので、 D1PreparedStatement[] を作って渡せばいいことがわかります。
D1PreparedStatementdb.prepare(sql).bind(...params) で作れるので、kysely-d1 の Compilable.compile() の結果を渡します。
あとは全体を map() で回せばよいです。

返ってくる D1Result を kysely の QueryResult に変換する部分は kysely-d1 から export されていないので、実装を取り出してきて使います。

Promise.all() の型定義を参考に、引数 queryes の型から戻り値の型を作るようにすれば完成です。

src/db/index.ts
import { Compilable, QueryResult } from 'kysely'
import { D1Result } from '@cloudflare/workers-types'

// cc0
/** Compilable から Output の型を取り出す型関数 */
type QueryOutput<Q> = Q extends Compilable<infer O> ? O : never
/** D1 の db.batch() を kysely で使うヘルパー関数 */
export const batch = async <Q extends readonly Compilable[]>(
  db: D1Database,
  queries: Q
): Promise<{ [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }> => {
  // 一個もクエリがないと死ぬので
  if (queries.length === 0) return [] as { [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }

  // D1 の db.batch() を呼ぶ
  const results = await db.batch(
    queries
      .map((query) => query.compile())
      .map(({ sql, parameters }) => db.prepare(sql).bind(...parameters))
  )

  // 一個でもエラーがあれば例外を上げる
  const error = results.find((result) => result.error)
  if (error) throw new Error(error.error)

  // D1Result を kysely の QueryResult に変換する
  return results.map(
    (result): QueryResult<unknown> => transformResult(result)
  ) as { [P in keyof Q]: QueryResult<QueryOutput<Q[P]>> }
}

/*!
 * MIT License
 *
 * Copyright (c) 2022 Aiden Wallis
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
/** D1Result を kysely の QueryResult にマップする。
 * https://github.com/aidenwallis/kysely-d1/blob/main/src/index.ts#L98
 * executeQuery からの複写
 */
const transformResult = <O>(d1Result: D1Result): QueryResult<O> => {
  const { error, meta, results } = d1Result

  if (error) throw new Error(error)
  const numberAffectedRows = meta.changes > 0 ? BigInt(meta.changes) : undefined

  return {
    insertId:
      meta.last_row_id === undefined || meta.last_row_id === null
        ? undefined
        : BigInt(meta.last_row_id),
    rows: (results as O[]) || [],
    numAffectedRows: numberAffectedRows,
    // deprecated in kysely >= 0.23, keep for backward compatibility.
    numUpdatedOrDeletedRows: numberAffectedRows,
  }
}

使用方法

batch に渡すクエリはいつも通りクエリビルダで組み立てて、await や .execute() を付けずに配列に詰める。
戻り値にはきちんと型が付く。

// db: Kysely<Database>
// role_ids: number[]
const [_, __, { rows: roles }] = await batch(c.env.DB, [
  db.deleteFrom('role_user').where('user_id', '=', user_id),
  db
    .insertInto('role_user')
    .values(role_ids.map((roleId) => ({ user_id, role_id }))),
  db
    .selectFrom('role_user')
    .select('role_id')
    .where('user_id', '=', user_id)
    .distinct(),
] as const)
// roles: number[]

db.batch(statements) の制約

D1 の提供する db.batch() では、begin transaction / commit / rollback 命令でできていたことが一部実現できません。できること・できないことを以下に書きます

できること

  • クエリの途中でエラーが起きたらロールバックする[3]
  • ロックがかかる

できないこと

  • トランザクション分離レベル[4][5]を指定できない
    • db.batch() に該当するオプションを受け取る引数がない
  • トランザクション中のクエリの結果を利用した処理ができない
    • 結果を次のクエリに渡せない
      • サブクエリで工夫するしかない
    • アプリケーションコードで判断を入れてロールバックもできない

また、D1 ではなく Kysely の制約に由来するものとして、プラグイン一覧を取り出せないのでこのヘルパー関数の結果には kyselyPlugin.transformResult() が適用されていません。自前で呼ぶ必要があります。

余談

Drizzle の対応

Drizzle ORM は 0.29.0 で batch をサポートしている
https://github.com/drizzle-team/drizzle-orm/releases/tag/0.29.0
記事を書いている途中に見つけた、この記事の Drizzle 版(先行研究)
https://leaysgur.github.io/posts/2023/10/17/213948

kysely-d1 について

  • vitest にかけたらエラー吐かれました。esm版も提供してください
    • #25 がマージされたのでそのうち出るはず

D1 のストアドプロシージャ機能について(未実装)

https://blog.cloudflare.com/whats-new-with-d1

公式ブログのこの記事が言うには、

  • D1 でトランザクションAPIを提供できない理由はコードの実行場所とDBのプライマリが離れることでスケールしなくなるため
  • ストアドプロシージャの要点は、DB内で実行されることではなくデータのすぐそばで実行されることである
    • なので、D1のプライマリ・インスタンスと同じ場所に worker を置き、そこでコードが実行されるようにすれば同じこと
    • コードの実行場所とDBのプライマリが離れないので、D1 プライマリ横 worker では DB.transaction( ... ) API を呼べるようにしてよい

これを読むと Cloudflare Durable Objects がまさに同じアプローチで実装されていることに気づく。

(ところでこの記事から1年半経ってるんですが D1 ストアドプロシージャ機能の実装まだですか)

脚注
  1. https://zenn.dev/kazu0617/articles/699ca7d354c4c0 ↩︎

  2. 公式が "Batched statements are SQL transactions." と言っている https://developers.cloudflare.com/d1/build-with-d1/d1-client-api/#batch-statements ↩︎

  3. miniflare にもロールバックするかどうかのテスト項目がある https://github.com/cloudflare/workers-sdk/blob/95a1fa1c065d4745a6c0a82af5982313d1315bf4/packages/miniflare/test/plugins/d1/suite.ts#L74 ↩︎

  4. SQLite https://www.sqlite.org/lang_transaction.html ↩︎

  5. "SQLiteにおける分離レベルはANSI/ISO SQL標準の表記とは異なっており..." https://qiita.com/Yuki_312/items/7a7dff204e67af0c613a#分離レベル ↩︎

株式会社HIKKY

Discussion