📌

Firestore → BigQuery のデータ移動時のQuery timed out

に公開

本記事の内容

Firestore 内のドキュメントを stream を用いて BigQuery にエクスポートする際に、
セッションの持続時間超過と思われる通信切断に遭遇しました。

解消までにいくつかの検証を行ったので、その過程を記事にまとめます。


背景

以下のようなスクリプトを用いてFirestoreからBigqueryにデータを移動していると

sample.ts
// 省略あり
const ws = table.createWriteStream({
  schema: { fields },
  sourceFormat: "NEWLINE_DELIMITED_JSON",
  writeDisposition: "WRITE_TRUNCATE",
})
const promises = [once(ws, "error"), once(ws, "complete")]

for await (const snapshot of ourCollection.stream()) {
  const document = snapshot.data()
  const data = `${JSON.stringify(document)}\n`
  if (!ws.write(data)) {
    await once(ws, "drain")
  }
}
ws.end()
await Promise.race(promises)

他と比べて非常にデータ量の多いコレクションを処理した際に、次のようなエラーが発生しました。

Error: 14 UNAVAILABLE: Query timed out. Please try either limiting the entities scanned, or run with an updated index configuration.

この現象の理由がわからなかったので仮説を立てて調査を行いました。

仮説①:drain待ち中にタイムアウト

まず考えたのは、
BigQuery 側の処理遅延でawait once(ws, "drain")でブロックされ
その間に Firestore 側のストリームがタイムアウトしたという可能性です。

そこで、GPT-5先生に頼んでWritableの状態やメモリ使用量を定期的にログ出力する
簡易モニタリングクラスを作ってもらい、実際に監視してみました。

monitor.ts
class BqWriteStreamMonitor {
  private ws: Writable
  private logIntervalMs: number
  private stallMs: number
  private startedAt = Date.now()
  private lastProgressAt = Date.now()
  private processed = 0
  private totalBytes = 0
  private drainCount = 0
  private totalDrainWaitMs = 0
  private maxDrainWaitMs = 0
  private watchdog?: NodeJS.Timeout
  private progressTimer?: NodeJS.Timeout
  private lastLogData?: {
    processed: number
    totalBytes: number
    elapsedMs: number
    drains: ReturnType<BqWriteStreamMonitor["getDrainStats"]>
    wsStats: ReturnType<BqWriteStreamMonitor["getWritableStats"]>
    memory: ReturnType<BqWriteStreamMonitor["getMemoryStats"]>
  }

  constructor(
    ws: Writable,
    opts?: { logIntervalMs?: number; stallMs?: number }
  ) {
    this.ws = ws
    this.logIntervalMs = opts?.logIntervalMs ?? 1000
    this.stallMs = opts?.stallMs ?? 30000
  }

  private getWritableStats(): {
    writableLength?: number
    writableHighWaterMark?: number
    writableNeedDrain?: boolean
  } {
    return {
      writableLength:
        typeof this.ws.writableLength === "number"
          ? this.ws.writableLength
          : undefined,
      writableHighWaterMark:
        typeof this.ws.writableHighWaterMark === "number"
          ? this.ws.writableHighWaterMark
          : undefined,
      writableNeedDrain:
        typeof this.ws.writableNeedDrain === "boolean"
          ? this.ws.writableNeedDrain
          : undefined,
    }
  }

  start() {
    const interval = Math.min(
      Math.max(Math.floor(this.stallMs / 2), 1000),
      10000
    )
    this.watchdog = setInterval(() => {
      const now = Date.now()
      const elapsed = now - this.lastProgressAt
      if (elapsed >= this.stallMs) {
        logWarning("BigQuery export watchdog: no progress", {
          msSinceLastProgress: elapsed,
          processed: this.processed,
          totalBytes: this.totalBytes,
          drainCount: this.drainCount,
          wsStats: this.getWritableStats(),
          memory: this.getMemoryStats(),
          sinceStartMs: now - this.startedAt,
        })
      }
    }, interval)

    this.progressTimer = setInterval(() => {
      const data = this.lastLogData ?? {
        processed: this.processed,
        totalBytes: this.totalBytes,
        elapsedMs: Date.now() - this.startedAt,
        drains: this.getDrainStats(),
        wsStats: this.getWritableStats(),
        memory: this.getMemoryStats(),
      }
      logInfo("BigQuery export progress", data)
    }, this.logIntervalMs)
  }

  stop() {
    if (this.watchdog) clearInterval(this.watchdog)
    if (this.progressTimer) clearInterval(this.progressTimer)
  }

  onDocument(bytes: number) {
    this.processed++
    this.totalBytes += bytes
    this.lastProgressAt = Date.now()

    this.lastLogData = {
      processed: this.processed,
      totalBytes: this.totalBytes,
      elapsedMs: Date.now() - this.startedAt,
      drains: this.getDrainStats(),
      wsStats: this.getWritableStats(),
      memory: this.getMemoryStats(),
    }
  }

  onBackpressureStart() {
    logInfo("BigQuery export backpressure: waiting for drain", {
      processed: this.processed,
      totalBytes: this.totalBytes,
      wsStatsBefore: this.getWritableStats(),
    })
  }

  onBackpressureEnd(waitedMs: number) {
    this.drainCount++
    this.totalDrainWaitMs += waitedMs
    this.maxDrainWaitMs = Math.max(this.maxDrainWaitMs, waitedMs)
    this.lastProgressAt = Date.now()

    logInfo("BigQuery export backpressure: drain resolved", {
      waitedMs,
      processed: this.processed,
      totalBytes: this.totalBytes,
      wsStatsAfter: this.getWritableStats(),
    })
  }

  private getDrainStats() {
    return {
      drainCount: this.drainCount,
      totalDrainWaitMs: this.totalDrainWaitMs,
      maxDrainWaitMs: this.maxDrainWaitMs,
    }
  }

  private getMemoryStats() {
    const { rss, heapUsed, external } = process.memoryUsage()
    return { rss, heapUsed, external }
  }
}

sample.ts
  const promises = [once(ws, "error"), once(ws, "complete")]
  const monitor = new BqWriteStreamMonitor(ws, {
    logIntervalMs: 5000,
    stallMs: 60000,
  })
  monitor.start()

  try {
    for await (const snapshot of ourCollection.stream() as
      AsyncIterable<
        QueryDocumentSnapshot<DocumentSchema>
      >) {
      const document = snapshot.data()
      const data = `${JSON.stringify(document)}\n`
      monitor.onDocument(Buffer.byteLength(data))
      if (!ws.write(data)) {
        const t0 = Date.now()
        monitor.onBackpressureStart()
        await once(ws, "drain")
        monitor.onBackpressureEnd(Date.now() - t0)
      }
    } finally {
      monitor.stop()
    }
  }
  ws.end()
  await Promise.race(promises)

検証結果:❌drain 待機は原因ではなかった

モニタリングの結果、
drain待機は非常に短く、処理はほぼスムーズに進んでいました。
それにもかかわらず、約10分経過すると必ずエラーが発生します。

Error: 14 UNAVAILABLE: Query timed out. Please try either limiting the entities scanned, or run with an updated index configuration.

仮説②:gRPC の Deadline による切断?

Firestore の stream() は内部的に gRPC を利用しており、
gRPC では「Deadline(最大存続時間)」が設定でき、
通信が続いていても一定時間で強制切断される仕様です。

gRPC 公式ドキュメントを確認すると次のように説明されています。

If the deadline expires before the server responds, the call is terminated with a DEADLINE_EXCEEDED error.

今回はエラーコードが 14 UNAVAILABLE であるため、
SDKからのエラー通りであれば Deadline 超過が原因ではなさそうですが、
念の為Wireshark でパケットキャプチャを行い確認してみる事にしました。

TLS の複合化設定は以下の記事が参考になりました。
https://zenn.dev/mryhryki/articles/2022-09-14-capture-tls13-packet-by-wireshark

Node.js で実行している場合は、次のように鍵を出力します:

NODE_OPTIONS="--tls-keylog=$PWD/tls_keys.log"

以下の様にフィルタ条件を変えながら当該パケットを見つけます。

tls.handshake.extensions_server_name contains "firestore.googleapis.com"
tcp.port == 443  && ipv6.addr == 2404:6800:4004:825::200a
ipv6.addr == 2404:6800:4004:824::200a && http2.header.name == "grpc-timeout"

検証結果:❌Deadline 超過ではなかった

パケットを確認したところ、
grpc-timeout ヘッダーの値は全然異なる5分程度の時間になっており、
Deadline に達する前に自動的にセッション更新(再接続)が行われていることが分かりました。

切断時の通信の様子を確認

せっかくなので、切断直前のやり取りも確認しました。

サーバーからデータを受信した直後、
クライアントが FIN/ACK を返してセッションを終了しています。

対応する HTTP/2 パケット(Headers[9], PING[0])を確認すると、
Firestore SDK は単に終了メッセージを返しているだけのようでした。

仮説③:Firestore 側のセッション TTL による切断?

調査の中で複数回試したところ、
どのケースでも約10〜12分で切断が発生しました。

処理速度やデータ量を変えて試しても同様の時間で切断されるため、
おそらく Firestore 側で セッションの最大存続時間(TTL) が
内部的に設定されているものと思われます。

対策:クエリを分割して処理する

切断までの時間を延長する方法は見つかりませんでしたが、
クエリを分割して処理してみたところ上手く動作しました。

sample.ts

  let cursor: QueryDocumentSnapshot<DocumentData> | null = null
  do {
    let query = ourCollection
      .orderBy(FieldPath.documentId())
      .limit(100000)
    if (cursor) {
      query = query.startAfter(cursor)
      cursor = null
    }
    for await (cursor of query.stream() as AsyncIterable<
      QueryDocumentSnapshot<DocumentSchema>
    >) {
      const document = cursor.data()
      const data = `${JSON.stringify(document)}\n`
      monitor.onDocument(Buffer.byteLength(data))
      if (!ws.write(data)) {
        const t0 = Date.now()
        monitor.onBackpressureStart()
        await once(ws, "drain")
        monitor.onBackpressureEnd(Date.now() - t0)
      }
    }
  } while (cursor)

まとめ

  • Firestore の stream() は 長時間接続を維持するとセッション切断が発生する場合がある
  • 原因は gRPC の Deadline ではなく、Firestore 側の セッション TTL の可能性が高い
  • 対策としては、クエリを複数回に分けるのが最も簡単

そもそもStreamを使わなくて良いのでは?

ここまで辿り着いたところで、
これではもはやstreamを使うメリットが無いのではという指摘を受けました。

そこで query.stream() の使用をやめ、
また、こちらの記事で紹介されているgeneratorのアイデアを用いて
https://zenn.dev/shuji_koike/articles/eb8216c16a10bb

以下のような関数を作ってみました。

sample.ts

async function* yieldDocs(
  collection: CollectionReference<DocumentData> | Query<DocumentData>
): AsyncGenerator<QueryDocumentSnapshot<DocumentData>> {
  const pageSize = 10000
  let last: QueryDocumentSnapshot<DocumentData> | null = null
  for (;;) {
    let q = collection.orderBy(FieldPath.documentId()).limit(pageSize)
    if (last) {
      q = q.startAfter(last)
    }
    const snap = await q.get()
    if (snap.empty) {
      return
    }
    for (const docSnap of snap.docs) {
      yield docSnap
    }
    last = snap.docs[snap.docs.length - 1] ?? null
    if (snap.size < pageSize) {
      return
    }
  }
}

こんな感じで非常にシンプルになりました!

sample.ts

  for await (const docSnap of yieldDocs(ourCollection)) {
    const document = docSnap.data()
    const data = `${JSON.stringify(document)}\n`
    if (!ws.write(data)) {
      await once(ws, "drain")
    }
  }

あれ、今回の調査の意味が無くなったのでは?

より一般的な方法

尚、ただデータを移動できれば良い多くのケースでは、
一旦Cloud storageに吐き出し、LoadJobを用いる方が簡単で適切かもしれません。

https://docs.cloud.google.com/firestore/docs/reference/rest/v1/projects.databases/exportDocuments
https://www.jsdocs.io/package/%40google-cloud/bigquery?utm_source=chatgpt.com#Table.createLoadJob

参考リンク

Terass Tech Blog

Discussion