Open8

Cloudflare Queues メモ

voluntasvoluntas

Cloudflare から Cloudflare Workers で利用できる Queue がオープンベータとして発表された。軽く触ってみた感じ、かなり欲しかった機能。自分用にメモしていく。

voluntasvoluntas

ざっくり

  • wrangler で queue を作成して、wrangler.toml に設定する
  • worker から MY_QUEUE.sent(Object) のように書き込む
  • Queue に書き込まれたメッセージは async queue(batch, env) で受け取れる

つまり worker 間でのやりとりを queue 経由でできるようになる。

公式資料

ドキュメントとオープンベータのブログがあるので、それを読めば良い。

voluntasvoluntas

サンプルコード

一つの worker で送信するプロデュサー(producers)と受信するコンシューマー(consumers) を定義できる。これが凄く便利。とりあえず queue に突っ込んでおいて response を返すという処理が出来るようになった。

wrangler でキューを作成

キューを作成する前に Cloudflare Queues を Cloudflare のダッシュボードから有効にする必要がある。

wrangler queues create my-queue

wrangler.toml

[[queues.producers]]
 queue = "my-queue"
 binding = "MY_QUEUE"

[[queues.consumers]]
 queue = "<my-queue>"

queue は作られるキューの名前で管理用。
binding はコードから呼び出す時の名前で MY_QUEUE の場合は MY_QUEUE.send でキューに書き込む。

index.ts

export interface Env {
 MY_QUEUE: Queue;
}
 
export default {
 async fetch(request: Request, env: Env): Promise<Response> {
   await env.MY_QUEUE.send({"abc": "spam"})
   return new Response("OK")
 },
 async queue(batch: MessageBatch<Error>, env: Env): Promise<void> {
    for (const message of batch.messages) {
       console.log(message.body)
    }
 },
}

queue からの取り出しはバッチ処理で行われる。ここが凄く重要で複数のメッセージが入ってくる。

バッチには max_batch_size と max_batch_timeout という設定があり、これは wrangler.toml で指定できる。max_batch_size は「指定したサイズまでキューにメッセージがたまったらバッチ処理が呼び出される」という値。max_batch_timeout は「指定した時間が経過したらバッチ処理が呼び出される」という値。

  • max_batch_size のデフォルトは 10 メッセージ
  • max_batch_timeout のデフォルトは 5 秒

どちらかに達したらバッチ処理が呼び出されると考えて良い。

これはリクエストが送られて 5 秒経過、または 10 メッセージがたまったらバッチが呼び出されて入力した文字列が出力される。

キューの名前とメッセージの ID とタイムスタンプ

batch.queue で queue の名前がとれるので、処理できるキューは一つではないのがとても良い。

また、メッセージにはユニークな ID がふられる。 message.id で取得できる。message.timestamp で日時も取得できる。

voluntasvoluntas

リトライ

処理が失敗した場合はリトライを実行できる。ただしバッチのメッセージが 10 送られてきて 9 番目に処理が失敗したのでリトライをするとなっても 10 メッセージすべてがリトライされる。

リトライ回数のデフォルトは 3 。キューに追加された値の期限はデフォルト 4 日間。

(ドキュメントには 5 日って書いてあるが間違いだと思う)

デッドレター

デッドレターもちゃんとある。最大リトライ回数を超えてしまった場合はデッドレターキューという専用のキューに入れらる。このキューの値は削除されるまで 4 日間保持される。おそらくリトライも無限。

Batching and Retries · Cloudflare Queues

voluntasvoluntas

価格

安い。100 万オペレーションまでは無料で、その後 100 万オペレーション単位で 0.40 ドル。1 ドル 140 円で 56 円 ... 。

ただし注意点があり 64 KB 以上のメッセージは 2 倍のオペレーションが利用される。つまり 100 KB のメッセージをキューに突っ込んだらオペレーションは 2 回消費されることになる。

オペレーションの単位はメッセージ単位でキューへの書き込み、キューからの読み込み、キューからの削除が発生する。

再試行ももちろんオペレーションが発生するので要注意。

Cloudflare Queues - Pricing · Cloudflare Queues

voluntasvoluntas

感想

実際に Worker ログを Go API サーバ経由で TimescaleDB (TSDB) に書き込む仕組みを書いてみたが、本当に簡単にかけてびっくりした。

少しでも重い処理はキューに入れて処理するという仕組みが実現できる。プロデューサーもコンシューマーもすべて Worker で動くためスケールを考える必要がないのは強さしかない。

API Webhook の処理を非同期にする際も、簡単に実現することができる。