👌

R2 の Event 通知を Queue で受け取る

2024/04/18に公開

2024年4月の Developer Week で R2 のイベント通知機能がベータリリースされましたのでしてみます。

What is Cloudflare R2 ?

R2 はCloudflareが提供する Amazon S3 互換APIを有するオブジェクトストレージです。データの保存量が半額、データのダウンロードで発生するデータダウンロード料金がゼロ円というオリジンに最適なストレージです。S3 互換ということでAWS CLIからそのまま操作可能です。
今回のアップデートでオブジェクトの作成や削除イベント時にメッセージをQueueに送信することが可能となりました。

What is Cloudflare Queue ?

Queue はメッセージのPub/Sub基盤です。Producerがメッセージを書き込み、Consumerがメッセージを取り出して処理をすることが可能で、Cloudflareの場合、Workers がConsumer として用意されています。

また将来的にはこの機能がさらに拡張され、Producer が Apache Kafka や HTTPエンドポイントをサポートし、データ処理パイプライン機能を有することがアナウンスされています。

AWSでいうとS3 → Queue → Lambda が現在サポートされ、今後Kinesis, HTTP EndPoint → Event Bridge → Lambda への拡張がアナウンスされているようなイメージでしょうか?

過去このブログシリーズではそれぞれR2とQueueをWorkerから操作する手順をまとめましたので併せて参照してみてください。
https://zenn.dev/kameoncloud/articles/96e27a6eba410e
https://zenn.dev/kameoncloud/articles/e20a11c6b22ed7

さっそくやってみる

まずマネージメントコンソールでQueueを作成します。

Create queueを押します。

r2という名前を付けます。


マネージメントコンソール上でメッセージのQueueへの送信とQueue内部のメッセージ確認が可能となっています。作成されたQueueをクリックしてMessagesタブを押します。

Send messageをクリックします。

Textタブから適当なメッセージを入れSend messageボタンを押します。

List messagesを押すと先ほどのメッセージが表示されます。

では次に R2 バケットを作成します。Create bucketを押します。

適当な名前を付けてCreate bucketを押します。

作成された bucket に対して queue へのイベント送出を設定します。現在この設定はCLIでのみ可能です。Workersの開発環境で以下を実行します。

npx wrangler r2 bucket notification create <r2_bucket_name> --event-type object-create --queue <queue_name>

--event-typeobject-createもしくはobject-deleteを指定可能です。現在のベータ中はどちらか一方しか指定できないようです。

 ⛅️ wrangler 3.51.0
-------------------
Creating event notification rule for object creation (PutObject,CompleteMultipartUpload,CopyObject)
Configuration created successfully!

では R2 マネージメントコンソールでバケットをクリックしObjectsタブを押します。

適当なファイルをアップロードします。
Queue の画面で List messages をクリックすると正しくメッセージが出ていることがわかります。Producer は Worker と指定されています。これはQueue へのメッセージを送信しているのは Worker が動作しているためです。

{"account":"709e3d845e10aafe52a88a8336178220","bucket":"qproducer","object":{"key":"<file_name>","size":17720541,"eTag":"f515eb49eaf390ece2127aa0650c2a3b"},"action":"PutObject"

では次にこの Queue メッセージを処理する Consumer Worker を作成します。いつも通りHello Worldを終わらせた後以下の内容に書き換えます。

worker.js
export default {
	async queue(batch, env) {
	  let messages = JSON.stringify(batch.messages);
	  console.log(`consumed from our queue: ${messages}`);
	  
	},
  };
wrangler.toml
name = "qconsumer"
main = "src/worker.js"
compatibility_date = "2024-04-18"

[[queues.consumers]]
 queue = <queue_name>
 max_batch_size = 10 # optional: defaults to 10
 max_batch_timeout = 5 # optional: defaults to 5 seconds

<queue_name>は実際の Queue の名前に置換します。
wrangler tailを実行するとworkerログを監視するプロセスが起動します。
この状態で再度 R2 バケットへ文字を書き込むと以下のようなものが表示されます。

Successfully created tail, expires at 2024-04-18T11:00:46Z
Connected to qconsumer, waiting for logs...
Queue r2 (1 message) - Ok @ 2024/4/18 14:03:38
  (log) consumed from our queue: [{"attempts":1,"body":{"account":"709e3d845e10aafe52a88a8336178220","bucket":"qproducer","object":{"key":"<fine_name>","size":639152,"eTag":"9bb0fe012fd87eca7ce94aaed3fd85f2"},"action":"PutObject"},"timestamp":"2024-04-18T05:03:33.663Z","id":"97dba4c5033fad88f1edb6ff076da848"}]

正しく Consumer Worker が Queue からメッセージを取得していることがわかります。
Queue マネージメントコンソールで再度 List messagesをクリックするとメッセージが表示されないはずです。Consumer Worker がメッセージを Queue から取得した時点でメッセージは削除されます。

Discussion