🐍
MQTTでブローカーをAWS IoTにブリッジしてみる
ローカルだけでは勿体ないので、AWS IoTにブリッジしてネットワークを通じて疎通を試みる。
AWS IoTの設定
- IoT Coreを選択
- ポリシーを作成
- モノを作成
- デバイス証明書を発行(発行した認証類は後ほど使うので必ず保存する)
- 発行した証明書にポリシーをアタッチ
- 設定からデバイスデータエンドポイントを取得(これも後ほどソース内で使用する)
RaspberryPiの設定
pythonで扱うためのクライアントをインストール。
$ pip install paho-mqtt
認証類の設置
- PATH_TO_ROOT = AWS IoTのルートCA(ex: AmazonRootCA1.pem)
- PATH_TO_CERT = モノの証明書(ex: xxxxx-certificate.pem.crt)
- PATH_TO_KEY = 秘密鍵(ex: xxxxx-private.pem.key)
Subテスト
sub.py
from paho.mqtt import client as mqtt
import ssl
import json
BROKER = 'xxxxx.ap-northeast-1.amazonaws.com'
PORT = 8883
PATH_TO_ROOT = './crt/root.pem'
PATH_TO_CERT = './crt/certificate.pem.crt'
PATH_TO_KEY = './crt/private.pem.key'
CLIENT_ID = 'rasp-test'
TOPIC = 'test/hello'
def on_init():
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv311)
client.tls_set(
ca_certs=PATH_TO_ROOT,
certfile=PATH_TO_CERT,
keyfile=PATH_TO_KEY,
tls_version = ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
client.connect(
BROKER,
port=PORT,
keepalive=60)
return client
def on_connect(client, userdata, flags, rc):
print(f'Connected `{str(rc)}`')
sub(client)
def on_disconnect(client, userdata, rc):
print(f'Disconnect `{str(rc)}`')
def on_publish(client, userdata, mid):
print(f'Publish `{str(mid)}`')
def on_message(client, userdata, msg):
dict_msg = json.loads(msg.payload)
print(f'Sub `{dict_msg["message"]}`')
def sub(client: mqtt):
client.subscribe(TOPIC)
client.on_message = on_message
def main():
client = on_init()
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.loop_forever()
if __name__ == '__main__':
main()
- 起動して
MQTT テストクライアント
のトピックに公開する
からテスト送信する
- ログが表示されれば成功!
Pubテスト
pub.py
from paho.mqtt import client as mqtt
from time import sleep
import ssl
import json
BROKER = 'xxxxx.ap-northeast-1.amazonaws.com'
PORT = 8883
PATH_TO_ROOT = './crt/root.pem'
PATH_TO_CERT = './crt/certificate.pem.crt'
PATH_TO_KEY = './crt/private.pem.key'
CLIENT_ID = 'rasp-test'
TOPIC = 'test/hello'
def on_init():
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv311)
client.tls_set(
ca_certs=PATH_TO_ROOT,
certfile=PATH_TO_CERT,
keyfile=PATH_TO_KEY,
tls_version = ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
client.connect(
BROKER,
port=PORT,
keepalive=60)
return client
def on_connect(client, userdata, flags, rc):
print(f'Connected `{str(rc)}`')
pub(client)
def on_disconnect(client, userdata, rc):
print(f'Disconnect `{str(rc)}`')
def on_publish(client, userdata, mid):
print(f'Publish `{str(mid)}`')
def pub(client: mqtt):
client.on_publish = on_publish
def pub_send(client: mqtt):
msg = json.dumps({ 'message': 'Hello, MQTT' })
client.publish('test/hello', msg)
def main():
client = on_init()
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.loop_start()
while True:
pub_send(client)
sleep(1)
if __name__ == '__main__':
main()
- 起動して
MQTT テストクライアント
のトピックをサブスクライブする
をtest/hello
でフィルターを設定して、確認する - ログが表示されれば成功!
注意点
- ポートは8883を使用する
- Subの起動
client.subscribe(TOPIC)
はon_connectイベント後に設置する(地味にハマった)
Discussion