📌

Cloudflare R2イベントをQueuesに送信してWorkersを動かす

2024/06/04に公開

概要

CloudflareのR2のイベントをトリガーにQueuesにキューをproduceして、Workersからキューをconsumeする、ということをやっていこうと思います。
R2 → Queues → Workes
という処理の流れです。

前提

  • Node.js, npmがインストールされている。
  • Cloudflareのアカウントが作成されており、有料プランになっている。
    (現状、Cloudflare Queuesを利用するには5$/monthのプランを購読する必要があります。)

各リソースを作成する

wranglerをインストールする

$ npm install wrangler --save-dev

ログインする

$ npx wrangler login

R2バケットを作成する

バケットのコンソールからも作成できます。

$ npx wrangler r2 bucket create <YOUR_BUCKET_NAME>

R2バケットが作成されていることを確認する

作成されたバケットはコンソールからも確認できます。

$ npx wrangler r2 bucket list

Workersを作成する

Queuesを作成するにはまずWorkersを作成しないといけないのでWorkersから作成しましょう。

$ npm create cloudflare@latest

上記を実行するといくつか質問されるので

  1. 任意のWorker名
  2. Hello World Workerを選択
  3. Typescriptを使用するか → Yes
  4. Gitを利用するか → No
  5. Workerをdeployするか → No

で質問に回答してください。
この回答通りでなくとも目的は達成できますが、簡単のためここでは上記のような回答とさせてください。
質問への回答が完了するとプロジェクトが作成されます。
今回いじるのはsrc/index.tswrangler.tomlのみです。

  • src/index.ts
    ここにhttps経由でworkerが呼び出された時の処理を書いたり、Queuesのconsumerとしてworkerが起動した時の処理を書いていきます。

  • wrangler.toml
    workerの設定ファイルになります。ここにQueuesのConsumerとしての設定を記述したります。

Queuesを作成する

$ npx wrangler queues create <YOUR_QUEUE_NAME>

QueuesとWorkesの連携

QueuesとWorkesの連携を行います。とてもシンプルで、wrangler.tomlを少しいじるだけで簡単にWorkesをQueuesのConsumerに設定できます。

wrangler.toml

workerのプロジェクトが作成されてすぐは以下のようなファイルになっているかと思います。

wrangler.toml
#:schema node_modules/wrangler/config-schema.json
name = <YOUR_WORKER_NAME>
main = "src/index.ts"
compatibility_date = "2024-05-29"
compatibility_flags = ["nodejs_compat"]

# Automatically place your workloads in an optimal location to minimize latency.
# If you are running back-end logic in a Worker, running it closer to your back-end infrastructure
# rather than the end user may result in better performance.
# Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement
# [placement]
# mode = "smart"

# Variable bindings. These are arbitrary, plaintext strings (similar to environment variables)
# Docs:
# - https://developers.cloudflare.com/workers/wrangler/configuration/#environment-variables
# Note: Use secrets to store sensitive data.
# - https://developers.cloudflare.com/workers/configuration/secrets/
# [vars]
# MY_VARIABLE = "production_value"

# Bind the Workers AI model catalog. Run machine learning models, powered by serverless GPUs, on Cloudflare’s global network
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#workers-ai
# [ai]
# binding = "AI"

# Bind an Analytics Engine dataset. Use Analytics Engine to write analytics within your Pages Function.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#analytics-engine-datasets
# [[analytics_engine_datasets]]
# binding = "MY_DATASET"

# Bind a headless browser instance running on Cloudflare's global network.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#browser-rendering
# [browser]
# binding = "MY_BROWSER"

# Bind a D1 database. D1 is Cloudflare’s native serverless SQL database.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#d1-databases
# [[d1_databases]]
# binding = "MY_DB"
# database_name = "my-database"
# database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Bind a dispatch namespace. Use Workers for Platforms to deploy serverless functions programmatically on behalf of your customers.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#dispatch-namespace-bindings-workers-for-platforms
# [[dispatch_namespaces]]
# binding = "MY_DISPATCHER"
# namespace = "my-namespace"

# Bind a Durable Object. Durable objects are a scale-to-zero compute primitive based on the actor model.
# Durable Objects can live for as long as needed. Use these when you need a long-running "server", such as in realtime apps.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#durable-objects
# [[durable_objects.bindings]]
# name = "MY_DURABLE_OBJECT"
# class_name = "MyDurableObject"

# Durable Object migrations.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#migrations
# [[migrations]]
# tag = "v1"
# new_classes = ["MyDurableObject"]

# Bind a Hyperdrive configuration. Use to accelerate access to your existing databases from Cloudflare Workers.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#hyperdrive
# [[hyperdrive]]
# binding = "MY_HYPERDRIVE"
# id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Bind a KV Namespace. Use KV as persistent storage for small key-value pairs.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#kv-namespaces
# [[kv_namespaces]]
# binding = "MY_KV_NAMESPACE"
# id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Bind an mTLS certificate. Use to present a client certificate when communicating with another service.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#mtls-certificates
# [[mtls_certificates]]
# binding = "MY_CERTIFICATE"
# certificate_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Bind a Queue producer. Use this binding to schedule an arbitrary task that may be processed later by a Queue consumer.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
# [[queues.producers]]
# binding = "MY_QUEUE"
# queue = "my-queue"

# Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by Producers to act on them.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
# [[queues.consumers]]
# queue = "my-queue"

# Bind an R2 Bucket. Use R2 to store arbitrarily large blobs of data, such as files.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#r2-buckets
# [[r2_buckets]]
# binding = "MY_BUCKET"
# bucket_name = "my-bucket"

# Bind another Worker service. Use this binding to call another Worker without network overhead.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings
# [[services]]
# binding = "MY_SERVICE"
# service = "my-service"

# Bind a Vectorize index. Use to store and query vector embeddings for semantic search, classification and other vector search use-cases.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#vectorize-indexes
# [[vectorize]]
# binding = "MY_INDEX"
# index_name = "my-index"

コメントアウトされている必要ない設定は削除して以下のようにconsumerの設定を追加しましょう。
<YOUR_WORKER_NAME>と<YOUR_QUEUE_NAME>には前のステップで作成したworkerとqueueの名前を入れてください。わからなくなった場合はコンソールなどで確認してみてください。

wrangler.toml
#:schema node_modules/wrangler/config-schema.json
name = <YOUR_WORKER_NAME>
main = "src/index.ts"
compatibility_date = "2024-05-29"
compatibility_flags = ["nodejs_compat"]

+ # Bind a Queue consumer. Queue Consumers can retrieve tasks scheduled by + + Producers to act on them.
+ # Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#queues
+ [[queues.consumers]]
+ queue = <YOUR_QUEUE_NAME>

src/index.ts

src/index.tsにはこちらのドキュメントに記載されているフォーマットでコードを追加します。
export defaultされている中でqueue関数を追加します。ここでやっていることはqueueから受け取ったメッセージをconsole.logで出力しているだけです。

src/index.ts
export default {
	async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
		return new Response('Hello World!');
	},
+	async queue(batch: MessageBatch<Error>, env: Env, ctx: +ExecutionContext): Promise<void> {
+		console.log(batch.messages)
+	}
};

workerのデプロイ

これでQueuesをWorkesの連携はほぼ完了しました。あとはデプロイするだけです。

$ npx wrangler deploy

R2イベントをQueuesから受け取る

次はR2とQueuesの連携です。R2のイベントをトリガーにQueuesにqueueをproduceします。
こちらも非常にシンプルで以下のコマンドを実行するのみです。
(現状、R2イベントをQueuesから受け取るための設定はこのコマンド上からしか実行できません。)
EVENT_TYPEはこちらを参照してください。
2つのイベントタイプがありますが、queueごとに設定できるのは1つのみのようです。

$ npx wrangler r2 bucket notification create <YOUR_BUCKET_NAME> --event-type <EVENT_TYPE> --queue <YOUR_QUEUE_NAME>

動作確認

最後に動作確認をしましょう。
Workers&Pagesから作成したworkerを選択してLogsのBegin log streamを選択して別タブでR2のページへ移動してください。

R2ページから作成したバケットを選択して、適当なファイルをドラッグ&ドロップしてファイルをアップロードしてみてください。
この時別タブで開いていたworkerの画面にログが出力されたら動作が確認できたことになります。

最後に

ここまで、Cloudflare R2イベントをQueuesに送信してWorkersを動かすところまで動作確認をしてきました。
あとは、workerの処理をカスタマイズして自分たちのプロジェクトに利用してください。

Discussion