📝

MQTTを用いたシステム開発(開発日誌)

に公開

📘 目次

  1. MQTTとは
  2. 従来の通信(HTTP)との違い
  3. MQTTの仕組み
  4. Pub/Subモデルの詳細
  5. QoS(Quality of Service)
  6. メリット・デメリット
  7. あなたのシステムでの使用例
  8. 他のプロトコルとの比較
  9. まとめ

MQTTとは

基本概念

MQTT (Message Queuing Telemetry Transport) は、軽量なメッセージング通信プロトコルです。

┌──────────────────────────────────────────────────────────┐
│  MQTT = IoT専用の軽量通信プロトコル                      │
│                                                          │
│  特徴:                                                   │
│  - 軽量(ヘッダー2バイト〜)                            │
│  - 低帯域・不安定なネットワークに最適                   │
│  - 双方向通信が可能                                     │
│  - Publish/Subscribe(発行/購読)モデル                │
└──────────────────────────────────────────────────────────┘

誕生の背景

1999年 IBM と Arcom が開発

【開発の動機】
- 石油パイプラインの監視システム用
- 衛星通信(低帯域・高遅延)に最適化
- バッテリー駆動デバイスで長持ちさせたい

【結果】
→ IoTデバイスに最適なプロトコルが完成

従来の通信(HTTP)との違い

HTTPとMQTTの根本的な違い

┌─────────────────────────────────────────────────────────────┐
│                    通信モデルの違い                         │
└─────────────────────────────────────────────────────────────┘

【HTTP: リクエスト/レスポンス モデル】

クライアント                         サーバー
   │                                    │
   │─── リクエスト送信 ────────────────→│
   │                                    │
   │         (待機中...)              │
   │                                    │
   │←── レスポンス返信 ─────────────────│
   │                                    │
   └─ 接続終了                         └─ 処理完了


【MQTT: Publish/Subscribe モデル】

Publisher          Broker          Subscriber
   │                 │                 │
   │── Publish ─────→│                 │
   │                 │─── Publish ────→│
   │                 │                 │
   │                 │←── Subscribe ───│
   │←── Publish ─────│                 │
   │                 │                 │
   └─ 常時接続      └─ 中継役        └─ 常時接続

詳細比較表

項目 HTTP MQTT
通信モデル リクエスト/レスポンス Publish/Subscribe
接続方式 都度接続(短命) 常時接続(長命)
通信方向 ❌ 単方向(Client→Server) ✅ 双方向
ヘッダーサイズ 200-500バイト 2バイト〜
オーバーヘッド 大きい 極小
サーバーからの通知 ❌ 不可能(ポーリング必要) ✅ 可能(Push通知)
バッテリー消費 高い 低い
ネットワーク帯域 必要 最小限
用途 Web、REST API IoT、リアルタイム通信

MQTTの仕組み

3つの主要コンポーネント

┌───────────────┐
│  Publisher    │ ← メッセージを発行する側(RaspberryPi等)
│ (発行者)    │
└───────────────┘
        │
        │ ① Publish(トピックにメッセージ送信)
        ↓
┌───────────────┐
│  MQTT Broker  │ ← メッセージを中継する仲介役(AWS IoT Core等)
│ (ブローカー)│
└───────────────┘
        │
        │ ② Deliver(購読者に配信)
        ↓
┌───────────────┐
│  Subscriber   │ ← メッセージを購読する側(Lambda等)
│ (購読者)    │
└───────────────┘

トピック(Topic)の概念

MQTTではトピックを使ってメッセージをルーティングします。

トピックの階層構造(スラッシュ区切り)

home/livingroom/temperature
home/livingroom/humidity
home/bedroom/temperature
home/bedroom/humidity

あなたのシステムの例:
sensor/data          ← センサーデータ
raspi/display        ← 表示メッセージ
raspi/command        ← コマンド

ワイルドカード

+ : 単一レベルのワイルドカード
# : 複数レベルのワイルドカード

例:
home/+/temperature   → home/livingroom/temperature
                     → home/bedroom/temperature
                     
home/#               → home/livingroom/temperature
                     → home/bedroom/humidity
                     → home/kitchen/light/status

Pub/Subモデルの詳細

従来のHTTP(リクエスト/レスポンス)

# ❌ HTTPの場合: サーバーから能動的に送信できない

# RaspberryPi側
while True:
    # センサーデータ送信
    requests.post("https://server.com/sensor", data=sensor_data)
    
    # サーバーからのコマンドをチェック(ポーリング)
    response = requests.get("https://server.com/commands")
    # ↑ コマンドがなくても毎回通信が必要(無駄)
    
    time.sleep(10)  # 10秒ごとにポーリング


# サーバー側
# RaspberryPiにrebootコマンドを送りたい...
# でもRaspberryPiがリクエストしてこないと送れない! ❌

HTTPの問題点

【問題1: ポーリングの無駄】
┌─────────────────────────────────────┐
│ 時刻    通信    結果                │
├─────────────────────────────────────┤
│ 10:00  GET     コマンドなし(無駄)  │
│ 10:10  GET     コマンドなし(無駄)  │
│ 10:20  GET     コマンドなし(無駄)  │
│ 10:30  GET     rebootコマンド発見!  │
└─────────────────────────────────────┘
→ 30分間で4回通信、実際に必要なのは1回だけ


【問題2: リアルタイム性の欠如】
サーバーがコマンド発行(10:25)
  ↓
RaspberryPiが次回ポーリング(10:30)
  ↓
5分の遅延が発生!


【問題3: NAT/Firewallの問題】
インターネット
     ↓
  ルーター(NAT)
     ↓
  RaspberryPi

サーバー → RaspberryPi の接続は不可能!
(外部からの直接接続がブロックされる)

MQTT(Publish/Subscribe)

# ✅ MQTTの場合: 双方向通信が可能

# RaspberryPi側
client = mqtt.Client()
client.connect("mqtt.broker.com", 8883)

# ① 購読(Subscribe)- 受信したいトピックを登録
client.subscribe("raspi/command")

# ② コールバック設定 - メッセージが来たら自動実行
def on_message(client, userdata, msg):
    if msg.topic == "raspi/command":
        command = json.loads(msg.payload)
        if command["action"] == "reboot":
            subprocess.run("sudo reboot", shell=True)

client.on_message = on_message

# ③ ループ開始 - バックグラウンドで受信監視
client.loop_start()

# ④ センサーデータを送信(Publisher)
while True:
    sensor_data = get_sensor_data()
    client.publish("sensor/data", json.dumps(sensor_data))
    time.sleep(60)  # 60秒ごと


# サーバー側(AWS Lambda等)
# RaspberryPiにrebootコマンドを送る
iot_client.publish(
    topic='raspi/command',
    payload=json.dumps({"action": "reboot"})
)
# → 即座にRaspberryPiに届く! ✅

MQTTの利点

【利点1: 無駄な通信が不要】
┌─────────────────────────────────────┐
│ 常時接続維持(Keep Alive)          │
│ メッセージがある時だけ通信          │
│                                     │
│ 10:00-10:25  通信なし              │
│ 10:25        rebootコマンド受信!   │
└─────────────────────────────────────┘
→ 必要な通信のみ(効率的)


【利点2: リアルタイム】
サーバーがコマンド発行(10:25:00)
  ↓
RaspberryPiが即座に受信(10:25:00.1)
  ↓
遅延: 0.1秒!(ほぼリアルタイム)


【利点3: NAT/Firewall越え】
RaspberryPi → Broker へ接続(1回だけ)
  ↓
この接続を双方向で使用
  ↓
Broker → RaspberryPi へ送信可能!
(外部からの直接接続は不要)

QoS(Quality of Service)

MQTTには3つのQoSレベルがあります。

QoS 0: At most once(最大1回)

Publisher → Broker → Subscriber
     │         │         │
     │  送信   │   配信  │
     └────────→└────────→
     
確認応答なし(Fire and Forget)

【特徴】
✅ 最速
✅ 最も軽量
❌ メッセージが届かない可能性あり
❌ 重複の可能性あり

【用途】
- センサーデータ(次のデータで上書きされる)
- リアルタイム性が重要

QoS 1: At least once(少なくとも1回)⭐

Publisher → Broker → Subscriber
     │         │         │
     │  PUBLISH│         │
     ├────────→│  PUBLISH│
     │         ├────────→│
     │         │  PUBACK │
     │         │←────────┤
     │  PUBACK │         │
     │←────────┤         │
     
確認応答あり

【特徴】
✅ 配信保証あり
✅ パフォーマンスも良好
⚠️ 重複の可能性あり(稀)
✅ あなたのシステムで使用中

【用途】
- 重要なセンサーデータ
- コマンド送信
- 一般的なIoT通信

QoS 2: Exactly once(正確に1回)

Publisher → Broker → Subscriber
     │         │         │
     │ PUBLISH │         │
     ├────────→│ PUBLISH │
     │         ├────────→│
     │ PUBREC  │         │
     │←────────┤ PUBREC  │
     │         │←────────┤
     │ PUBREL  │         │
     ├────────→│ PUBREL  │
     │         ├────────→│
     │ PUBCOMP │         │
     │←────────┤ PUBCOMP │
     │         │←────────┤
     
4ステップハンドシェイク

【特徴】
✅ 完全な配信保証
✅ 重複なし
❌ 最も遅い
❌ オーバーヘッド大

【用途】
- 金融取引
- 医療データ
- 絶対に重複してはいけないデータ

今回のシステムでの使用例

# QoS 1 を使用
client.publish("sensor/data", json.dumps(payload), qos=1)

【理由】
✅ センサーデータは確実に届いてほしい
✅ パフォーマンスも重要
⚠️ 稀な重複は許容範囲(DynamoDBで上書きされる)

メリット・デメリット

✅ メリット

1. 軽量

【ヘッダーサイズ比較】

HTTP/1.1:
GET / HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0...
Accept: text/html...
Cookie: session=abc123...
...
合計: 200-500バイト


MQTT:
固定ヘッダー: 2バイト
可変ヘッダー: トピック名の長さ
合計: 最小2バイト、通常10-20バイト

→ HTTPの1/10〜1/50のサイズ!

2. 低消費電力

【バッテリー駆動デバイスでの比較】

HTTP(ポーリング方式):
- 10秒ごとに接続・切断
- 1日あたり: 8,640回の接続
- バッテリー寿命: 約1週間


MQTT(常時接続):
- 1回接続してKeep Alive
- Keep Alive: 60秒に1回のPING(2バイト)
- バッテリー寿命: 約2ヶ月〜半年

→ 約8〜24倍長持ち!

3. 双方向通信

HTTP:
Client → Server ✅
Server → Client ❌(ポーリング必要)


MQTT:
Publisher ⇄ Broker ⇄ Subscriber ✅
完全な双方向通信

4. 不安定なネットワークに強い

【再接続の仕組み】

接続が切れた場合:
1. 自動再接続
2. QoS 1/2のメッセージは再送
3. Subscriberは再購読
4. セッションの復元(Clean Session=false)

→ 一時的な切断でもデータロスなし

❌ デメリット

1. Broker(仲介役)が必要

HTTP:
Client ←→ Server(直接通信)


MQTT:
Publisher ←→ Broker ←→ Subscriber
              ↑
         必須コンポーネント

→ Brokerの運用・管理が必要
(ただしAWS IoT Coreなら完全マネージド)

2. 大量データには不向き

MQTT:
- 小さなメッセージ(センサーデータ等)に最適
- 大きなファイル(画像・動画)には不向き

推奨メッセージサイズ: 1KB以下
最大メッセージサイズ: 256MB(実用的には1MB以下)

3. HTTPより複雑

HTTP: 誰でも知っている、ツールも豊富
MQTT: 専用ライブラリ・知識が必要

今回のシステムでの使用例

システム構成図

┌─────────────────────────────────────────────────────────────┐
│  RaspberryPi(Publisher & Subscriber)                      │
│                                                             │
│  [Publisher機能]                                            │
│  ├─ トピック: sensor/data                                  │
│  ├─ ペイロード: {temperature, humidity, pressure}         │
│  ├─ QoS: 1                                                 │
│  └─ 頻度: 60秒ごと                                         │
│                                                             │
│  [Subscriber機能]                                           │
│  ├─ トピック: raspi/display                                │
│  ├─ トピック: raspi/command                                │
│  └─ QoS: 1                                                 │
└─────────────────────────────────────────────────────────────┘
                          │
                          │ MQTT over TLS 1.2
                          │ (暗号化・証明書認証)
                          ↓
┌─────────────────────────────────────────────────────────────┐
│  AWS IoT Core(MQTT Broker)                                │
│                                                             │
│  [役割]                                                     │
│  ├─ メッセージの中継                                       │
│  ├─ トピックのルーティング                                 │
│  ├─ 証明書認証                                             │
│  ├─ デバイス管理                                           │
│  └─ IoT Rulesでの処理                                      │
└─────────────────────────────────────────────────────────────┘
                          │
         ┌────────────────┼────────────────┐
         ↓                ↓                ↓
    [Lambda]        [DynamoDB]           [S3]

実際の通信フロー

シナリオ1: センサーデータ送信(RaspberryPi → AWS)

時刻: 10:00:00

[RaspberryPi]
├─ 温度取得: 25.5℃
├─ 湿度取得: 60.2%
└─ 気圧取得: 1013.25 hPa

↓ MQTT Publish (QoS 1)

client.publish(
    topic="sensor/data",
    payload={
        "deviceId": "RaspiDevice01",
        "temperature": 25.5,
        "humidity": 60.2,
        "pressure": 1013.25,
        "timestamp": "2025-10-16 10:00:00"
    },
    qos=1
)

↓ 通信サイズ: 約150バイト

[AWS IoT Core]
├─ メッセージ受信
├─ PUBACK送信(確認応答)
├─ IoT Rules評価
└─ Lambda関数起動

↓

[Lambda]
├─ DynamoDB保存
├─ S3 JSON保存
└─ S3 CSV保存

【所要時間】
- RaspberryPi → AWS IoT Core: 50-100ms
- Lambda実行: 200-500ms
- 合計: 約300-600ms

シナリオ2: S3ファイル表示(AWS → RaspberryPi)

[AWS Lambda]
├─ S3からファイル取得
└─ MQTT Publish

iot_client.publish(
    topic='raspi/display',
    qos=1,
    payload={
        "source": "S3_Lambda",
        "filename": "message.txt",
        "message": "Hello from S3!"
    }
)

↓ 即座に配信(ポーリング不要)

[AWS IoT Core]
└─ 購読者に配信

↓ 0.1秒以内

[RaspberryPi]
├─ on_message() コールバック実行
└─ Sense HAT LEDに表示

【所要時間】
- AWS → RaspberryPi: 100-200ms
- リアルタイムに近い!

シナリオ3: リモート再起動(AWS → RaspberryPi)

[AWS IoT Console または Lambda]
└─ コマンド送信

iot_client.publish(
    topic='raspi/command',
    qos=1,
    payload={"command": "reboot"}
)

↓ Push通知(ポーリング不要)

[AWS IoT Core]
└─ 購読者に配信

↓ 即座

[RaspberryPi]
├─ on_message() 実行
├─ コマンド検証
├─ LED表示: "REBOOT"
└─ システム再起動

【HTTPとの比較】
HTTP: コマンド発行 → 最大10秒遅延(ポーリング間隔)
MQTT: コマンド発行 → 0.1秒で到達

Keep Alive(接続維持)の仕組み

あなたのシステムでの動作:

[時刻]  [RaspberryPi]           [AWS IoT Core]
10:00   ├─ CONNECT ────────────→ CONNACK
        │
10:01   │ (センサーデータ送信)
        │
10:02   │ (何もなし)
        │
10:03   │ (何もなし)
        │
...
10:10   ├─ PINGREQ ────────────→ PINGRESP
        │   (Keep Alive)
        │
10:20   ├─ PINGREQ ────────────→ PINGRESP
        │
        │ (接続維持中...)
        │
        │ ← メッセージ待機中 ←
        │   いつでも受信可能
        │
12:00   │ ← raspi/command ←─── Lambda
        └─ reboot実行!

【ポイント】
- 常時接続を1本維持
- Keep Aliveで生存確認(通常60-120秒ごと)
- この接続を使って双方向通信

他のプロトコルとの比較

MQTT vs HTTP vs WebSocket vs CoAP

項目 MQTT HTTP WebSocket CoAP
通信モデル Pub/Sub Req/Res 双方向 Req/Res
接続 常時接続 都度接続 常時接続 UDP(軽量)
ヘッダー 2バイト〜 200-500バイト 数十バイト 4バイト
QoS ✅ 3レベル ❌ なし ❌ なし ✅ 2レベル
双方向 ✅ 可能 ❌ 不可 ✅ 可能 ❌ 不可
消費電力 ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐
用途 IoT Web リアルタイムWeb 超低電力IoT

具体的な数値比較

【1日のデータ送信コスト】

条件: 1分ごとにセンサーデータ送信(1,440回/日)

HTTP:
├─ 接続: 1,440回
├─ データサイズ: 200バイト(ヘッダー) + 100バイト(データ) = 300バイト
├─ 合計: 432KB/日
└─ 消費電力: 約100mAh/日


MQTT (QoS 1):
├─ 接続: 1回(Keep Alive: 2バイト × 24回)
├─ データサイズ: 10バイト(ヘッダー) + 100バイト(データ) = 110バイト
├─ 合計: 158KB/日
└─ 消費電力: 約15mAh/日

→ MQTTは約1/3のデータ量、1/7の消費電力!

まとめ

MQTTの本質

┌─────────────────────────────────────────────────────────┐
│  「IoTのために生まれた最適化された通信プロトコル」    │
│                                                         │
│  HTTPがWebのために作られたように、                      │
│  MQTTはIoTのために作られた                             │
└─────────────────────────────────────────────────────────┘

なぜMQTTを使うのか?

【あなたのシステムの要件】
✅ RaspberryPi(バッテリー駆動ではないが省電力が望ましい)
✅ センサーデータの定期送信
✅ AWS からのコマンドをリアルタイム受信
✅ 不安定なWi-Fi環境でも動作
✅ 低コスト

【HTTPでは実現困難】
❌ サーバーからの通知ができない(ポーリング必要)
❌ 頻繁な接続・切断でバッテリー消費
❌ ヘッダーが大きくて無駄
❌ NAT/Firewall越えが困難

【MQTTなら完璧】
✅ 双方向通信(Push通知)
✅ 常時接続で低消費電力
✅ 軽量で高効率
✅ Broker経由でNAT/Firewall越え
✅ QoS 1で配信保証

MQTTが適している場合

✅ こんな場合にMQTTを選ぶべき:

- IoTデバイス通信
- センサーデータ収集
- リアルタイム通知が必要
- 低消費電力が重要
- 不安定なネットワーク
- 双方向通信が必要
- 大量の小さなメッセージ

MQTTが適していない場合

❌ こんな場合は他のプロトコルを検討:

- 大容量ファイル転送(画像・動画)
- 単純なAPI呼び出し(HTTPで十分)
- Brokerを運用したくない
- 既存のHTTPインフラを活用したい

キーポイント

1. Pub/Subモデル
   - Publisher(発行者)
   - Broker(仲介役)
   - Subscriber(購読者)

2. 軽量・高効率
   - ヘッダー2バイト〜
   - HTTPの1/10〜1/50のサイズ

3. QoS(配信品質)
   - QoS 0: 最速(At most once)
   - QoS 1: バランス型(At least once)⭐
   - QoS 2: 最高品質(Exactly once)

4. 双方向通信
   - 常時接続を維持
   - Push通知が可能
   - NAT/Firewall越え可能

5. IoTに最適
   - 低消費電力
   - 不安定なネットワークに強い
   - 自動再接続

参考リンク


付録: 今回のシステムの使用場面

接続確立

# MQTT over TLS 1.2 で接続
client.tls_set(
    ca_certs=ca_path,
    certfile=cert_path,
    keyfile=key_path,
    tls_version=ssl.PROTOCOL_TLSv1_2
)

client.connect(iot_endpoint, port)
# ↑ AWS IoT Coreに接続(1回だけ)

接続維持

client.loop_start()
# ↑ バックグラウンドスレッドで以下を実行:
#   - Keep Alive送信(PING/PONG)
#   - メッセージ受信監視
#   - コールバック実行

購読(Subscribe)

def on_connect(client, userdata, flags, rc):
    client.subscribe("raspi/display")
    client.subscribe("raspi/command")
# ↑ 接続完了後に自動購読

発行(Publish)

client.publish("sensor/data", json.dumps(payload), qos=1)
# ↑ QoS 1 で確実に配信

受信(Receive)

def on_message(client, userdata, msg):
    if msg.topic == "raspi/display":
        handle_display_message(msg)
    elif msg.topic == "raspi/command":
        handle_command_message(msg)
# ↑ メッセージ到着時に自動実行

Discussion