Closed7

Azure Iot Hubs を利用したリアルタイム処理の実装にむけた情報整理

manabianmanabian

概要

Azure Iot Hubs を利用したリアルタイム処理の実装にむけた情報整理を基準します。

注意事項

  • 記述内容に誤りが含まれている場合があります。
  • 情報が古い可能性があります。
manabianmanabian

基本的な情報整理

デバイス、親デバイス、モジュール

Azure IoT Hub では デバイス、親デバイス(基本的にはゲートウェイ)、モジュール の 3 つの ID を管理します。実装パターンに応じて「どの ID で MQTT/AMQP で通信できる メッセージを署名するか」が変わるため、まず概要を押さえ、次にパターン別の使い分けを理解することが設計の第一歩です。

3 つの ID を整理すると下記表のようになります。デバイスのみで管理することもでき、親デバイスとモジュールはオプションです。

階層 役割
デバイス IoT Hub に登録する最小単位。Twin/メッセージを直接やり取り。 土壌水分センサーCO₂ センサー、IP カメラ。それぞれが LoRa や BLE で親デバイスに接続し、テレメトリは自分の device identity で署名したメッセージとして IoT Hub に届く。
親デバイス 子デバイスを束ねてクラウドへ中継するゲートウェイ(IoT Edge が典型)。 ビニールハウスに置いた Raspberry Pi。LTE でクラウドと双方向通信し、ローカルで画像解析も実行。
モジュール 1 台のデバイス内部を機能別に分割するサブ ID。 camera-ai モジュール。 病害を推論し、結果をすぐクラウドに送信

実装パターンと資格情報の使い分けると下記のようになります。

パターン 子デバイスを IoT Hub に登録? 署名に使う鍵 主なユースケース
プロトコル翻訳ゲートウェイ しない 親デバイス鍵のみ ゲートウェイのみのシンプルな構成や完全独自プロトコル(Modbus など)をクラウドに集約 (Microsoft Learn)
透過ゲートウェイ する 子デバイス鍵 センサーが MQTT/AMQP で通信できる。ゲートウェイは TLS 転送のみ (Microsoft Learn, Microsoft Learn)
ID 翻訳ゲートウェイ する 親が子鍵で代理署名 子はレガシープロトコルだが Twin/C2D を子単位で使いたい (Microsoft Learn)
ネスト Edge 下位 Edge を子として登録 下位 Edge 鍵 多段ゲートウェイで回線断に耐える産業ネットワーク (Microsoft Learn, Microsoft Learn)

デバイスと Azure IoT Hub 間の通信オプション

Azure IoT Hub はデバイスとの双方通信が可能であり、 デバイスから Azure IoT Hub に通信することと device-to-cloud (D2C) とよび、Azure IoT Hub からデバイスに通信することを cloud-to-device (C2D) とよびます。それらには下記の通信オプションが提供されています。

  • device-to-cloud
    • デバイスからクラウドへのメッセージ(Device-to-Cloud メッセージ)
    • デバイス ツインの報告されるプロパティ(Device twin's reported properties)
    • ファイルのアップロード
  • cloud-to-device
    • ダイレクト メソッド
    • デバイス ツインの必要なプロパティ(Twin's desired properties)
    • クラウドからデバイスへのメッセージ(Cloud-to-Device メッセージ)
通信オプション 主な用途・特徴 オフライン耐性/保持
デバイスからクラウドへのメッセージ 高頻度テレメトリ/アラートなど時系列データ向け IoT Hub が最長 7 ⽇間キューイングし、順次バックエンドに配送
デバイス ツインの報告されるプロパティ デバイス状態・構成をクラウド側に公開(在庫・FW バージョン等) Twin ドキュメント内に永続保存。クエリ API で随時取得可
ファイルのアップロード 大容量メディア・圧縮バッチなど、メッセージ枠に収まらないデータ ファイル自体はストレージに永続。完了通知を IoT Hub で購読可
ダイレクトメソッド 即時応答が欲しい操作(リブート、リアルタイム制御など) 同期 RPC。デバイスがオンラインでなければ即 404/最大 300 秒待機可
デバイス ツインの必要なプロパティ 長期的な設定変更(送信間隔、構成配布など) 値は Twin に保存。デバイスが再接続後に取得
クラウドからデバイスへのメッセージ 一方向通知(LED 点灯指示など)/ACK 要求・再送制御が可能 IoT Hub が最長 48 時間保持。既定 TTL = 1 h(変更可)

メッセージルーティン先

メッセージ ルーティングに下記のエンドポイントがサポートされています。

  • 組み込みのエンドポイント
  • ストレージ コンテナー
  • Service Bus キュー
  • Service Bus トピック
  • Event Hubs
  • Cosmos DB

参考リンク

メッセージルーティン可能なデータの種類

まず概要だけ押さえると、IoT Hub からメッセージ ルーティングや Event Grid 統合を通じて外部へ送り出せるデータ ソースは 6 種類 です。
標準の Device Telemetry(D2C)に加えて、Twin/Lifecycle/Connection など “状態変化系” の 非テレメトリ イベント があり、すべて共通のシステム/アプリケーション プロパティを持つため、同じクエリ構文で振り分けられます。
さらに Event Grid 側では同様のイベントを CloudEvents として受け取ることもでき、アーキテクチャに応じて選択可能です。
以下の表に 6 種類を一覧で整理しました(ジョブ系イベントは現在プレビュー相当のため Route 画面に表示されない点に注意)。

データソース 発生タイミング ペイロード概要 主なユース ケース
デバイス テレメトリ メッセージ デバイスが D2C で送信した瞬間 本文は任意 JSON/Binary。システムプロパティに iothub-connection-device-id 等が付与 時系列解析、アラート、永続化
デバイス ツイン変更イベント Desired/Reported プロパティの 更新 または Twin 全置換時 更新後バージョンと差分(update)または Twin 全体(replace)を JSON で格納 構成変更監査、CMDB 連携
デバイス ライフサイクル イベント Device/Module ID の 作成・削除 対象 Twin のスナップショット(作成)または空ボディ(削除) ID 管理同期、インベントリ自動作成
デバイス ジョブ ライフサイクル イベント ジョブ API で一括 Twin 更新や Direct Method を 作成/進行/完了/失敗 jobId,status,startTime などジョブメタデータを含む JSON OTA/設定ジョブの進捗モニタリング
デジタル ツイン変更イベント IoT Plug & Play デジタルツインのプロパティ変更時 JSON Patch 配列(op,path,value)形式で変更点のみ通知 モデル駆動 UI 更新、PnP シミュレーション
デバイス接続状態イベント デバイス/モジュールの 接続・切断 ボディは単一 sequenceNumber(256-bit 連番) 死活監視、オンライン率 KPI

SAS について

IoT Hub を利用する際には、いつくかの SAS (Shared Access Signature) トークンがあり下記の表に整理します。Built-in Event Hubs SAS は、 サービス接続のアクセス許可を付与した IoT Hub Policy SAS をそのまま利用できます。

系統 スコープ 発行元 主な取得方法
IoT Hub Policy SAS ハブ全体 運用者 Portal / CLI
Device SAS 個別デバイス デバイスまたは運用者 SDK / CLI / Portal
Module SAS 個別モジュール モジュールまたは運用者 SDK / CLI
Built-in Event Hubs SAS D2C 受信用 運用者 Portal (Built-in endpoints)
FileUpload SAS URI 特定 BLOB IoT Hub デバイス API (CreateFileUploadSasUri)
manabianmanabian

基本的な利用方法

device-to-cloud メッセージ(テレメトリー)の送受信

下記の記事にて実行方法を紹介しています。

Device twin's reported properties の送信

ファイルのアップロード

下記の記事にて実行方法を紹介しています。

ダイレクトメソッドの呼び出し

下記の記事にて実行方法を紹介しています。

Twin's desired properties の送受信

下記の記事にて実行方法を紹介しています。

Cloud-to-device messages の送受信

下記の記事にて実行方法を紹介しています。

組み込みエンドポイントからのデータ取得

下記の記事にて実行方法を紹介しています。

manabianmanabian

メッセージルーティンの実施に向けて

IoT Hub からルーティンさせたデータを Databricks で参照する方法の検証記事

エンリッチについて

値には、静的な値と下記に示されている変数を指定できます。

出所:Azure IoT Hub メッセージ エンリッチメントの概要 | Microsoft Learn

IoT Hub のリソースごとに、10個しか作成できず、エンドポイントに設定する値であることに注意が必要。

出所:Azure IoT Hub メッセージ エンリッチメントの概要 | Microsoft Learn

IoT Hub -> Event Hubs (Kafka互換エンドポイント) -> Spark 経由でデータ連携をする場合には、 header (includeHeadersオプションをtrueと指定して追加される列)に含まれていることを確認。

メッセージルーティン先のカスタムエンドポイントの個数が多くないことに注意

Event Hubs を登録する際には Hub インスタンス(topic)ごとに登録する必要があるが、追加できるエンドポイントが 10 個しか作成できない。データソースが 6 種類あるため、データの発生頻度に応じて同一のエンドポイントに書き込みを実施して、その後の処理で分割する等の対応が必要となりそう。

出所:Azure IoT Hub クォータと調整について - Azure IoT Hub | Microsoft Learn

上記の処理を実装する際には、 メッセージ エンリッチメントが役立つ可能性あり。

出所:Azure IoT Hub メッセージ エンリッチメントの概要 | Microsoft Learn

manabianmanabian

組み込みエンドポイントについて

フォールバック ルート

フォールバック ルートを有効にすることで組み込みエンドポイントに書き込みが実施される。

メッセージ ルーティングが有効になっている場合は、フォールバック ルート機能を有効にすることができます。何らかのルートが作成されると、ルートが作成されていない組み込みのエンドポイントには、データが流れなくなります。

出所: フォールバック ルート - Azure IoT Hub | Microsoft Learn

メッセージ ルーティンを作成した場合にはフォールバック ルートが無効になります。

組み込みのエンドポイントにメッセージが流れないようにするため、1 つまたは複数のカスタム ルートが作成されました。引き続きそこにデータが流れるようにする場合は、こちらをクリックして、組み込みのエンドポイントへのルートを追加します。

Azure IoT Hub の Free tier の場合にはエンドポイントを1つしか追加でないことに注意してください。

Free の IoT Hub の場合は、1 個までエンドポイントを所有できます。

Azure Event Hubs 互換エンドポイントとして利用可能

Event Hubs と互換性エンドポイントであり、 AMQP と AMQP over WebSockets により接続できる旨がドキュメントに記載されています。 Event Hubs で利用できる Kafka 互換エンドポイントについては利用できない可能性が高いです。

既定では、メッセージは Event Hubs と互換性のある、サービスに接続された組み込みエンドポイント (messages/events) にルーティングされます。

このエンドポイントは、現在、ポート 5671 で AMQP プロトコルを使用して、またポート 443 で AMQP over WebSockets を使用して限定的に公開されています。

出所:組み込みエンドポイントを理解する - Azure IoT Hub | Microsoft Learn

Spark における Event Hubs コネクターについて

Azure Event Hubs Connector for Apache Spark が公開されていますが、2025年5月21日時点ででは開発が止まってります。 Spark から Iot Hub にはアクセスできないと捉えたほうがよさそうです。

出所:GitHub - Azure/azure-event-hubs-spark: Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs

manabianmanabian

Databricks から Azure IoT Hub の操作方法

Databricks から Azure IoT Hub と連携する際に困ったこと

azure-iot-deviceインストール時に警告

Databricks 環境(Databricks Runtime 16.4)でazure-iot-deviceをインストールする際に、 ライブラリの依存関係に関する警告が表示された。

%pip install azure-iot-device -q
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
botocore 1.34.39 requires urllib3<2.1,>=1.25.4; python_version >= "3.10", but you have urllib3 2.4.0 which is incompatible.
google-api-core 2.18.0 requires protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0.dev0,>=3.19.5, but you have protobuf 5.29.3 which is incompatible.
mlflow-skinny 2.11.4 requires protobuf<5,>=3.12.0, but you have protobuf 5.29.3 which is incompatible.

Databricks Serverless にてazure-iot-deviceライブラリ内のconnectメソッドが完了しない

Databricks Serverless にてazure-iot-deviceライブラリ内のconnectメソッドが完了しない事象を確認しました。標準クラスターで実行したほうがよさそうです。

%pip install azure-iot-device -q
dbutils.library.restartPython()
from azure.iot.device import IoTHubDeviceClient

device_conn_str = "HostName=iot-hub-test-001.azure-devices.net;DeviceId=test;SharedAccessKey=WxKnlNTAokWLXXXXX="

# クライアントのインスタンス化
device_client = IoTHubDeviceClient.create_from_connection_string(device_conn_str)

# クライアントに接続
device_client.connect()

標準クラスターでは下記のように正常な動作を確認しました。

想定外の接続不良への暫定対応

想定外の接続不良が断続的に発生する場合があります。本質的な対応方法を確立できておりませんが、クラスターのDetache & Atacheで対応しています。

Exception caught in background thread.  Unable to handle.
['azure.iot.device.common.transport_exceptions.ConnectionDroppedError: Unexpected disconnection\n']
Exception caught in background thread.  Unable to handle.
['azure.iot.device.common.transport_exceptions.ConnectionDroppedError: Unexpected disconnection\n']
Exception caught in background thread.  Unable to handle.
['azure.iot.device.common.transport_exceptions.ConnectionDroppedError: Unexpected disconnection\n']


manabianmanabian

Azure IoT hub と Databricks によるストリーミング処理

基本的なデータ連携パターン

Azure IoT Hub(デバイス) -> Databricks

Azure IoT Hub から Databricks に連携するパターンは下記パターンがあり、要求レイテンシーやコストに応じて使い分ける必要がある。 IoT Hub から Databricks から直接参照する方法は確率できていない。

  1. IoT Hub -- メッセージルーティン --> Azure Event Hubs -> Databricks
  2. IoT Hub -- メッセージルーティン --> Azure Storage -> Databricks

Databricks --> デバイス

  1. Databricks DLT -> Event Hubs -> デバイス
  2. Databricks ノートブック-> Azure Storage -> デバイス
  3. Databricks DLT -- Azure IoT SDK --> Azure IoT hubs

基本的な実装方針

  • Databricks DLT をベースに実装
  • メダリオンアーキテクチャで実装し、データ統合とデータ活用を分離して実装
  • ソース(ストレージと Event Hubs)に依存しないようにテーブル構造を共通化
  • Azure IoT Hubとルーティン先のカスタムエンドポイントの関係性は多対多になるケースも許容

メッセージルーティン

メッセージルーティン先のエンドポイント個数が10 個である制限があるが、Databricks に対する連携を考慮すると最低でも下記の 4 パターンを設定できることが望ましい。

  1. Azure Event Hubs 経由でテレメトリーを送信
  2. Azure Event Hubs 経由で非テレメトリーを送信
  3. Azure Storage 経由でテレメトリーを送信
  4. Azure Storage 経由で非テレメトリーを送信

エンリッチはエンドポイントで設定するため、書き込み先が同一であっても、異なるエンリッチを設定する場合には新たなエンドポイントを作成する必要がある。

Azure IoT Hub におけるエンリッチ機能の活用

データ連携後に適切なフィルタリング実施するためには Azure IoT Hub にて下記のエンリッチの値を設定することを検討する。エンリッチの値であれば、 Databricks 側で最小限の処理結果によるフィルタリングが実施可能となる。 Event Hubs 経由であればheader列に、 Azure Storage 経由であればProperties列に値が格納されます。

  1. Azure IoT hub の名称($iothubname)
  2. ツイン上に記述したデータの種類($twin.tags)
  3. テレメトリーをルーティンする場合に、データソースの値(iothub-message-schemaDeviceMessagesを指定) *1

*1 iothub-message-schemaは非テレメトリーをルーティンする際の アプリケーションプロパティのいつであり、テレメトリーにおいて本項目値をセットすることで他メッセージルーティンデータと区別が可能となる。

イベント カテゴリに関連付けられたメッセージ スキーマ。たとえば、deviceLifecycleNotification です。

出所:Azure IoT Hub の非テレメトリ イベント スキーマ | Microsoft Learn

テーブル構造の共通化を実施するために

連携されたデータのカラム名がシステムにより付与したカラムと被らないようにすることが望ましい。下記のパターンがあり、それぞれの設定例を記述します。

  • IoT Hub にて設定されるプロパティ -> iot__を説頭語として設定する
  • Event Hubs (kafka 互換エンドポイント)連携時に追加されるカラム-> kakfka__を説頭語として設定する

また、Azure IoT Hubs からメッセージルーティン先のエンドポイントによって取得できる値やカラム名が異なることにも注意が必要。

システム プロパティ名 Event Hubs Azure Storage Service Bus Event Grid
メッセージ ID message-id messageId MessageId message-id
User id user-id userId UserId user-id
Connection device id iothub-connection-device-id connectionDeviceId iothub-connection-device-id iothub-connection-device-id
Connection module id iothub-connection-module-id connectionModuleId iothub-connection-module-id iothub-connection-module-id
Connection auth generation id iothub-connection-auth-generation-id connectionDeviceGenerationId iothub-connection-auth-generation-id iothub-connection-auth-generation-id
Connection auth method iothub-connection-auth-method connectionAuthMethod iothub-connection-auth-method iothub-connection-auth-method
contentType content-type contentType ContentType iothub-content-type
contentEncoding content-encoding contentEncoding ContentEncoding iothub-content-encoding
iothub-enqueuedtime iothub-enqueuedtime enqueuedTime 該当なし iothub-enqueuedtime
CorrelationId correlation-id correlationId CorrelationId correlation-id
dt-dataschema dt-dataschema dt-dataschema dt-dataschema dt-dataschema
dt-subject dt-subject dt-subject dt-subject dt-subject

引用元:メッセージ形式について - Azure IoT Hub | Microsoft Learn

Event Hubs (kafka 互換エンドポイント)によりデータフレームを作成した場合には下記のカラムを取得できる。

Column Type 備考
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
headers ARRAY includeHeadersオプションをtrueにした場合に取得可能

Bronze と Silver の実装パターン

データの管理主体とデータの種類による Silver テーブルを管理すべきだが、システム連携方法によっては統廃合が必要となるケースがある。

  1. raw_of_input で分離して append
  2. 複数の raw_of_outpup をひとつの enriched に append
  3. キーがあり重複を考慮する必要がある場合には apply change API
  4. raw_of_output をテーブル化して、enriched の先を変える
  5. azure storage のデータも統合したい場合には、 raw_of_output を実体化

Azure Event Hubs 経由の場合における実装は下記のようになる想定。 Azure Storage 経由の場合には、 landing のテーブルを作成しない。

このスクラップは3日前にクローズされました