📝
MQTTを用いたシステム開発(開発日誌)
📘 目次
- MQTTとは
- 従来の通信(HTTP)との違い
- MQTTの仕組み
- Pub/Subモデルの詳細
- QoS(Quality of Service)
- メリット・デメリット
- あなたのシステムでの使用例
- 他のプロトコルとの比較
- まとめ
MQTTとは
基本概念
MQTT (Message Queuing Telemetry Transport) は、軽量なメッセージング通信プロトコルです。
┌──────────────────────────────────────────────────────────┐
│ MQTT = IoT専用の軽量通信プロトコル │
│ │
│ 特徴: │
│ - 軽量(ヘッダー2バイト〜) │
│ - 低帯域・不安定なネットワークに最適 │
│ - 双方向通信が可能 │
│ - Publish/Subscribe(発行/購読)モデル │
└──────────────────────────────────────────────────────────┘
誕生の背景
1999年 IBM と Arcom が開発
【開発の動機】
- 石油パイプラインの監視システム用
- 衛星通信(低帯域・高遅延)に最適化
- バッテリー駆動デバイスで長持ちさせたい
【結果】
→ IoTデバイスに最適なプロトコルが完成
従来の通信(HTTP)との違い
HTTPとMQTTの根本的な違い
┌─────────────────────────────────────────────────────────────┐
│ 通信モデルの違い │
└─────────────────────────────────────────────────────────────┘
【HTTP: リクエスト/レスポンス モデル】
クライアント サーバー
│ │
│─── リクエスト送信 ────────────────→│
│ │
│ (待機中...) │
│ │
│←── レスポンス返信 ─────────────────│
│ │
└─ 接続終了 └─ 処理完了
【MQTT: Publish/Subscribe モデル】
Publisher Broker Subscriber
│ │ │
│── Publish ─────→│ │
│ │─── Publish ────→│
│ │ │
│ │←── Subscribe ───│
│←── Publish ─────│ │
│ │ │
└─ 常時接続 └─ 中継役 └─ 常時接続
詳細比較表
| 項目 | HTTP | MQTT |
|---|---|---|
| 通信モデル | リクエスト/レスポンス | Publish/Subscribe |
| 接続方式 | 都度接続(短命) | 常時接続(長命) |
| 通信方向 | ❌ 単方向(Client→Server) | ✅ 双方向 |
| ヘッダーサイズ | 200-500バイト | 2バイト〜 |
| オーバーヘッド | 大きい | 極小 |
| サーバーからの通知 | ❌ 不可能(ポーリング必要) | ✅ 可能(Push通知) |
| バッテリー消費 | 高い | 低い |
| ネットワーク帯域 | 必要 | 最小限 |
| 用途 | Web、REST API | IoT、リアルタイム通信 |
MQTTの仕組み
3つの主要コンポーネント
┌───────────────┐
│ Publisher │ ← メッセージを発行する側(RaspberryPi等)
│ (発行者) │
└───────────────┘
│
│ ① Publish(トピックにメッセージ送信)
↓
┌───────────────┐
│ MQTT Broker │ ← メッセージを中継する仲介役(AWS IoT Core等)
│ (ブローカー)│
└───────────────┘
│
│ ② Deliver(購読者に配信)
↓
┌───────────────┐
│ Subscriber │ ← メッセージを購読する側(Lambda等)
│ (購読者) │
└───────────────┘
トピック(Topic)の概念
MQTTではトピックを使ってメッセージをルーティングします。
トピックの階層構造(スラッシュ区切り)
home/livingroom/temperature
home/livingroom/humidity
home/bedroom/temperature
home/bedroom/humidity
あなたのシステムの例:
sensor/data ← センサーデータ
raspi/display ← 表示メッセージ
raspi/command ← コマンド
ワイルドカード
+ : 単一レベルのワイルドカード
# : 複数レベルのワイルドカード
例:
home/+/temperature → home/livingroom/temperature
→ home/bedroom/temperature
home/# → home/livingroom/temperature
→ home/bedroom/humidity
→ home/kitchen/light/status
Pub/Subモデルの詳細
従来のHTTP(リクエスト/レスポンス)
# ❌ HTTPの場合: サーバーから能動的に送信できない
# RaspberryPi側
while True:
# センサーデータ送信
requests.post("https://server.com/sensor", data=sensor_data)
# サーバーからのコマンドをチェック(ポーリング)
response = requests.get("https://server.com/commands")
# ↑ コマンドがなくても毎回通信が必要(無駄)
time.sleep(10) # 10秒ごとにポーリング
# サーバー側
# RaspberryPiにrebootコマンドを送りたい...
# でもRaspberryPiがリクエストしてこないと送れない! ❌
HTTPの問題点
【問題1: ポーリングの無駄】
┌─────────────────────────────────────┐
│ 時刻 通信 結果 │
├─────────────────────────────────────┤
│ 10:00 GET コマンドなし(無駄) │
│ 10:10 GET コマンドなし(無駄) │
│ 10:20 GET コマンドなし(無駄) │
│ 10:30 GET rebootコマンド発見! │
└─────────────────────────────────────┘
→ 30分間で4回通信、実際に必要なのは1回だけ
【問題2: リアルタイム性の欠如】
サーバーがコマンド発行(10:25)
↓
RaspberryPiが次回ポーリング(10:30)
↓
5分の遅延が発生!
【問題3: NAT/Firewallの問題】
インターネット
↓
ルーター(NAT)
↓
RaspberryPi
サーバー → RaspberryPi の接続は不可能!
(外部からの直接接続がブロックされる)
MQTT(Publish/Subscribe)
# ✅ MQTTの場合: 双方向通信が可能
# RaspberryPi側
client = mqtt.Client()
client.connect("mqtt.broker.com", 8883)
# ① 購読(Subscribe)- 受信したいトピックを登録
client.subscribe("raspi/command")
# ② コールバック設定 - メッセージが来たら自動実行
def on_message(client, userdata, msg):
if msg.topic == "raspi/command":
command = json.loads(msg.payload)
if command["action"] == "reboot":
subprocess.run("sudo reboot", shell=True)
client.on_message = on_message
# ③ ループ開始 - バックグラウンドで受信監視
client.loop_start()
# ④ センサーデータを送信(Publisher)
while True:
sensor_data = get_sensor_data()
client.publish("sensor/data", json.dumps(sensor_data))
time.sleep(60) # 60秒ごと
# サーバー側(AWS Lambda等)
# RaspberryPiにrebootコマンドを送る
iot_client.publish(
topic='raspi/command',
payload=json.dumps({"action": "reboot"})
)
# → 即座にRaspberryPiに届く! ✅
MQTTの利点
【利点1: 無駄な通信が不要】
┌─────────────────────────────────────┐
│ 常時接続維持(Keep Alive) │
│ メッセージがある時だけ通信 │
│ │
│ 10:00-10:25 通信なし │
│ 10:25 rebootコマンド受信! │
└─────────────────────────────────────┘
→ 必要な通信のみ(効率的)
【利点2: リアルタイム】
サーバーがコマンド発行(10:25:00)
↓
RaspberryPiが即座に受信(10:25:00.1)
↓
遅延: 0.1秒!(ほぼリアルタイム)
【利点3: NAT/Firewall越え】
RaspberryPi → Broker へ接続(1回だけ)
↓
この接続を双方向で使用
↓
Broker → RaspberryPi へ送信可能!
(外部からの直接接続は不要)
QoS(Quality of Service)
MQTTには3つのQoSレベルがあります。
QoS 0: At most once(最大1回)
Publisher → Broker → Subscriber
│ │ │
│ 送信 │ 配信 │
└────────→└────────→
確認応答なし(Fire and Forget)
【特徴】
✅ 最速
✅ 最も軽量
❌ メッセージが届かない可能性あり
❌ 重複の可能性あり
【用途】
- センサーデータ(次のデータで上書きされる)
- リアルタイム性が重要
QoS 1: At least once(少なくとも1回)⭐
Publisher → Broker → Subscriber
│ │ │
│ PUBLISH│ │
├────────→│ PUBLISH│
│ ├────────→│
│ │ PUBACK │
│ │←────────┤
│ PUBACK │ │
│←────────┤ │
確認応答あり
【特徴】
✅ 配信保証あり
✅ パフォーマンスも良好
⚠️ 重複の可能性あり(稀)
✅ あなたのシステムで使用中
【用途】
- 重要なセンサーデータ
- コマンド送信
- 一般的なIoT通信
QoS 2: Exactly once(正確に1回)
Publisher → Broker → Subscriber
│ │ │
│ PUBLISH │ │
├────────→│ PUBLISH │
│ ├────────→│
│ PUBREC │ │
│←────────┤ PUBREC │
│ │←────────┤
│ PUBREL │ │
├────────→│ PUBREL │
│ ├────────→│
│ PUBCOMP │ │
│←────────┤ PUBCOMP │
│ │←────────┤
4ステップハンドシェイク
【特徴】
✅ 完全な配信保証
✅ 重複なし
❌ 最も遅い
❌ オーバーヘッド大
【用途】
- 金融取引
- 医療データ
- 絶対に重複してはいけないデータ
今回のシステムでの使用例
# QoS 1 を使用
client.publish("sensor/data", json.dumps(payload), qos=1)
【理由】
✅ センサーデータは確実に届いてほしい
✅ パフォーマンスも重要
⚠️ 稀な重複は許容範囲(DynamoDBで上書きされる)
メリット・デメリット
✅ メリット
1. 軽量
【ヘッダーサイズ比較】
HTTP/1.1:
GET / HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0...
Accept: text/html...
Cookie: session=abc123...
...
合計: 200-500バイト
MQTT:
固定ヘッダー: 2バイト
可変ヘッダー: トピック名の長さ
合計: 最小2バイト、通常10-20バイト
→ HTTPの1/10〜1/50のサイズ!
2. 低消費電力
【バッテリー駆動デバイスでの比較】
HTTP(ポーリング方式):
- 10秒ごとに接続・切断
- 1日あたり: 8,640回の接続
- バッテリー寿命: 約1週間
MQTT(常時接続):
- 1回接続してKeep Alive
- Keep Alive: 60秒に1回のPING(2バイト)
- バッテリー寿命: 約2ヶ月〜半年
→ 約8〜24倍長持ち!
3. 双方向通信
HTTP:
Client → Server ✅
Server → Client ❌(ポーリング必要)
MQTT:
Publisher ⇄ Broker ⇄ Subscriber ✅
完全な双方向通信
4. 不安定なネットワークに強い
【再接続の仕組み】
接続が切れた場合:
1. 自動再接続
2. QoS 1/2のメッセージは再送
3. Subscriberは再購読
4. セッションの復元(Clean Session=false)
→ 一時的な切断でもデータロスなし
❌ デメリット
1. Broker(仲介役)が必要
HTTP:
Client ←→ Server(直接通信)
MQTT:
Publisher ←→ Broker ←→ Subscriber
↑
必須コンポーネント
→ Brokerの運用・管理が必要
(ただしAWS IoT Coreなら完全マネージド)
2. 大量データには不向き
MQTT:
- 小さなメッセージ(センサーデータ等)に最適
- 大きなファイル(画像・動画)には不向き
推奨メッセージサイズ: 1KB以下
最大メッセージサイズ: 256MB(実用的には1MB以下)
3. HTTPより複雑
HTTP: 誰でも知っている、ツールも豊富
MQTT: 専用ライブラリ・知識が必要
今回のシステムでの使用例
システム構成図
┌─────────────────────────────────────────────────────────────┐
│ RaspberryPi(Publisher & Subscriber) │
│ │
│ [Publisher機能] │
│ ├─ トピック: sensor/data │
│ ├─ ペイロード: {temperature, humidity, pressure} │
│ ├─ QoS: 1 │
│ └─ 頻度: 60秒ごと │
│ │
│ [Subscriber機能] │
│ ├─ トピック: raspi/display │
│ ├─ トピック: raspi/command │
│ └─ QoS: 1 │
└─────────────────────────────────────────────────────────────┘
│
│ MQTT over TLS 1.2
│ (暗号化・証明書認証)
↓
┌─────────────────────────────────────────────────────────────┐
│ AWS IoT Core(MQTT Broker) │
│ │
│ [役割] │
│ ├─ メッセージの中継 │
│ ├─ トピックのルーティング │
│ ├─ 証明書認証 │
│ ├─ デバイス管理 │
│ └─ IoT Rulesでの処理 │
└─────────────────────────────────────────────────────────────┘
│
┌────────────────┼────────────────┐
↓ ↓ ↓
[Lambda] [DynamoDB] [S3]
実際の通信フロー
シナリオ1: センサーデータ送信(RaspberryPi → AWS)
時刻: 10:00:00
[RaspberryPi]
├─ 温度取得: 25.5℃
├─ 湿度取得: 60.2%
└─ 気圧取得: 1013.25 hPa
↓ MQTT Publish (QoS 1)
client.publish(
topic="sensor/data",
payload={
"deviceId": "RaspiDevice01",
"temperature": 25.5,
"humidity": 60.2,
"pressure": 1013.25,
"timestamp": "2025-10-16 10:00:00"
},
qos=1
)
↓ 通信サイズ: 約150バイト
[AWS IoT Core]
├─ メッセージ受信
├─ PUBACK送信(確認応答)
├─ IoT Rules評価
└─ Lambda関数起動
↓
[Lambda]
├─ DynamoDB保存
├─ S3 JSON保存
└─ S3 CSV保存
【所要時間】
- RaspberryPi → AWS IoT Core: 50-100ms
- Lambda実行: 200-500ms
- 合計: 約300-600ms
シナリオ2: S3ファイル表示(AWS → RaspberryPi)
[AWS Lambda]
├─ S3からファイル取得
└─ MQTT Publish
iot_client.publish(
topic='raspi/display',
qos=1,
payload={
"source": "S3_Lambda",
"filename": "message.txt",
"message": "Hello from S3!"
}
)
↓ 即座に配信(ポーリング不要)
[AWS IoT Core]
└─ 購読者に配信
↓ 0.1秒以内
[RaspberryPi]
├─ on_message() コールバック実行
└─ Sense HAT LEDに表示
【所要時間】
- AWS → RaspberryPi: 100-200ms
- リアルタイムに近い!
シナリオ3: リモート再起動(AWS → RaspberryPi)
[AWS IoT Console または Lambda]
└─ コマンド送信
iot_client.publish(
topic='raspi/command',
qos=1,
payload={"command": "reboot"}
)
↓ Push通知(ポーリング不要)
[AWS IoT Core]
└─ 購読者に配信
↓ 即座
[RaspberryPi]
├─ on_message() 実行
├─ コマンド検証
├─ LED表示: "REBOOT"
└─ システム再起動
【HTTPとの比較】
HTTP: コマンド発行 → 最大10秒遅延(ポーリング間隔)
MQTT: コマンド発行 → 0.1秒で到達
Keep Alive(接続維持)の仕組み
あなたのシステムでの動作:
[時刻] [RaspberryPi] [AWS IoT Core]
10:00 ├─ CONNECT ────────────→ CONNACK
│
10:01 │ (センサーデータ送信)
│
10:02 │ (何もなし)
│
10:03 │ (何もなし)
│
...
10:10 ├─ PINGREQ ────────────→ PINGRESP
│ (Keep Alive)
│
10:20 ├─ PINGREQ ────────────→ PINGRESP
│
│ (接続維持中...)
│
│ ← メッセージ待機中 ←
│ いつでも受信可能
│
12:00 │ ← raspi/command ←─── Lambda
└─ reboot実行!
【ポイント】
- 常時接続を1本維持
- Keep Aliveで生存確認(通常60-120秒ごと)
- この接続を使って双方向通信
他のプロトコルとの比較
MQTT vs HTTP vs WebSocket vs CoAP
| 項目 | MQTT | HTTP | WebSocket | CoAP |
|---|---|---|---|---|
| 通信モデル | Pub/Sub | Req/Res | 双方向 | Req/Res |
| 接続 | 常時接続 | 都度接続 | 常時接続 | UDP(軽量) |
| ヘッダー | 2バイト〜 | 200-500バイト | 数十バイト | 4バイト |
| QoS | ✅ 3レベル | ❌ なし | ❌ なし | ✅ 2レベル |
| 双方向 | ✅ 可能 | ❌ 不可 | ✅ 可能 | ❌ 不可 |
| 消費電力 | ⭐⭐⭐⭐⭐ | ⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 用途 | IoT | Web | リアルタイムWeb | 超低電力IoT |
具体的な数値比較
【1日のデータ送信コスト】
条件: 1分ごとにセンサーデータ送信(1,440回/日)
HTTP:
├─ 接続: 1,440回
├─ データサイズ: 200バイト(ヘッダー) + 100バイト(データ) = 300バイト
├─ 合計: 432KB/日
└─ 消費電力: 約100mAh/日
MQTT (QoS 1):
├─ 接続: 1回(Keep Alive: 2バイト × 24回)
├─ データサイズ: 10バイト(ヘッダー) + 100バイト(データ) = 110バイト
├─ 合計: 158KB/日
└─ 消費電力: 約15mAh/日
→ MQTTは約1/3のデータ量、1/7の消費電力!
まとめ
MQTTの本質
┌─────────────────────────────────────────────────────────┐
│ 「IoTのために生まれた最適化された通信プロトコル」 │
│ │
│ HTTPがWebのために作られたように、 │
│ MQTTはIoTのために作られた │
└─────────────────────────────────────────────────────────┘
なぜMQTTを使うのか?
【あなたのシステムの要件】
✅ RaspberryPi(バッテリー駆動ではないが省電力が望ましい)
✅ センサーデータの定期送信
✅ AWS からのコマンドをリアルタイム受信
✅ 不安定なWi-Fi環境でも動作
✅ 低コスト
【HTTPでは実現困難】
❌ サーバーからの通知ができない(ポーリング必要)
❌ 頻繁な接続・切断でバッテリー消費
❌ ヘッダーが大きくて無駄
❌ NAT/Firewall越えが困難
【MQTTなら完璧】
✅ 双方向通信(Push通知)
✅ 常時接続で低消費電力
✅ 軽量で高効率
✅ Broker経由でNAT/Firewall越え
✅ QoS 1で配信保証
MQTTが適している場合
✅ こんな場合にMQTTを選ぶべき:
- IoTデバイス通信
- センサーデータ収集
- リアルタイム通知が必要
- 低消費電力が重要
- 不安定なネットワーク
- 双方向通信が必要
- 大量の小さなメッセージ
MQTTが適していない場合
❌ こんな場合は他のプロトコルを検討:
- 大容量ファイル転送(画像・動画)
- 単純なAPI呼び出し(HTTPで十分)
- Brokerを運用したくない
- 既存のHTTPインフラを活用したい
キーポイント
1. Pub/Subモデル
- Publisher(発行者)
- Broker(仲介役)
- Subscriber(購読者)
2. 軽量・高効率
- ヘッダー2バイト〜
- HTTPの1/10〜1/50のサイズ
3. QoS(配信品質)
- QoS 0: 最速(At most once)
- QoS 1: バランス型(At least once)⭐
- QoS 2: 最高品質(Exactly once)
4. 双方向通信
- 常時接続を維持
- Push通知が可能
- NAT/Firewall越え可能
5. IoTに最適
- 低消費電力
- 不安定なネットワークに強い
- 自動再接続
参考リンク
付録: 今回のシステムの使用場面
接続確立
# MQTT over TLS 1.2 で接続
client.tls_set(
ca_certs=ca_path,
certfile=cert_path,
keyfile=key_path,
tls_version=ssl.PROTOCOL_TLSv1_2
)
client.connect(iot_endpoint, port)
# ↑ AWS IoT Coreに接続(1回だけ)
接続維持
client.loop_start()
# ↑ バックグラウンドスレッドで以下を実行:
# - Keep Alive送信(PING/PONG)
# - メッセージ受信監視
# - コールバック実行
購読(Subscribe)
def on_connect(client, userdata, flags, rc):
client.subscribe("raspi/display")
client.subscribe("raspi/command")
# ↑ 接続完了後に自動購読
発行(Publish)
client.publish("sensor/data", json.dumps(payload), qos=1)
# ↑ QoS 1 で確実に配信
受信(Receive)
def on_message(client, userdata, msg):
if msg.topic == "raspi/display":
handle_display_message(msg)
elif msg.topic == "raspi/command":
handle_command_message(msg)
# ↑ メッセージ到着時に自動実行
Discussion