🎫

Azure Event Hubsにメッセージ送信

2024/02/23に公開

やりたいこと

Azure Event Hubsのメッセージをトリガーにしている処理をテストするために、
Azure Event Hubsに毎秒くらいでメッセージを送り続けたい。

課題

公式サンプルの接続情報に下記とあります。

EVENT_HUB_CONNECTION_STR = "EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

Event Hubsには名前空間とインスタンスがあり、接続文字列もそれぞれあるので、
どっちに何を設定するのか迷う。

公式ドキュメント
https://learn.microsoft.com/ja-jp/azure/event-hubs/event-hubs-python-get-started-send?tabs=connection-string%2Croles-azure-portal

結果

EVENT_HUB_CONNECTION_STR = "EventHubsの名前空間の接続文字列"
EVENT_HUB_NAME = "インスタンスの名前"

EVENT_HUB_NAMEって書かれると、名前空間なのかな?と思っちゃいましたが、
インスタンスの名前です。

ちなみに、接続文字列は共有アクセスポリシーで見れます。

インスタンスの接続文字列は使う?

インスタンスの接続文字列と名前空間の接続文字列はこのような関係だったので、

"インスタンスの接続文字列" = "EventHubsの名前空間の接続文字列" + ";EntityPath=インスタンスの名前"

下記でも接続できるのかなと思ったら、接続できました。

EVENT_HUB_CONNECTION_STR = "インスタンスの接続文字列"

async def run():
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR
    )

コード

import asyncio
import datetime, json

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = "EventHubsの名前空間の接続文字列"
EVENT_HUB_NAME = "インスタンスの名前"

async def run(i):

    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:

        event_data_batch = await producer.create_batch()

        json_object = [
            {
                "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
                "message": "test",
                "index": i,
                "nestedKey": {"nestedKey1": "nestedValue1"},
                "arrayKey": ["arrayValue1", "arrayValue2"],
            }
        ]
        json_string = json.dumps(json_object, indent=4)
        event_data_batch.add(EventData(json_string))
        await producer.send_batch(event_data_batch)
        print(i)

async def run_every_second():
    i = 0
    while True:
        i += 1
        await run(i)
        await asyncio.sleep(1)  # wait for 1 second

async def main():
    task = asyncio.create_task(run_every_second())
    await asyncio.sleep(100)  # run for 100 seconds
    task.cancel()  # stop the task

asyncio.run(main())

大体、毎秒くらいでメッセージを作成し続けます。

ヘッドウォータース

Discussion