👻

Pythonを利用してPub/Subトピックを扱ってみた

に公開

今回はPythonを利用して、Google CloudのPub/Subを利用してみました。認定資格の勉強中にももちろん出てきますし、以前実施したGCSとDataflowの連携においてもPub/Subを利用しました。Pub/SubはGoogle Cloudのサービスの中でも特に重要な要素の一つだと思いますが、直接利用したことがなかったので、PythonのSDKを利用して実際に使ってみました。

https://zenn.dev/akasan/articles/e17a1867408c53

実際に使ってみる

今回は、公式のこちらのドキュメントに従ってチュートリアルをしてみようと思います!
https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library?hl=ja

環境構築

まずはローカルの環境を構築していきます。検証用のフォルダを作成して以下のようにuvを利用して環境を作りました。uvについてはこちらの記事でも参照しているのでぜひご覧ください。
https://zenn.dev/akasan/articles/39f81f8bd15790

uv init -p 3.12
uv add google-cloud-pubsub

次に今回の検証に利用するトピックを作成します。以下のコマンドにてトピックを作成します。

gcloud pubsub topics create python-connect-topic

また、トピックをサブスクリプションするために、以下のコマンドにて先ほど作成したトピックに対するサブスクリプションを作成します。

gcloud pubsub subscriptions create python-connect-sub --topic python-connect-topic

パブリッシャーの作成

それでは次にメッセージをパブリッシュするコードを実装します。今回は以下のように実装しました。project_idはご自身の利用するプロジェクトのIDにしてください。

import json
from google.cloud import pubsub_v1

project_id = "your-project-id"
topic_id = "python-connect-topic"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

while True:
    # メッセージをパブリッシュする
    message = input("Input Message: ")
    if message == "quit":
        break

    message = {"text": message}
    data = json.dumps(message).encode("utf-8")
    future = publisher.publish(topic_path, data)
    future.result()


print(f"Published messages to {topic_path}.")

今回は以下の要件で実装しています。

  • ターミナルからユーザ入力を受け付ける(quitが入力されると終了)
  • 入力された文字列をエンコードしてパブリッシュ

サブスクライバーの作成

次にサブスクライバーを実装します。project_idはご自身の利用するプロジェクトのIDにしてください。

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

project_id = "your-project-id"
subscription_id = "python-connect-sub"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

チュートリアルの実装と同様になっており、トピックを受信して5秒以上途切れるとプログラムは終了します。

動作チェック

今回は以下のようにして検証してみました。

  • ターミナルをtmuxで左右に分割
    • 左側でパブリッシャー
    • 右側でサブスクライバー
  • パブリッシャーに入力したらサブスクライバーで受け取れるか

その結果の画像を載せておきます。パブリッシャー側で接続が完了すると、入力したテキストが右側でも表されることを確認しました。ネットワークの問題などもあるかと思いますが、大方パブリッシュしたらすぐにサブスクライブできました。

まとめ

今回はPythonのSDKを利用してPub/Subのやり取りを実現しました。実際の応用としてはIoTデバイスからトピックに対してリアルタイムにデータを送信するなどが考えられるかなと思います。利用がとてもシンプルであるため、ぜひ皆さんも試してみてください。

Discussion