💨

【電子工作】micropythonのMQTT通信忘備録

2024/07/17に公開

先日開催されたテックシーカーハッカソン&コレクションで、MQTTブローカーshiftrを教えてもらったので、試しに使ってみました。
https://cloud.shiftr.io/

ダッシュボードの緑色は趣味が悪い配色と感じますが、ノード間のデータの流れが見やすくIoTが捗りそうです。

使用マイコン&開発環境

  • ラズパイpico W
  • Micropy Version: 4.2.2
  • Micopico Device controller(VSCode)

C/C++で書いている人はmicropythonをdisりがちな印象ですが、趣味の開発でそこまでシビアになることもないので可読性が高く開発もしやすいmicropythonで書けば良いのにと思います。
(ちなみに私はどちらも使います)

shiftrの準備

shiftrを使うにあたって、インスタンス(Namespace)とグループという2つの概念を理解しておく必要があります。
以下、perplexityに聞いた答えです。

インスタンス(Namespace)

  • インスタンス(Namespaceとも呼ばれる)は、shiftr.ioにおける独立したMQTTブローカー環境です。
  • 各インスタンスは、完全に分離された通信空間を提供します。同じトピック名を異なるインスタンスで使用しても、データが混ざることはありません。
  • ユーザーは複数のインスタンスを作成・管理できます。これにより、異なるプロジェクトや用途ごとに独立したMQTT環境を設定できます。
  • インスタンスには固有のホスト名(例:your_namespace.cloud.shiftr.io)が割り当てられ、これがMQTTブローカーのアドレスとなります。

グループ

  • グループの概念は、shiftr.ioの公式ドキュメンテーションでは明確に定義されていません。
  • しかし、一般的なMQTTの文脈では、グループは関連するトピックをまとめる方法として使用されることがあります。
  • 例えば、"home/livingroom/temperature" と "home/bedroom/temperature" というトピックがある場合、"home" をグループとして考えることができます。
  • shiftr.ioでは、このようなグループ化はトピック名の階層構造を通じて実現できます。ただし、これはshiftr.io固有の機能ではなく、MQTT全般の概念です。

プロジェクトの単位としてインスタンスを分け、同じインスタンスの中で用途によってグループをわけると思えば良いでしょう。

今回はsample0716というインスタンスを作成しました

sample0716.cloud.shiftr.ioをクリックすると、ダッシュボード画面に入ることができます。
この「sample0716.cloud.shiftr.io」というのがMQTTブローカーです。あとでラズパイpicoでMQTT_BROKERに入れますので覚えておいてください。

かなり分かりづらいですが、右下の歯車をクリックするとセッティング画面が表示されます。
左側メニューのTokensから、MQTT通信を使ってデータを送るためのトークンを設定することができます。

一度作成したトークンを確認するには、これまた分かりづらいですが、0716testの文字をクリックすると、SecretやPermissionを設定できます

ラズパイPico側

MQTT通信にはこちらのライブラリを使いましょう
https://pypi.org/project/micropython-umqtt.simple/

% micropy install micropython-umqtt.simple
boot.py
import network

SSID = "************"
PASSWORD = "*************"

wlan = network.WLAN(network.STA_IF)

wlan.active(True)

if not wlan.isconnected():
    print('connecting to network...')
    wlan.connect(SSID,PASSWORD)
    while not wlan.isconnected():
        pass
print('network config:', wlan.ifconfig())

main.py
import ujson
from utime import sleep
from umqtt.simple import MQTTClient

MQTT_BROKER = "sample0716.cloud.shiftr.io" # 上記のMQTTブローカー名 
MQTT_PORT = 1883 # 
MQTT_CLIENT_ID = "raspberry_pi_pico" # 好きな名前で設定可能
MQTT_USER = "sample0716" # インスタンス名
MQTT_PASSWORD = "****" # Tokenで作成した時に設定したSecret
MQTT_TOPIC = "sample1_topic" # 好きな名称を設定可能

count = 1

# MQTTメッセージをpublish
def publish_message(client):
    global count
    while True:
        message = {
            'message': 'hello',
            'count': count
        }
        json_message = ujson.dumps(message)
        client.publish(MQTT_TOPIC, json_message)
        print(f"Published: {json_message}")
        count += 1
        sleep(5)  # 5秒ごとにpublish

# メイン処理
def main():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD)
    client.connect()
    print("MQTT接続成功")

    try:
        publish_message(client)
    except Exception as e:
        print(f"エラー発生: {e}")
    finally:
        client.disconnect()

main()

これを実行すると、shiftr.ioのダッシュボードにリアルタイムに表示されます。

一度実行したら、MQTT_TOPICの名前を変えて実行してみてください。
別の名称でデータを送ることができます。

例えば3台の温湿度計があった場合、それぞれにMQTT_TOPICの名前をつけてやると良いでしょう

メッセージを受け取る(subscribe)

以下のコードでsubscribeできます。

# MQTTメッセージをsubscribe
def subscribe_message(client):
    def on_message(topic, message):
        print(f"Received: {topic} {message}")

    client.set_callback(on_message)
    client.subscribe(MQTT_TOPIC)
    print(f"Subscribed to {MQTT_TOPIC}")

    while True:
        client.wait_msg()

これを実行してshiftr.ioのダッシュボード画面を見てみましょう。

メッセージの流れが可視化されます。便利!

Discussion