Pythonを利用してPub/Subトピックを扱ってみた
今回はPythonを利用して、Google CloudのPub/Subを利用してみました。認定資格の勉強中にももちろん出てきますし、以前実施したGCSとDataflowの連携においてもPub/Subを利用しました。Pub/SubはGoogle Cloudのサービスの中でも特に重要な要素の一つだと思いますが、直接利用したことがなかったので、PythonのSDKを利用して実際に使ってみました。
実際に使ってみる
今回は、公式のこちらのドキュメントに従ってチュートリアルをしてみようと思います!
環境構築
まずはローカルの環境を構築していきます。検証用のフォルダを作成して以下のようにuvを利用して環境を作りました。uvについてはこちらの記事でも参照しているのでぜひご覧ください。
uv init -p 3.12
uv add google-cloud-pubsub
次に今回の検証に利用するトピックを作成します。以下のコマンドにてトピックを作成します。
gcloud pubsub topics create python-connect-topic
また、トピックをサブスクリプションするために、以下のコマンドにて先ほど作成したトピックに対するサブスクリプションを作成します。
gcloud pubsub subscriptions create python-connect-sub --topic python-connect-topic
パブリッシャーの作成
それでは次にメッセージをパブリッシュするコードを実装します。今回は以下のように実装しました。project_id
はご自身の利用するプロジェクトのIDにしてください。
import json
from google.cloud import pubsub_v1
project_id = "your-project-id"
topic_id = "python-connect-topic"
publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
while True:
# メッセージをパブリッシュする
message = input("Input Message: ")
if message == "quit":
break
message = {"text": message}
data = json.dumps(message).encode("utf-8")
future = publisher.publish(topic_path, data)
future.result()
print(f"Published messages to {topic_path}.")
今回は以下の要件で実装しています。
- ターミナルからユーザ入力を受け付ける(quitが入力されると終了)
- 入力された文字列をエンコードしてパブリッシュ
サブスクライバーの作成
次にサブスクライバーを実装します。project_id
はご自身の利用するプロジェクトのIDにしてください。
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
project_id = "your-project-id"
subscription_id = "python-connect-sub"
timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
チュートリアルの実装と同様になっており、トピックを受信して5秒以上途切れるとプログラムは終了します。
動作チェック
今回は以下のようにして検証してみました。
- ターミナルをtmuxで左右に分割
- 左側でパブリッシャー
- 右側でサブスクライバー
- パブリッシャーに入力したらサブスクライバーで受け取れるか
その結果の画像を載せておきます。パブリッシャー側で接続が完了すると、入力したテキストが右側でも表されることを確認しました。ネットワークの問題などもあるかと思いますが、大方パブリッシュしたらすぐにサブスクライブできました。
まとめ
今回はPythonのSDKを利用してPub/Subのやり取りを実現しました。実際の応用としてはIoTデバイスからトピックに対してリアルタイムにデータを送信するなどが考えられるかなと思います。利用がとてもシンプルであるため、ぜひ皆さんも試してみてください。
Discussion