🎃

Azure Event Hub に HTTPS でイベントを送信する Python サンプル

2022/10/18に公開

Python で Azure Event Hub に HTTPS でイベントを送信するサンプルです。このサンプルでは認証として Shared Access Signature (SAS) トークンを使い、送信するデータは送信タイムスタンプを含んだ CSV 形式 (1行) とします。また、使用する Python は 3 系とします。

前提条件

  • Event Hub の名前空間とインスタンスを作成しておきます。
    • このサンプルでは、名前空間を EH_NAMESPACE 、インスタンスを EVENTHUB とします。
  • Event Hub インスタンスに対して SAS ポリシーを作成し、SAS キー値を取得しておきます。
    • このサンプルでは SAS ポリシーを SAS_POLICY、キー値を SAS_KEY とします。

必要なライブラリのインポート

import time
import datetime
import hmac
import base64
import hashlib
import requests
from urllib.parse import quote as url_parse_quote

パラメータの設定

以下のパラメータを設定します。

# Event Hub 名前空間の FQDN
FULLY_QUALIFIED_NAMESPACE = 'EH_NAMESPACE.servicebus.windows.net'

# Event Hub のインスタンス名
EVENTHUB_NAME = 'EVENTHUB'

# SAS ポリシー名
SAS_POLICY = 'SAS_POLICY'

# SAS キー値
SAS_KEY = 'SAS_KEY'

SAS トークンを取得する関数の定義

Event Hub のエンドポイント、SAS ポリシー名、SAS キー値、TTL を引数とし、SAS トークンを取得する関数を定義します。

def generate_sas_token(uri, sas_name, sas_value, token_ttl):
    sas = sas_value.encode('utf-8')
    expiry = str(int(time.time() + token_ttl))
    string_to_sign = (uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = url_parse_quote(base64.b64encode(signed_hmac_sha256.digest()))

    return 'SharedAccessSignature sr={}&sig={}&se={}&skn={}'.format(uri, signature, expiry, sas_name)

SAS トークンの取得

TTL を 3000 秒とする SAS トークンを取得します。

uri = "sb://{}/{}".format(FULLY_QUALIFIED_NAMESPACE, EVENTHUB_NAME)
token_ttl = 3000 
sas_token = generate_sas_token(uri, SAS_POLICY, SAS_KEY, token_ttl)

Event Hub へのイベントの送信

AAA,BBB,<送信タイムスタンプ> をイベントデータとするイベントを送信します。

# Event Hub のエンドポイントとヘッダーパラメータの設定
eventhub_endpoint = 'https://{}/{}/messages'.format(FULLY_QUALIFIED_NAMESPACE, EVENTHUB_NAME)
headers = {'Authorization': sas_token,
            'Content-Type': 'application/atom+xml;type=entry;charset=utf-8'}

# 送信タイムスタンプと送信イベントデータの設定
current_time = datetime.datetime.utcnow()
data = 'AAA,BBB,' + current_time.isoformat(timespec='milliseconds')

# イベントデータの送信とレスポンスの表示
response = requests.post(eventhub_endpoint, headers=headers, data=data)
print(response)

Discussion