IoTメッセージングプロトコル・MQTTの基本 (Rustのクライアント実装付き)
IoTデバイスと相互通信するためのプロトコルとして、MQTTがしばしば使われます。
本エントリではMQTT 3.1.1について、基本と特徴をさらっと見つつ、メインとなる送受信シーケンスとそこでやり取りされるパケットからコアとなる機能について掘り下げていきます。
また、RustでMQTTクライアントの実装もしてますので参考にしてみてください。3.1.1の仕様に沿って、セッション状態管理以外の必要な機能は概ねカバーしているかと思います。
MQTTとは
MQTT (Message Queuing Telemetry Transport) は、IBMが1990年代に開発し、OASIS (Organization for the Advancement of Structured Information Standards Group) によって2014年から標準化されているメッセージングプロトコルです。
- 現在の最新バージョンは5.0 (2019/3リリース)
- ただし、5.0に対応していないプラットフォームも多いため、3.1.1も広く使われています
- IBMは当初、石油パイプラインのセンサーと人工衛星を通信させる目的で開発したそうです [1]
- OASISはMQTT以外にも、SAMLやebXMLなどの標準化も行なっています
アーキテクチャ
MQTTで登場するエンティティは2つです。
-
クライアント ... メッセージを送受信する
- 送信する側(パブリッシャー)は、トピック宛にメッセージを送信する
- 受信する側(サブスクライバー)は、トピックをサブスクライブして、メッセージを受信する
-
サーバー (本エントリではブローカーと呼びます) ... クライアントから全てのメッセージを受信し、それらを関連するクライアントに送信する
- パブリッシャーにとっては、ブローカーはサブスクライバーという関係
- サブスクライバーにとっては、ブローカーはパブリッシャーという関係
https://developer.ibm.com/articles/iot-mqtt-why-good-for-iot/ Figure 1抜粋
特徴
総じて利用可能なネットワーク帯域や電力に制限があるIoTデバイス向けのプロトコルとしてまとまっています。
- パブリッシュ/サブスクライブモデルに基づいた非同期通信
- トピックという概念でメッセージをルーティングすることで、1対1/1対多の双方向通信が可能
- 最小2バイトのヘッダで軽量
- ペイロードは任意のバイナリ
- TCPベースで信頼性が高い
- QoSでアプリケーションから送達性レベルを制御可能
- IoTに特化した機能
- セッション状態を保持するクリーンセッションフラグ
- Willメッセージでクライアントのオフライン時の挙動を制御可能
プラットフォーム
MQTTのブローカーやクライアントライブラリは、OSSでいくつも公開されています。
- Eclipse Mosquitto (ブローカー)
- ActiveMQ (ブローカー)
- eclipse/paho.mqtt.python (Pythonクライアント)
大量のデバイスをセキュアに接続したいといった要件でMQTTを採用しようとすると、ブローカーの構築・運用がネックになります。各社のクラウドインフラではこういったブローカーサーバーに加えて、さらに認証やデバイス管理なども含めたIoT向けのワンストップサービスを提供しています。
- AWS IoT Core
- Azure IoT Hub
- IBM Cloud IoT Solutions
- Google Cloud IoT Core (2024/3現在はサービス終了済み)
プロトコルと機能
メッセージを送受信するシーケンスと、パケットレイアウトについて簡単に説明します。
また、CONNECT・SUBSCRIBE・PUBLISHの3つのパケットを掘り下げ、MQTTのコアとなる機能について紹介していきます。
シーケンス
MQTTは、以下の流れでメッセージがやり取りされます。クライアント1がサブスクライバー、2がパブリッシャーになります。
- クライアント1が、TCPハンドシェイク後に、ブローカーに接続 (CONNECTパケット)
- クライアント1が、トピックをサブスクライブ (SUBSCRIBEパケット)
- クライアント2が、TCPハンドシェイク後に、ブローカーに接続 (CONNECTパケット)
- クライアント2が、トピック宛にメッセージをパブリッシュ (PUBLISHパケット)
- クライアント2が、ブローカーから切断 (DISCONNECTパケット)
- クライアント1が、ブローカーからメッセージを受信 (PUBLISHパケット) => 任意の処理
- (受信する必要がなくなったら) クライアント1が、ブローカーから切断 (DISCONNECTパケット)
パケットレイアウト
MQTTの1パケットは以下の構成となっています。Fixed headerのパケットタイプによって、Variable headerやPayloadで持つべき値が異なります。
- Fixed header (必須)
- パケットタイプ + フラグ (1バイト)
- Variable header + Payloadの長さ (1~4バイト)
- Variable header (オプション)
- PUBLISHパケットなら、メッセージ送信先のトピックなどを含みます
- Payload (オプション)
- PUBLISHパケットなら、本文になります
Fixed headerは、先頭4ビットでパケットタイプ、末尾4ビットで追加フラグが表現されています。3.1.1ではPUBLISHパケット以外は、パケットタイプによって固定値がセットされるようになっています。
タイプ名 | 先頭4ビット | 末尾4ビット |
---|---|---|
CONNECT | 0001 |
0000 |
PUBLISH | 0003 |
{DUP 1ビット}{QoS 2ビット}{RETAIN 1ビット} |
SUBSCRIBE | 1000 |
0010 |
DISCONNECT | 1111 |
0000 |
CONNECT
Variable header
- Protocol Name (
MQTT
固定値) - Protocol Version (3.1.1の場合、4)
- User Name, Passwordの有無フラグ
- 認証情報を文字列で渡す
- Willフラグと、それに付随するWill Retainフラグ/Will QoS (後述)
- Clean Sessionフラグ (後述)
- Keep Alive
- パケットを送ってから次のパケットを送信するまでの最大時間を秒数で指定する
- 必要であれば、クライアントから定期的にPINGREQパケットを送ることで、接続を継続する
Payload
- ClientID (必須) (後述)
- Will Topic, Will Message (Willフラグがセットされた場合、必須)
- User Name (User Nameフラグがセットされた場合、必須)
- Password (Passwordフラグがセットされた場合、必須)
CONNACK
CONNECTパケットを受け取ったブローカーは、CONNECTパケットが正しい場合に、CONNACKパケットを返します。正しくなければ、コネクションが切断されます。
CONNACKのVariable headerにはリターンコードを持ち、成功した場合は0がセットされます。
ClientID
ClientIDは、ブローカーにてクライアントを一意に識別する文字列です。
フォーマットは[0-9A-Za-z]{1,23}
で、クライアント間で重複しないように設定する必要があります。
文字列の表現
Protocol Name、User Name、Passwordや、SUBSCRIBE/PUBLISHで現れるトピックなどは文字列情報となります。
MQTTでは長さ情報 (2バイト) + UTF-8エンコード文字列
で表現されます。例えば"Hello"は0x000548656c6c6f
となります。長さ情報が2バイトで持つので、64KiBまで表現可能です。
Will
MQTTでは、接続時にメッセージ (PayloadのWill Message) をブローカーに渡しておくことで、意図しないクライアント切断時にそのメッセージを指定したトピック (PayloadのWill Topic) に送信することができます。この機能はWill (遺言) と呼ばれています。
サーバーにて切断したクライアントを検知して何らかの処理を行う、といったことが可能になります。
QoSとRetainについては、PUBLISHで説明します。
Clean Session
MQTTでは、クライアントとブローカーの双方でセッション状態を保存できます。セッションは以下のデータを持ちます。
- クライアント
- 送信済みだが、承認(ACK)されていないQoS 1/2のメッセージ
- ブローカーから受信済みだが、完了していないQoS 2のメッセージ
- ブローカーは、ClientIDごとに以下の
- クライアントがサブスクライブしていた全てのトピックフィルター
- 送信済みだが、承認(ACK)されていないQoS 1/2のメッセージ
- 受信完了していない、QoS 2のメッセージ
- 送信を保留しているQoS 1/2のメッセージ
- (オプション) QoS 0のメッセージ
Clean Sessionフラグが0の場合、セッションを再開し、保存していたメッセージの送受信が行われます。
Clean Sessionフラグが1の場合、双方のセッション状態が破棄して、新しいセッションが開始されます。
SUBSCRIBE
Variable header
- PacketID (必須)
- 2バイト数値 (後述)
Payload
1件以上のTopic Filter + Requested QoSを指定します。
- Topic Filter
- 例えば
Building1/Floor3/Device324
といった文字列になります
- 例えば
- Requested QoS
- このQoSレベルが保証されるわけではなく、PUBLISHにセットされたQoSが上限となる
- Requested QoS=2にセットしても、PUBLISHのQoS=1なら、QoS=1のシーケンスになる
SUBACK
SUBSCRIBEの返信として、SUBACKがブローカーから返されます。
SUBSCRIBEでサブスクライブ指定したトピックフィルターのリストに対応する、リターンコード0x00 (QoS0) or 0x01 (QoS1) or 0x02 (QoS2) or 0x80 (Failure) のリストが返されます。
PacketID
MQTTでは、1本のTCPコネクション上で複数のメッセージを同時にやり取りするため、受信したSUBACKパケットと過去に送信したSUBSCRIBEパケットの対応づけが必要となります。
このため、SUBSCRIBEパケットでは一意に識別するためのPacketIDが付与し、SUBACKパケットでは同じPacketIDを付与して返すことで、クライアントで対応づけができるようになります。
SUBSCRIBE/SUBACK以外にも、UNSUBSCRIBE/UNSUBACK、PUBLISH (QoS1)/PUBACK、PUBLISH(QoS2)/PUBREC、PUBREL/PUBCOMPでも同様に、ClientIDを用いた対応づけが行われます。
トピック
MQTTでは、メッセージのトピックに基づいて、ブローカーがメッセージの配信先を制御します。
パブリッシャーは、以下のようなトピック宛てにメッセージを送信します。トピックはUTF-8文字列です。/
は特別扱いされており、階層構造を表現します。
Building1/Floor3/Device324/temperature
サブスクライバーは、例えば以下のトピックフィルターでサブスクライブすると、上記のメッセージとマッチして受信できるようになります。
Building1/Floor3/Device324/temperature
Building1/Floor3/+/temperature
Building1/#
トピックフィルターでは、/
と組み合わせて、+
と#
が使えます。
-
+
... 任意の1階層とマッチする-
Building1/Floor3/+/temperature
なら、Building1/Floor3/Device324/temperature
やBuilding1/Floor3/Sensor15/temperature
などとマッチする
-
-
#
... 末尾の1つ以上の階層とマッチする-
Building1/#
なら、Building1/Floor3/Device324/temperature
やBuilding1/Floor5/Server56/status/temperature
などとマッチします
-
以下のトピックフィルターでは上記メッセージは受信できません。
Building1/Floor3/Device324
Building1/+/temperature
Building1/Floor4/#
PUBLISH
Fixed header
- DUPフラグ
- 同じメッセージを再送している場合のみ、1 (QoS0なら必ず0)
- QoSレベル
- 0 or 1 or 2
- RETAINフラグ (後述)
QoS
MQTTでは、メッセージの送達の信頼性に応じて、3段階のQoSレベルを設定できます。QoSレベルが高いほど、送達担保のための制御メッセージの送受信が発生するので、高コストになります。
- QoS0
- At Most Once、受信できないこともある
- PUBLISHを送信 (終了)
- QoS1
- At Least Once、同じメッセージを2回以上受信することもある
- PUBLISHを送信 => PUBACKを受信 (終了)
- PUBACKを受信できなければ、PUBLISHを再送 (DUP=1)
- QoS2
- Exactly Once、同じメッセージは厳密に1回の受信
- PUBLISHを送信 => PUBRECを受信 => PUBRELを送信 => PUBCOMPを受信 (終了)
Retain
RETAINフラグがセットされたメッセージは、ブローカーで保持されます。
将来、そのメッセージのトピックとマッチするトピックフィルターでサブスクライブすると、保持したメッセージはそのサブスクライバーに配信されます。
トピック単位で最新メッセージのみを保持するため、同じトピックでメッセージを送信すると、保持されたメッセージは削除されます。
まとめ
MQTT 3.1.1を取り上げて、基本機能とコアとなるシーケンスを紹介しました。
プロトコル自体が軽量であること、膨大な数のデバイスとのルーティングをトピックで抽象化していること、デバイスそのものや通信環境が不安定でもセッションやRETAINやWILLなどの機能でカバーしようとしていることなど、全体的にIoT特有の事情を汲み取った仕様となっているのが伺えます。
実装と合わせて理解の一助になれば幸いです。
参考
- https://mqtt.org/
- https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
- https://developer.ibm.com/articles/iot-mqtt-why-good-for-iot/
Discussion