R2 の Event 通知を Queue で受け取る
2024年4月の Developer Week で R2 のイベント通知機能がベータリリースされましたのでしてみます。
What is Cloudflare R2 ?
R2 はCloudflareが提供する Amazon S3 互換APIを有するオブジェクトストレージです。データの保存量が半額、データのダウンロードで発生するデータダウンロード料金がゼロ円というオリジンに最適なストレージです。S3 互換ということでAWS CLIからそのまま操作可能です。
今回のアップデートでオブジェクトの作成や削除イベント時にメッセージをQueueに送信することが可能となりました。
What is Cloudflare Queue ?
Queue はメッセージのPub/Sub基盤です。Producerがメッセージを書き込み、Consumerがメッセージを取り出して処理をすることが可能で、Cloudflareの場合、Workers がConsumer として用意されています。
また将来的にはこの機能がさらに拡張され、Producer が Apache Kafka や HTTPエンドポイントをサポートし、データ処理パイプライン機能を有することがアナウンスされています。
AWSでいうとS3 → Queue → Lambda が現在サポートされ、今後Kinesis, HTTP EndPoint → Event Bridge → Lambda への拡張がアナウンスされているようなイメージでしょうか?
過去このブログシリーズではそれぞれR2とQueueをWorkerから操作する手順をまとめましたので併せて参照してみてください。
さっそくやってみる
まずマネージメントコンソールでQueueを作成します。
Create queue
を押します。
r2
という名前を付けます。
マネージメントコンソール上でメッセージのQueueへの送信とQueue内部のメッセージ確認が可能となっています。作成されたQueueをクリックしてMessages
タブを押します。
Send message
をクリックします。
Text
タブから適当なメッセージを入れSend message
ボタンを押します。
List messages
を押すと先ほどのメッセージが表示されます。
では次に R2 バケットを作成します。Create bucket
を押します。
適当な名前を付けてCreate bucket
を押します。
作成された bucket に対して queue へのイベント送出を設定します。現在この設定はCLIでのみ可能です。Workersの開発環境で以下を実行します。
npx wrangler r2 bucket notification create <r2_bucket_name> --event-type object-create --queue <queue_name>
--event-type
はobject-create
もしくはobject-delete
を指定可能です。現在のベータ中はどちらか一方しか指定できないようです。
⛅️ wrangler 3.51.0
-------------------
Creating event notification rule for object creation (PutObject,CompleteMultipartUpload,CopyObject)
Configuration created successfully!
では R2 マネージメントコンソールでバケットをクリックしObjects
タブを押します。
適当なファイルをアップロードします。
Queue の画面で List messages
をクリックすると正しくメッセージが出ていることがわかります。Producer は Worker と指定されています。これはQueue へのメッセージを送信しているのは Worker が動作しているためです。
{"account":"709e3d845e10aafe52a88a8336178220","bucket":"qproducer","object":{"key":"<file_name>","size":17720541,"eTag":"f515eb49eaf390ece2127aa0650c2a3b"},"action":"PutObject"
では次にこの Queue メッセージを処理する Consumer Worker を作成します。いつも通りHello Worldを終わらせた後以下の内容に書き換えます。
export default {
async queue(batch, env) {
let messages = JSON.stringify(batch.messages);
console.log(`consumed from our queue: ${messages}`);
},
};
name = "qconsumer"
main = "src/worker.js"
compatibility_date = "2024-04-18"
[[queues.consumers]]
queue = <queue_name>
max_batch_size = 10 # optional: defaults to 10
max_batch_timeout = 5 # optional: defaults to 5 seconds
<queue_name>
は実際の Queue の名前に置換します。
wrangler tail
を実行するとworkerログを監視するプロセスが起動します。
この状態で再度 R2 バケットへ文字を書き込むと以下のようなものが表示されます。
Successfully created tail, expires at 2024-04-18T11:00:46Z
Connected to qconsumer, waiting for logs...
Queue r2 (1 message) - Ok @ 2024/4/18 14:03:38
(log) consumed from our queue: [{"attempts":1,"body":{"account":"709e3d845e10aafe52a88a8336178220","bucket":"qproducer","object":{"key":"<fine_name>","size":639152,"eTag":"9bb0fe012fd87eca7ce94aaed3fd85f2"},"action":"PutObject"},"timestamp":"2024-04-18T05:03:33.663Z","id":"97dba4c5033fad88f1edb6ff076da848"}]
正しく Consumer Worker が Queue からメッセージを取得していることがわかります。
Queue マネージメントコンソールで再度 List messages
をクリックするとメッセージが表示されないはずです。Consumer Worker がメッセージを Queue から取得した時点でメッセージは削除されます。
Discussion