Cloudflare R2イベントをQueuesに送信してWorkersを動かす
概要
CloudflareのR2のイベントをトリガーにQueuesにキューをproduceして、Workersからキューをconsumeする、ということをやっていこうと思います。
R2 → Queues → Workes
という処理の流れです。
-
R2イベントについて
https://developers.cloudflare.com/r2/buckets/event-notifications/ -
QueuesのProducer, Consumerについて
https://developers.cloudflare.com/queues/reference/how-queues-works/
WorkersからQueuesをconsumeする時には規定のフォーマットでコードを書かなければいけないのでこちらを参照してください。(後でもう一度説明します。)
前提
- Node.js, npmがインストールされている。
- Cloudflareのアカウントが作成されており、有料プランになっている。
(現状、Cloudflare Queuesを利用するには5$/monthのプランを購読する必要があります。)
各リソースを作成する
wranglerをインストールする
$ npm install wrangler --save-dev
ログインする
$ npx wrangler login
R2バケットを作成する
バケットのコンソールからも作成できます。
$ npx wrangler r2 bucket create <YOUR_BUCKET_NAME>
R2バケットが作成されていることを確認する
作成されたバケットはコンソールからも確認できます。
$ npx wrangler r2 bucket list
Workersを作成する
Queuesを作成するにはまずWorkersを作成しないといけないのでWorkersから作成しましょう。
$ npm create cloudflare@latest
上記を実行するといくつか質問されるので
- 任意のWorker名
-
Hello World
Workerを選択 - Typescriptを使用するか → Yes
- Gitを利用するか → No
- Workerをdeployするか → No
で質問に回答してください。
この回答通りでなくとも目的は達成できますが、簡単のためここでは上記のような回答とさせてください。
質問への回答が完了するとプロジェクトが作成されます。
今回いじるのはsrc/index.ts
とwrangler.toml
のみです。
-
src/index.ts
ここにhttps経由でworkerが呼び出された時の処理を書いたり、Queuesのconsumerとしてworkerが起動した時の処理を書いていきます。 -
wrangler.toml
workerの設定ファイルになります。ここにQueuesのConsumerとしての設定を記述したります。
Queuesを作成する
$ npx wrangler queues create <YOUR_QUEUE_NAME>
QueuesとWorkesの連携
QueuesとWorkesの連携を行います。とてもシンプルで、wrangler.toml
を少しいじるだけで簡単にWorkesをQueuesのConsumerに設定できます。
wrangler.toml
workerのプロジェクトが作成されてすぐは以下のようなファイルになっているかと思います。
#:schema node_modules/wrangler/config-schema.json
name = <YOUR_WORKER_NAME>
main = "src/index.ts"
compatibility_date = "2024-05-29"
compatibility_flags = ["nodejs_compat"]
# Automatically place your workloads in an optimal location to minimize latency.
# If you are running back-end logic in a Worker, running it closer to your back-end infrastructure
# rather than the end user may result in better performance.
# Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement
# [placement]
# mode = "smart"
# Variable bindings. These are arbitrary, plaintext strings (similar to environment variables)
# Docs:
# - https://developers.cloudflare.com/workers/wrangler/configuration/#environment-variables
# Note: Use secrets to store sensitive data.
# - https://developers.cloudflare.com/workers/configuration/secrets/
# [vars]
# MY_VARIABLE = "production_value"
# Bind the Workers AI model catalog. Run machine learning models, powered by serverless GPUs, on Cloudflare’s global network
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#workers-ai
# [ai]
# binding = "AI"
# Bind an Analytics Engine dataset. Use Analytics Engine to write analytics within your Pages Function.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#analytics-engine-datasets
# [[analytics_engine_datasets]]
# binding = "MY_DATASET"
# Bind a headless browser instance running on Cloudflare's global network.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#browser-rendering
# [browser]
# binding = "MY_BROWSER"
# Bind a D1 database. D1 is Cloudflare’s native serverless SQL database.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#d1-databases
# [[d1_databases]]
# binding = "MY_DB"
# database_name = "my-database"
# database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# Bind a dispatch namespace. Use Workers for Platforms to deploy serverless functions programmatically on behalf of your customers.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#dispatch-namespace-bindings-workers-for-platforms
# [[dispatch_namespaces]]
# binding = "MY_DISPATCHER"
# namespace = "my-namespace"
# Bind a Durable Object. Durable objects are a scale-to-zero compute primitive based on the actor model.
# Durable Objects can live for as long as needed. Use these when you need a long-running "server", such as in realtime apps.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#durable-objects
# [[durable_objects.bindings]]
# name = "MY_DURABLE_OBJECT"
# class_name = "MyDurableObject"
# Durable Object migrations.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#migrations
# [[migrations]]
# tag = "v1"
# new_classes = ["MyDurableObject"]
# Bind a Hyperdrive configuration. Use to accelerate access to your existing databases from Cloudflare Workers.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#hyperdrive
# [[hyperdrive]]
# binding = "MY_HYPERDRIVE"
# id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Bind a KV Namespace. Use KV as persistent storage for small key-value pairs.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#kv-namespaces
# [[kv_namespaces]]
# binding = "MY_KV_NAMESPACE"
# id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Bind an mTLS certificate. Use to present a client certificate when communicating with another service.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#mtls-certificates
# [[mtls_certificates]]
# binding = "MY_CERTIFICATE"
# certificate_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# Bind a Queue producer. Use this binding to schedule an arbitrary task that may be processed later by a Queue consumer.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
# [[queues.producers]]
# binding = "MY_QUEUE"
# queue = "my-queue"
# Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by Producers to act on them.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
# [[queues.consumers]]
# queue = "my-queue"
# Bind an R2 Bucket. Use R2 to store arbitrarily large blobs of data, such as files.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#r2-buckets
# [[r2_buckets]]
# binding = "MY_BUCKET"
# bucket_name = "my-bucket"
# Bind another Worker service. Use this binding to call another Worker without network overhead.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings
# [[services]]
# binding = "MY_SERVICE"
# service = "my-service"
# Bind a Vectorize index. Use to store and query vector embeddings for semantic search, classification and other vector search use-cases.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#vectorize-indexes
# [[vectorize]]
# binding = "MY_INDEX"
# index_name = "my-index"
コメントアウトされている必要ない設定は削除して以下のようにconsumerの設定を追加しましょう。
<YOUR_WORKER_NAME>と<YOUR_QUEUE_NAME>には前のステップで作成したworkerとqueueの名前を入れてください。わからなくなった場合はコンソールなどで確認してみてください。
#:schema node_modules/wrangler/config-schema.json
name = <YOUR_WORKER_NAME>
main = "src/index.ts"
compatibility_date = "2024-05-29"
compatibility_flags = ["nodejs_compat"]
+ # Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by + + Producers to act on them.
+ # Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
+ [[queues.consumers]]
+ queue = <YOUR_QUEUE_NAME>
src/index.ts
src/index.tsにはこちらのドキュメントに記載されているフォーマットでコードを追加します。
export defaultされている中でqueue関数を追加します。ここでやっていることはqueueから受け取ったメッセージをconsole.logで出力しているだけです。
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
return new Response('Hello World!');
},
+ async queue(batch: MessageBatch<Error>, env: Env, ctx: +ExecutionContext): Promise<void> {
+ console.log(batch.messages)
+ }
};
workerのデプロイ
これでQueuesをWorkesの連携はほぼ完了しました。あとはデプロイするだけです。
$ npx wrangler deploy
R2イベントをQueuesから受け取る
次はR2とQueuesの連携です。R2のイベントをトリガーにQueuesにqueueをproduceします。
こちらも非常にシンプルで以下のコマンドを実行するのみです。
(現状、R2イベントをQueuesから受け取るための設定はこのコマンド上からしか実行できません。)
EVENT_TYPEはこちらを参照してください。
2つのイベントタイプがありますが、queueごとに設定できるのは1つのみのようです。
$ npx wrangler r2 bucket notification create <YOUR_BUCKET_NAME> --event-type <EVENT_TYPE> --queue <YOUR_QUEUE_NAME>
動作確認
最後に動作確認をしましょう。
Workers&Pagesから作成したworkerを選択してLogsのBegin log stream
を選択して別タブでR2のページへ移動してください。
R2ページから作成したバケットを選択して、適当なファイルをドラッグ&ドロップしてファイルをアップロードしてみてください。
この時別タブで開いていたworkerの画面にログが出力されたら動作が確認できたことになります。
最後に
ここまで、Cloudflare R2イベントをQueuesに送信してWorkersを動かすところまで動作確認をしてきました。
あとは、workerの処理をカスタマイズして自分たちのプロジェクトに利用してください。
Discussion