Cloud Pub/Subを触ってみた
はじめに
GCPのPubSubを触ってみたのでそれの備忘録です。
PubSubの概要ですが、GCPSketchnoteのリポジトリでとても分かりやすく図に纏められていました。
(他のサービスも分かりやすかったのです)
ざっとした流れは下記のようになります。
- パブリッシャーがトピックにメッセージを送信
- トピックは届いたメッセージをサブスクリプションに配信
- サブスクリプションは届いたメッセージを管理し、サブスクライバーにメッセージを送る
- push型の場合は、指定したエンドポイントにメッセージを送信
- pull型の場合は、アプリケーションからメッセージを受信する
今回は下記の構成でPubSubの動きを確認してみました。
構築
TOPICの作成
最初に②のトピックを作成します。今回はgcloudで作成します。
gcloud pubsub topics create topic-test
cloudfunctionの作成とサブスクリプションの作成
次に③と⑤を作成します。
今回、コンソールから作成しましたが、cloudfunctionのpubsubトリガーを指定するとサブスクリプションも同時に作成されます。
ドキュメントにもある通りpubsubではメッセージを受信した際、ackする必要があります。
基本的には明示的にackしますが、cloudfunctionでは例外をスローせず終了することでackします。
注: 例外がスローされ、自動再試行が有効になっていない場合、関数の呼び出し時、Cloud Functions は受信メッセージに内部で ack します。例外がスローされたときに呼び出しを自動的に再試行する方法の詳細は、バックグラウンド関数の再試行のドキュメントをご覧ください。
https://cloud.google.com/functions/docs/calling/pubsub?hl=ja
サブスクリプションの作成
④のpush型のサブスクリプションを作成します。
gcloud pubsub subscriptions create subscription-pull-test --topic=topic-test
サブスクリプションの一覧の確認
作成したサブスクリプションはlist
コマンドで確認できます。
gcloud pubsub subscriptions list
$ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 600
expirationPolicy: {}
messageRetentionDuration: 604800s
name: projects/${PROJECT}/subscriptions/gcf-subscribePushFunction-asia-northeast1-topic-test
pushConfig:
attributes:
x-goog-version: v1
pushEndpoint: https://xxxxx.appspot.com/_ah/push-handlers/pubsub/projects/${PROJECT}/topics/topic-test?pubsub_trigger=true
topic: projects/${PROJECT}/topics/topic-test
---
ackDeadlineSeconds: 10
expirationPolicy:
ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/${PROJECT}/subscriptions/subscription-pull-test
pushConfig: {}
topic: projects/firebase-tutorial-suzuki/topics/topic-test
メッセージを受信するサブスクライバーを作成
⑥のローカルで動かすサブスクライバーを作成します。
import { PubSub } from '@google-cloud/pubsub';
const pubSubClient = new PubSub({ projectId: '${PROJECT}', keyFilename: './key.json' });
const subscriptionName = 'subscription-pull-test';
const timeout = 300;
function listenForMessages() {
const subscription = pubSubClient.subscription(subscriptionName);
let messageCount = 0;
const messageHandler = (message: any) => {
console.log(`Received message :`, message.id);
console.log(`Data: `, message.data.toString());
console.log(`Attributes:`, message.attributes);
messageCount += 1;
message.ack();
};
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// 5分間pubsubのメッセージを受信する
listenForMessages();
実装はこちらのサンプルコードを参照しました。
メッセージを作成するパブリッシャーを作成
import { PubSub } from '@google-cloud/pubsub';
const pubSubClient = new PubSub({ projectId: '${PROJECT}', keyFilename: './key.json' });
const topicName = 'topic-test';
async function publishMessageWithCustomAttributes() {
for (let i = 0; i < 10; i++) {
const dataBuffer = Buffer.from(JSON.stringify({ data: `message${i}` }));
const customAttributes = {
origin: 'nodejs-sample',
username: 'gcp',
};
const messageId = await pubSubClient.topic(topicName).publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);
}
}
publishMessageWithCustomAttributes().catch(console.error);
実装はこちらのサンプルコードを参照しました。
実行ログ確認
作成したsubscribePullFunctions.ts
を起動し、publish.ts
を実行した結果です。
publish.tsのログ
Message 2112903639819498 published.
Message 2112906498485253 published.
Message 2112905325376611 published.
Message 2112906488308818 published.
Message 2112906413743836 published.
Message 2112906048095929 published.
Message 2112905458525780 published.
Message 2112904243928711 published.
Message 2112904943349963 published.
Message 2112905305575893 published.
subscribePullFunctions.ts
Received message : 2112903639819498
Data: {"data":"message0"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906498485253
Data: {"data":"message1"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112905325376611
Data: {"data":"message2"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906488308818
Data: {"data":"message3"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906413743836
Data: {"data":"message4"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112906048095929
Data: {"data":"message5"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112905458525780
Data: {"data":"message6"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112904243928711
Data: {"data":"message7"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112904943349963
Data: {"data":"message8"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112905305575893
Data: {"data":"message9"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
cloudfunctionのログ
Cloudfunctionログを見ると送信順には実行されていません。PubSubはデフォルトは順番バラバラになるので、並べたい場合は順序指定キーを指定する必要があります。
スキーマ定義
現在プレビュー版の機能ですが、メッセージのスキーマを定義することができます。
パブリッシャーはどんなメッセージでも送信可能ですが、時には特定の構造のメッセージだけを送信できるようにしたいこともあると思います。
こういう時にスキーマを定義することで、メッセージを送信する際に定義に合わないものを弾くことが可能です。
スキーマの作成
今回は下記の構造をしたスキーマを定義します。
{
StringField: string,
FloatField: float,
BooleanField: boolean
}
プレビュー版のため、gccloudはbeta版を使用して作成します。
gcloud beta pubsub schemas create testSchema \
--type=AVRO \
--definition="{\"type\":\"record\",\"name\":\"Avro\",\"fields\":[{\"name\":\"StringField\",\"type\":\"string\"},{\"name\":\"FloatField\",\"type\":\"float\"},{\"name\":\"BooleanField\",\"type\":\"boolean\"}]}"
topicの作成
スキーマは現在、トピック作成時のみに付与することが可能です。
トピックの更新でスキーマを外したりすることはできないようです。また、スキーマを削除すると紐付いたトピックへのメッセージはすべて失敗するので注意が必要です。
gcloud beta pubsub topics create avrotopic \
--message-encoding=JSON \
--schema=testSchema
メッセージの送信
構造が正しいメッセージを送信
$ gcloud pubsub topics publish avrotopic \
--message="{\"StringField\":\"test\",\"FloatField\":123,\"BooleanField\":false}"
messageIds:
- '2113507840724102'
構造が正しくないメッセージを送信
gcloud pubsub topics publish avrotopic \
--message="{\"StringField\":\"test\"}"
ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: Request contains an invalid argument.
終わりに
今回、Pub/Subを一通り触ることができました。メッセージの送信時にバリデーションができるスキーマは便利ですが、スキーマファイルをパブリッシャーとサブスクライバーで共有する方法、TOPICに紐づくスキーマを変更する方法など考えることが結構ありそうでした。まだベータ版なのでこの辺が改善されれば使えそうな気がします。
また、GCPにはPub/Subと似たサービスのCloud Taskがあります。比較についてはこちらが参考になります。
参考資料
Discussion