🌊

Cloudflare Workers Queues を試す

2023/07/01に公開

今日は、Cloudflare Workers の Queues を試してみます。この機能は現在有償のオープンベータです。

各操作におけるデータは64KB毎にデータブロックとして計算されます。メッセージが64KBを超えた場合、例えば70KBであれば2としてカウントされます。1KB=1000Bytesで計算されメッセージ単位で約100バイトのメタデータが付与されます。

詳しい料金はこちらをご覧ください。

早速やってみる

このドキュメントの内容をもとにシンプルな手順をまずは試します。私はTypeScriptよりJavaScriptが好きなので、ドキュメントのtsサンプルはjsサンプルに書き換えています。

1.Producer用 Workersの作成
ProducerとはQueuesにデータを書き込むものを指します。いつも通り"Hello World"を作成します。プロジェクト名はこのブログではqtestとしています。この際jsでinitしましょう。手順が不明な方はこちらを参考にしてください。

Hello Worldが完了したらwrangler queues create qtestを実行します。
管理者画面では以下のようにQueueが1個作成されます。

次にwrangler.tomlに以下を追記します。

[[queues.producers]]
 queue = "qtest"
 binding = "qtest"

index.js or workers.js(使っているwranglerのバージョンで異なりますが、Hello Worldの時にデフォルトで作成されているものです)を以下に書き換えます。

export default {
	async fetch(req, env) {
	  let log = {
		url: req.url,
		method: req.method,
		headers: Object.fromEntries(req.headers),
	  };
	  await env.qtest.send(log);
	  return new Response('Success!');
	},
  };

wrangler publishを実行します。

ブラウザでアクセスする度にアクセス元の環境がQueueに書き込まれます。

2.Consumerの作成
では次にConsumerを作成します。ConsumerとはQueueからデータを取り出す存在のことです。プロジェクト名はqconsumerとしています。
"Hello World"まで完了したら index.js or workers.jsを以下に置き換えます。

export default {
	async queue(batch, env) {
	  let messages = JSON.stringify(batch.messages);
	  console.log(`consumed from our queue: ${messages}`);
	  
	},
  };

(仕様変更のためだと思いますが、ドキュメントの手順&スクリプトでは動作しないため修正してあります)
wrangler.tomlに以下を追記します。

[[queues.consumers]]
 queue = "qtest"
 max_batch_size = 10 # optional: defaults to 10
 max_batch_timeout = 5 # optional: defaults to 5 seconds

max_batch_sizeはメッセージをまとめて処理する件数、max_batch_timeoutはメッセージを一定間隔で処理する秒数です。Consumerはこのどちらかの短い間隔でメッセージを取りに行きます。時間を短くすればするほど処理遅延は短くなりますが、1回の呼び出しごとに読み込みオペレーションは発生しますので、料金とのバランスを見る必要があることに注意してください。
wrangler publishでデプロイした後、wrangler tailを行うとconsole.logの情報がリアルタイムで出力されます。
この状態で先ほど作成したProducerにブラウザで何度かアクセスを行うと以下のような情報がターミナルに出力されConsumerが正しくデータを取得できていることがわかります。

Successfully created tail, expires at 2023-07-01T07:56:53Z
Connected to qconsumer, waiting for logs...
Unknown Event - Ok @ 2023/7/1 10:57:06
  (log) consumed from our queue: [{"body":{"url":"https://qtest.harunobukameda.workers.dev/","method":"GET","headers":{"accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7","accept-encoding":"gzip","accept-language":"ja,en-US;q=0.9,en;q=0.8","cache-control":"max-age=0","cf-connecting-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b","cf-ipcountry":"JP","cf-ray":"7dfb092d0d9e3bff","cf-visitor":"{\"scheme\":\"https\"}","connection":"Keep-Alive","cookie":"_ga=GA1.3.292078955.1677417639; _ga_PCSK1ZDQBF=GS1.1.1677417639.1.1.1677417868.0.0.0","host":"qtest.harunobukameda.workers.dev","priority":"u=0, i","sec-ch-ua":"\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"","sec-ch-ua-mobile":"?0","sec-ch-ua-platform":"\"Windows\"","sec-fetch-dest":"document","sec-fetch-mode":"navigate","sec-fetch-site":"none","sec-fetch-user":"?1","upgrade-insecure-requests":"1","user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36","x-forwarded-proto":"https","x-real-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b"}},"timestamp":"2023-07-01T01:57:01.709Z","id":"2ff87287ed241392c9cda8a9c639ce54"},{"body":{"url":"https://qtest.harunobukameda.workers.dev/favicon.ico","method":"GET","headers":{"accept":"image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8","accept-encoding":"gzip","accept-language":"ja,en-US;q=0.9,en;q=0.8","cf-connecting-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b","cf-ipcountry":"JP","cf-ray":"7dfb092edefc3bff","cf-visitor":"{\"scheme\":\"https\"}","connection":"Keep-Alive","cookie":"_ga=GA1.3.292078955.1677417639; _ga_PCSK1ZDQBF=GS1.1.1677417639.1.1.1677417868.0.0.0","host":"qtest.harunobukameda.workers.dev","priority":"u=1, i","referer":"https://qtest.harunobukameda.workers.dev/","sec-ch-ua":"\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"","sec-ch-ua-mobile":"?0","sec-ch-ua-platform":"\"Windows\"","sec-fetch-dest":"image","sec-fetch-mode":"no-cors","sec-fetch-site":"same-origin","user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36","x-forwarded-proto":"https","x-real-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b"}},"timestamp":"2023-07-01T01:57:01.974Z","id":"6c9eb0f78122329285672b87acc83b5b"},{"body":{"url":"https://qtest.harunobukameda.workers.dev/","method":"GET","headers":{"accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7","accept-encoding":"gzip","accept-language":"ja,en-US;q=0.9,en;q=0.8","cache-control":"max-age=0","cf-connecting-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b","cf-ipcountry":"JP","cf-ray":"7dfb09352c5b3bff","cf-visitor":"{\"scheme\":\"https\"}","connection":"Keep-Alive","cookie":"_ga=GA1.3.292078955.1677417639; _ga_PCSK1ZDQBF=GS1.1.1677417639.1.1.1677417868.0.0.0","host":"qtest.harunobukameda.workers.dev","priority":"u=0, i","sec-ch-ua":"\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"","sec-ch-ua-mobile":"?0","sec-ch-ua-platform":"\"Windows\"","sec-fetch-dest":"document","sec-fetch-mode":"navigate","sec-fetch-site":"none","sec-fetch-user":"?1","upgrade-insecure-requests":"1","user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36","x-forwarded-proto":"https","x-real-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b"}},"timestamp":"2023-07-01T01:57:02.981Z","id":"7b01ae621cc0390e6e074f262c509c33"},{"body":{"url":"https://qtest.harunobukameda.workers.dev/favicon.ico","method":"GET","headers":{"accept":"image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8","accept-encoding":"gzip","accept-language":"ja,en-US;q=0.9,en;q=0.8","cf-connecting-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b","cf-ipcountry":"JP","cf-ray":"7dfb0936ad803bff","cf-visitor":"{\"scheme\":\"https\"}","connection":"Keep-Alive","cookie":"_ga=GA1.3.292078955.1677417639; _ga_PCSK1ZDQBF=GS1.1.1677417639.1.1.1677417868.0.0.0","host":"qtest.harunobukameda.workers.dev","priority":"u=1, i","referer":"https://qtest.harunobukameda.workers.dev/","sec-ch-ua":"\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"","sec-ch-ua-mobile":"?0","sec-ch-ua-platform":"\"Windows\"","sec-fetch-dest":"image","sec-fetch-mode":"no-cors","sec-fetch-site":"same-origin","user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36","x-forwarded-proto":"https","x-real-ip":"240d:1a:331:6a00:c10a:167e:26e6:dd7b"}},"timestamp":"2023-07-01T01:57:03.221Z","id":"85f24503017efd57d10a34ecffaa1c20"}]

なおこの関数はreturnが定義されていないため、ブラウザでアクセスすると以下のExceptionが投げられますので注意してください。

GET https://qconsumer.harunobukameda.workers.dev/ - Exception Thrown @ 2023/7/1 10:58:33
X [ERROR]   Error: Handler does not export a fetch() function.

管理者画面ではConsumerが登録されていることがわかります。


先ほどのConsumer用Workers関数をpublishした時点でこの登録は自動されています。ここに登録されていなければQueueからデータを読み込めない(認証を兼ねている)ことに注意してください。AWSではIAMを使った認証を行いますが、Cloudflareではこのようにバインディングがその設定を兼ねています。

色々遊ぶと以下のようなレポートが管理者画面で出力されます。

Discussion