😸

Cloud Pub/Subを触ってみた

2021/03/16に公開

はじめに

GCPのPubSubを触ってみたのでそれの備忘録です。

PubSubの概要ですが、GCPSketchnoteのリポジトリでとても分かりやすく図に纏められていました。
(他のサービスも分かりやすかったのです)


https://github.com/priyankavergadia/GCPSketchnote

ざっとした流れは下記のようになります。

  1. パブリッシャーがトピックにメッセージを送信
  2. トピックは届いたメッセージをサブスクリプションに配信
  3. サブスクリプションは届いたメッセージを管理し、サブスクライバーにメッセージを送る
    • push型の場合は、指定したエンドポイントにメッセージを送信
    • pull型の場合は、アプリケーションからメッセージを受信する

今回は下記の構成でPubSubの動きを確認してみました。

構築

TOPICの作成

最初に②のトピックを作成します。今回はgcloudで作成します。

gcloud pubsub topics create topic-test

https://cloud.google.com/sdk/gcloud/reference/pubsub/topics/create?hl=ja

cloudfunctionの作成とサブスクリプションの作成

次に③と⑤を作成します。
今回、コンソールから作成しましたが、cloudfunctionのpubsubトリガーを指定するとサブスクリプションも同時に作成されます。

ドキュメントにもある通りpubsubではメッセージを受信した際、ackする必要があります。
基本的には明示的にackしますが、cloudfunctionでは例外をスローせず終了することでackします。

注: 例外がスローされ、自動再試行が有効になっていない場合、関数の呼び出し時、Cloud Functions は受信メッセージに内部で ack します。例外がスローされたときに呼び出しを自動的に再試行する方法の詳細は、バックグラウンド関数の再試行のドキュメントをご覧ください。
https://cloud.google.com/functions/docs/calling/pubsub?hl=ja

サブスクリプションの作成

④のpush型のサブスクリプションを作成します。

gcloud pubsub subscriptions create subscription-pull-test --topic=topic-test

https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/create?hl=ja

サブスクリプションの一覧の確認

作成したサブスクリプションはlistコマンドで確認できます。

gcloud pubsub subscriptions list

$ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 600
expirationPolicy: {}
messageRetentionDuration: 604800s
name: projects/${PROJECT}/subscriptions/gcf-subscribePushFunction-asia-northeast1-topic-test
pushConfig:
  attributes:
    x-goog-version: v1
  pushEndpoint: https://xxxxx.appspot.com/_ah/push-handlers/pubsub/projects/${PROJECT}/topics/topic-test?pubsub_trigger=true
topic: projects/${PROJECT}/topics/topic-test
---
ackDeadlineSeconds: 10
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/${PROJECT}/subscriptions/subscription-pull-test
pushConfig: {}
topic: projects/firebase-tutorial-suzuki/topics/topic-test

メッセージを受信するサブスクライバーを作成

⑥のローカルで動かすサブスクライバーを作成します。

subscribePullFunctions.ts
import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub({ projectId: '${PROJECT}', keyFilename: './key.json' });
const subscriptionName = 'subscription-pull-test';
const timeout = 300;

function listenForMessages() {
  const subscription = pubSubClient.subscription(subscriptionName);

  let messageCount = 0;
  const messageHandler = (message: any) => {
    console.log(`Received message :`, message.id);
    console.log(`Data: `, message.data.toString());
    console.log(`Attributes:`, message.attributes);
    messageCount += 1;

    message.ack();
  };

  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

// 5分間pubsubのメッセージを受信する
listenForMessages();

実装はこちらのサンプルコードを参照しました。
https://cloud.google.com/pubsub/docs/pull

メッセージを作成するパブリッシャーを作成

publish.ts
import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub({ projectId: '${PROJECT}', keyFilename: './key.json' });
const topicName = 'topic-test';

async function publishMessageWithCustomAttributes() {
  for (let i = 0; i < 10; i++) {
    const dataBuffer = Buffer.from(JSON.stringify({ data: `message${i}` }));

    const customAttributes = {
      origin: 'nodejs-sample',
      username: 'gcp',
    };

    const messageId = await pubSubClient.topic(topicName).publish(dataBuffer, customAttributes);
    console.log(`Message ${messageId} published.`);
  }
}

publishMessageWithCustomAttributes().catch(console.error);

実装はこちらのサンプルコードを参照しました。
https://cloud.google.com/pubsub/docs/publisher

実行ログ確認

作成したsubscribePullFunctions.tsを起動し、publish.tsを実行した結果です。

publish.tsのログ

Message 2112903639819498 published.
Message 2112906498485253 published.
Message 2112905325376611 published.
Message 2112906488308818 published.
Message 2112906413743836 published.
Message 2112906048095929 published.
Message 2112905458525780 published.
Message 2112904243928711 published.
Message 2112904943349963 published.
Message 2112905305575893 published.

subscribePullFunctions.ts

Received message : 2112903639819498
Data:  {"data":"message0"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906498485253
Data:  {"data":"message1"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112905325376611
Data:  {"data":"message2"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906488308818
Data:  {"data":"message3"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112906413743836
Data:  {"data":"message4"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112906048095929
Data:  {"data":"message5"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112905458525780
Data:  {"data":"message6"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112904243928711
Data:  {"data":"message7"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }
Received message : 2112904943349963
Data:  {"data":"message8"}
Attributes: { username: 'gcp', origin: 'nodejs-sample' }
Received message : 2112905305575893
Data:  {"data":"message9"}
Attributes: { origin: 'nodejs-sample', username: 'gcp' }

cloudfunctionのログ

Cloudfunctionログを見ると送信順には実行されていません。PubSubはデフォルトは順番バラバラになるので、並べたい場合は順序指定キーを指定する必要があります。
https://cloud.google.com/pubsub/docs/publisher#using_ordering_keys

スキーマ定義

現在プレビュー版の機能ですが、メッセージのスキーマを定義することができます。
パブリッシャーはどんなメッセージでも送信可能ですが、時には特定の構造のメッセージだけを送信できるようにしたいこともあると思います。
こういう時にスキーマを定義することで、メッセージを送信する際に定義に合わないものを弾くことが可能です。

https://cloud.google.com/pubsub/docs/schemas#gcloud

スキーマの作成

今回は下記の構造をしたスキーマを定義します。

{
  StringField: string,
  FloatField: float,
  BooleanField: boolean
}

プレビュー版のため、gccloudはbeta版を使用して作成します。

gcloud beta pubsub schemas create testSchema \
--type=AVRO \
--definition="{\"type\":\"record\",\"name\":\"Avro\",\"fields\":[{\"name\":\"StringField\",\"type\":\"string\"},{\"name\":\"FloatField\",\"type\":\"float\"},{\"name\":\"BooleanField\",\"type\":\"boolean\"}]}"

topicの作成

スキーマは現在、トピック作成時のみに付与することが可能です。
トピックの更新でスキーマを外したりすることはできないようです。また、スキーマを削除すると紐付いたトピックへのメッセージはすべて失敗するので注意が必要です。

gcloud beta pubsub topics create avrotopic \
--message-encoding=JSON \
--schema=testSchema

メッセージの送信

構造が正しいメッセージを送信

$ gcloud pubsub topics publish avrotopic \
--message="{\"StringField\":\"test\",\"FloatField\":123,\"BooleanField\":false}"

messageIds:
- '2113507840724102'

構造が正しくないメッセージを送信

gcloud pubsub topics publish avrotopic \
--message="{\"StringField\":\"test\"}"

ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: Request contains an invalid argument.

終わりに

今回、Pub/Subを一通り触ることができました。メッセージの送信時にバリデーションができるスキーマは便利ですが、スキーマファイルをパブリッシャーとサブスクライバーで共有する方法、TOPICに紐づくスキーマを変更する方法など考えることが結構ありそうでした。まだベータ版なのでこの辺が改善されれば使えそうな気がします。

また、GCPにはPub/Subと似たサービスのCloud Taskがあります。比較についてはこちらが参考になります。
https://cloud.google.com/tasks/docs/comp-pub-sub?hl=ja

参考資料

https://future-architect.github.io/articles/20210309/

Discussion