Closed21

EMQXを使ったPub/Subについて

kenichiro90kenichiro90

無料枠があるのは、Serverlessタイプだけ。
リージョンは、North America, Europe, Asia-Pacificの中から選ぶことができる。

消費限度額は、設定できるとのこと。

限度額を超えた場合、以下の2通りから選ぶことができる。

  • デプロイ停止
  • 注意を促して課金する
kenichiro90kenichiro90

支払い方法を登録して、お試しで1個作ってみた。
デプロイには、2分ほどかかるみたい。

管理画面を開くと、チュートリアルが出てくるので親切。
画面構成は、スッキリしていて見やすい。

とりあえず、デプロイしたブローカーへ接続してみる。

kenichiro90kenichiro90

EMQX Platform クイックスタートを参考に、ローカルマシンから接続してみる。

接続方法(?)は、以下の通り。

  • Client Tools
    • MQTTX
    • MQTT.fx
    • Arduino ESP8266
  • Terminal Development Language Demo
    • Python
    • Golang
    • Java
    • Node.js
    • C
    • C#
    • Vue.js
    • React
    • Electron
  • Third-party SDK Recommendation
    • Python
    • Golang
    • Java
    • JavaScript
    • Node.js
    • C
    • C#
    • PHP
    • iOS
    • Android
kenichiro90kenichiro90

無事、インポートできたので、Pub/Subするコードを書いていく。

接続方法には、TCPを使う方法と、SSL/TLSを使う方法がある。
今回は、SSL/TLSを使ってみる。

接続先の設定は、以下のようにすれば良いみたい。

broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)
kenichiro90kenichiro90

Pubするコードを書いて実行してみたが、下記エラーが出てしまう。

client = mqtt_client.Client(client_id)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kenichiro/.pyenv/versions/3.11.7/lib/python3.11/site-packages/paho/mqtt/client.py", line 772, in __init__
raise ValueError(ValueError: Unsupported callback API version: version 2.0 added a callback_api_version, see docs/migrations.rst for details

paho-mqttのAPI仕様が変わっているみたい…

kenichiro90kenichiro90

書き直したコードは、以下の通り。


test_emqx_pub.py

import random
import time
from paho.mqtt import client as mqtt_client

broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)

def connect_mqtt():
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", reason_code)
    # Set Connecting Client ID
    client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
    # Set CA certificate
    client.tls_set(ca_certs='./emqxsl-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
           print(f"Send `{msg}` to topic `{topic}`")
        else:
           print(f"Failed to send message to topic {topic}")
        msg_count += 1

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

test_emqx_sub.py

import random
from paho.mqtt import client as mqtt_client

broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)

def connect_mqtt():
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", reason_code)
    # Set Connecting Client ID
    client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
    # Set CA certificate
    client.tls_set(ca_certs='./emqxsl-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    client.subscribe(topic)
    client.on_message = on_message

def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()
kenichiro90kenichiro90

作ったコードを別々のターミナルから実行してみる。
Pub/Subできているので、これで良いみたい。


1個目のターミナル (Pub)

❯ python test_emqx_pub.py
Connected to MQTT Broker!
Send `messages: 0` to topic `python/mqtt`
Send `messages: 1` to topic `python/mqtt`
Send `messages: 2` to topic `python/mqtt`
Send `messages: 3` to topic `python/mqtt`
Send `messages: 4` to topic `python/mqtt`
Send `messages: 5` to topic `python/mqtt`
Send `messages: 6` to topic `python/mqtt`
Send `messages: 7` to topic `python/mqtt`
Send `messages: 8` to topic `python/mqtt`
Send `messages: 9` to topic `python/mqtt`
Send `messages: 10` to topic `python/mqtt`
Send `messages: 11` to topic `python/mqtt`
Send `messages: 12` to topic `python/mqtt`
Send `messages: 13` to topic `python/mqtt`
Send `messages: 14` to topic `python/mqtt`
Send `messages: 15` to topic `python/mqtt`

2個目のターミナル (Sub)

❯ python test_emqx_sub.py
Connected to MQTT Broker!
Received `messages: 5` from `python/mqtt` topic
Received `messages: 6` from `python/mqtt` topic
Received `messages: 7` from `python/mqtt` topic
Received `messages: 8` from `python/mqtt` topic
Received `messages: 9` from `python/mqtt` topic
Received `messages: 10` from `python/mqtt` topic
Received `messages: 11` from `python/mqtt` topic
Received `messages: 12` from `python/mqtt` topic
Received `messages: 13` from `python/mqtt` topic
Received `messages: 14` from `python/mqtt` topic
Received `messages: 15` from `python/mqtt` topic
kenichiro90kenichiro90

結構簡単にPub/Subすることができたので、マイコンからPub/Subできるか試してみるつもり。

kenichiro90kenichiro90

M5CoreS3を使って試してみたところ、少しだけハマったけどPub/Subすることができた。

試してみた限りでは、TCPは使えないらしい。
SSL/TLSが使えるマイコンは意外と少ないので、結構厳しい…

後程、やったことをメモする予定。

kenichiro90kenichiro90

開発環境は、Arduino IDE v2.2.1を使用した。
インストールしたボードと、ライブラリは以下の通り。

  • ボード
    • M5Stack 2.1.1
  • ライブラリ
kenichiro90kenichiro90

"Connect over TLS/SSL Port"に書いてあるコードは、ESP8266用のものなので、M5CoreS3で動くように色々と変更した。変更箇所は、以下の通り。

  • WiFi, MQTTブローカーの設定
    • .inoファイルに直接書いても良いが、別ファイル(arduino_secrets.h)で管理するようにした
// arduino_secrets.h

// WiFi Settings
#define SECRET_SSID "(WiFiルーターのSSID)"
#define SECRET_PASS "(WiFiルーターの暗号化キー)"

// MQTT Broker Settings
#define MQTT_BROKER "(接続情報のところに書いてある接続先)"
#define MQTT_USER_NAME "("アクセス制御 -> 認証 -> 追加"で追加したユーザー)"
#define MQTT_USER_PASSWORD "("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)"
  • BearSSLの部分
    • M5CoreS3ではBearSSLは使えないので、WiFiClientSecureに変更する
    • CA証明書をセットするところ、以下のように変更する
// 変更前
BearSSL::X509List serverTrustedCA(ca_cert);

// 変更後
espClient.setCACert(ca_cert);
  • connectToMQTT()のMACアドレスを取得する部分
    • Serial.printfのところは、コンパイルエラーが発生してしまうので、snprintfを使うようにした
// 変更前
String client_id = "esp8266-client-" + String(WiFi.macAddress());
Serial.printf("Connecting to MQTT Broker as %s.....\n", client_id.c_str());

// 変更後
char buf[64];
String client_id = "M5CoreS3-client-" + String(WiFi.macAddress());
snprintf(buf, 64, "Connecting to MQTT Broker as %s.....\n", client_id.c_str());
Serial.write(buf);
kenichiro90kenichiro90

修正後のソースコード全文は、以下の通り。


test_emqx.ino

#include <M5CoreS3.h>
#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <time.h>
#include "arduino_secrets.h"
 
// WiFi settings
char ssid[] = SECRET_SSID;    // your network SSID (name)
char pass[] = SECRET_PASS;    // your network password (use for WPA, or use as key for WEP)
int status = WL_IDLE_STATUS;  // the Wifi radio's status

// MQTT Broker settings
const int mqtt_port = 8883;                       // MQTT port (TLS)
const char *mqtt_broker = MQTT_BROKER;            // EMQX broker endpoint
const char *mqtt_topic = "emqx/m5cores3";         // MQTT topic
const char *mqtt_username = MQTT_USER_NAME;       // MQTT username for authentication
const char *mqtt_password = MQTT_USER_PASSWORD;   // MQTT password for authentication

// NTP Server settings
const char *ntp_server = "pool.ntp.org";  // Default NTP server
// const char* ntp_server = "cn.pool.ntp.org"; // Recommended NTP server for users in China
const long gmt_offset_sec = 0;      // GMT offset in seconds (adjust for your time zone)
const int daylight_offset_sec = 0;  // Daylight saving time offset in seconds

// WiFi and MQTT client initialization
WiFiClientSecure espClient;
PubSubClient mqtt_client(espClient);

// SSL certificate for MQTT broker
// Load DigiCert Global Root G2, which is used by EMQX Public Broker: broker.emqx.io
static const char ca_cert[] PROGMEM = R"EOF(
-----BEGIN CERTIFICATE-----

(CA証明書の内容を貼り付ける)

-----END CERTIFICATE-----
)EOF";

void connectToWiFi() {
  WiFi.begin(ssid, pass);
  while (WiFi.status() != WL_CONNECTED) {
    delay(1000);
    Serial.println("Connecting to WiFi...");
  }
  Serial.println("Connected to WiFi");
}

void syncTime() {
  configTime(gmt_offset_sec, daylight_offset_sec, ntp_server);
  Serial.print("Waiting for NTP time sync: ");
  while (time(nullptr) < 8 * 3600 * 2) {
    delay(1000);
    Serial.print(".");
  }
  Serial.println("Time synchronized");
  struct tm timeinfo;
  if (getLocalTime(&timeinfo)) {
    Serial.print("Current time: ");
    Serial.println(asctime(&timeinfo));
  } else {
    Serial.println("Failed to obtain local time");
  }
}

void mqttCallback(char *topic, byte *payload, unsigned int length) {
  Serial.print("Message received on topic: ");
  Serial.print(topic);
  Serial.print("]: ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

void connectToMQTT() {
  espClient.setCACert(ca_cert);
  char buf[64];
  while (!mqtt_client.connected()) {
    String client_id = "M5CoreS3-client-" + String(WiFi.macAddress());
    snprintf(buf, 64, "Connecting to MQTT Broker as %s.....\n", client_id.c_str());
    Serial.write(buf);
    if (mqtt_client.connect(client_id.c_str(), mqtt_username, mqtt_password)) {
      Serial.println("Connected to MQTT broker");
      mqtt_client.subscribe(mqtt_topic);
      // Publish message upon successful connection
      mqtt_client.publish(mqtt_topic, "Hi EMQX I'm M5CoreS3 ^^");
    } else {
      Serial.print("Failed to connect to MQTT broker, rc=");
      Serial.println(mqtt_client.state());
      delay(5000);
    }
  }
}

void setup() {
  M5.begin();
  M5.Power.begin();
  Serial.begin(115200);
  while (!Serial);
  connectToWiFi();
  syncTime();  // X.509 validation requires synchronization time
  mqtt_client.setServer(mqtt_broker, mqtt_port);
  mqtt_client.setCallback(mqttCallback);
  connectToMQTT();
}

void loop() {
  if (!mqtt_client.connected()) {
    connectToMQTT();
  }
  mqtt_client.loop();
}

arduino_secrets.h

// WiFi settings
#define SECRET_SSID "(WiFiルーターのSSID)"
#define SECRET_PASS "(WiFiルーターの暗号化キー)"

// MQTT Broker settings
#define MQTT_BROKER "(接続情報のところに書いてある接続先)"
#define MQTT_USER_NAME "("アクセス制御 -> 認証 -> 追加"で追加したユーザー)"
#define MQTT_USER_PASSWORD "("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)"
kenichiro90kenichiro90

上記ソースコードを、Arduino IDEを使ってM5CoreS3へ書き込む。
書き込んだ後、Serial Monitorを起動すると、下記のようなメッセージが出力される。

Connecting to WiFi...
Connecting to WiFi...
Connecting to WiFi...
Connected to WiFi
Waiting for NTP time sync: Time synchronized
Current time: Sat Jul 20 06:16:45 2024

Connecting to MQTT Broker as M5CoreS3-client-XX:XX:XX:XX:XX:XX.Connected to MQTT broker
Message received on topic: emqx/m5cores3]: Hi EMQX I'm M5CoreS3 ^^
kenichiro90kenichiro90

デプロイしたMQTTブローカーとM5CoreS3が接続できているので、MQTTXを使ってSubしてみる。

https://mqttx.app/ja

Subするトピックをemqx/m5cores3に設定し、M5CoreS3をリセットする。
リセット後、MQTTX上にメッセージが出力されているので、問題なくSubできている。

kenichiro90kenichiro90

MQTTXからM5CoreS3に対してPubできるか確認するため、Plaintextで文字列を送ってみる。
下記メッセージが出力されているので、Pubの方も問題なく行えている。


MQTTX

Arduino IDE (Serial Monitor)

Connecting to MQTT Broker as M5CoreS3-client-48:27:E2:7A:43:54.Connected to MQTT broker
Message received on topic: emqx/m5cores3]: Hi EMQX I'm M5CoreS3 ^^
Message received on topic: emqx/m5cores3]: Hello M5CoreS3!!
kenichiro90kenichiro90

EMQX PlatformとM5CoreS3を接続し、Pub/Subすることができたので、これから色々と試してみようと思う。

このスクラップは4ヶ月前にクローズされました