🐍

MQTTでブローカーをAWS IoTにブリッジしてみる

2022/01/17に公開

ローカルだけでは勿体ないので、AWS IoTにブリッジしてネットワークを通じて疎通を試みる。

AWS IoTの設定

  • IoT Coreを選択
  • ポリシーを作成
  • モノを作成


  • デバイス証明書を発行(発行した認証類は後ほど使うので必ず保存する)
  • 発行した証明書にポリシーをアタッチ
  • 設定からデバイスデータエンドポイントを取得(これも後ほどソース内で使用する)

RaspberryPiの設定

pythonで扱うためのクライアントをインストール。

$ pip install paho-mqtt

認証類の設置

  • PATH_TO_ROOT = AWS IoTのルートCA(ex: AmazonRootCA1.pem)
  • PATH_TO_CERT = モノの証明書(ex: xxxxx-certificate.pem.crt)
  • PATH_TO_KEY = 秘密鍵(ex: xxxxx-private.pem.key)

Subテスト

sub.py
from paho.mqtt import client as mqtt
import ssl
import json

BROKER = 'xxxxx.ap-northeast-1.amazonaws.com'
PORT = 8883

PATH_TO_ROOT = './crt/root.pem'
PATH_TO_CERT = './crt/certificate.pem.crt'
PATH_TO_KEY = './crt/private.pem.key'
CLIENT_ID = 'rasp-test'
TOPIC = 'test/hello'


def on_init():
  client = mqtt.Client(
    client_id=CLIENT_ID,
    protocol=mqtt.MQTTv311)

  client.tls_set(
    ca_certs=PATH_TO_ROOT,
    certfile=PATH_TO_CERT,
    keyfile=PATH_TO_KEY,
    tls_version = ssl.PROTOCOL_TLSv1_2)
  client.tls_insecure_set(True)

  client.connect(
    BROKER,
    port=PORT,
    keepalive=60)

  return client


def on_connect(client, userdata, flags, rc):
  print(f'Connected `{str(rc)}`')
  sub(client)


def on_disconnect(client, userdata, rc):
  print(f'Disconnect `{str(rc)}`')


def on_publish(client, userdata, mid):
  print(f'Publish `{str(mid)}`')


def on_message(client, userdata, msg):
  dict_msg = json.loads(msg.payload)
  print(f'Sub `{dict_msg["message"]}`')


def sub(client: mqtt):
  client.subscribe(TOPIC)
  client.on_message = on_message


def main():
  client = on_init()

  client.on_connect = on_connect
  client.on_disconnect = on_disconnect

  client.loop_forever()


if __name__ == '__main__':
  main()
  • 起動してMQTT テストクライアントトピックに公開するからテスト送信する
  • ログが表示されれば成功!

Pubテスト

pub.py
from paho.mqtt import client as mqtt
from time import sleep
import ssl
import json

BROKER = 'xxxxx.ap-northeast-1.amazonaws.com'
PORT = 8883

PATH_TO_ROOT = './crt/root.pem'
PATH_TO_CERT = './crt/certificate.pem.crt'
PATH_TO_KEY = './crt/private.pem.key'
CLIENT_ID = 'rasp-test'
TOPIC = 'test/hello'


def on_init():
  client = mqtt.Client(
    client_id=CLIENT_ID,
    protocol=mqtt.MQTTv311)

  client.tls_set(
    ca_certs=PATH_TO_ROOT,
    certfile=PATH_TO_CERT,
    keyfile=PATH_TO_KEY,
    tls_version = ssl.PROTOCOL_TLSv1_2)
  client.tls_insecure_set(True)

  client.connect(
    BROKER,
    port=PORT,
    keepalive=60)

  return client


def on_connect(client, userdata, flags, rc):
  print(f'Connected `{str(rc)}`')
  pub(client)


def on_disconnect(client, userdata, rc):
  print(f'Disconnect `{str(rc)}`')


def on_publish(client, userdata, mid):
  print(f'Publish `{str(mid)}`')


def pub(client: mqtt):
  client.on_publish = on_publish


def pub_send(client: mqtt):
  msg = json.dumps({ 'message': 'Hello, MQTT' })
  client.publish('test/hello', msg)


def main():
  client = on_init()

  client.on_connect = on_connect
  client.on_disconnect = on_disconnect
  
  client.loop_start()
  while True:
    pub_send(client)
    sleep(1)


if __name__ == '__main__':
  main()
  • 起動してMQTT テストクライアントトピックをサブスクライブするtest/helloでフィルターを設定して、確認する
  • ログが表示されれば成功!

注意点

  • ポートは8883を使用する
  • Subの起動client.subscribe(TOPIC)はon_connectイベント後に設置する(地味にハマった)

Discussion