EMQXを使ったPub/Subについて
無料枠があるので、とりあえず味見をしてみる。
無料枠があるのは、Serverlessタイプだけ。
リージョンは、North America, Europe, Asia-Pacificの中から選ぶことができる。
消費限度額は、設定できるとのこと。
限度額を超えた場合、以下の2通りから選ぶことができる。
- デプロイ停止
- 注意を促して課金する
支払い方法を登録して、お試しで1個作ってみた。
デプロイには、2分ほどかかるみたい。
管理画面を開くと、チュートリアルが出てくるので親切。
画面構成は、スッキリしていて見やすい。
とりあえず、デプロイしたブローカーへ接続してみる。
EMQX Platform クイックスタートを参考に、ローカルマシンから接続してみる。
接続方法(?)は、以下の通り。
- Client Tools
- MQTTX
- MQTT.fx
- Arduino ESP8266
- Terminal Development Language Demo
- Python
- Golang
- Java
- Node.js
- C
- C#
- Vue.js
- React
- Electron
- Third-party SDK Recommendation
- Python
- Golang
- Java
- JavaScript
- Node.js
- C
- C#
- PHP
- iOS
- Android
普段からPythonを使っているので、Python SDK Demoの内容を参考に、コードを書いてみる。
上記ページでは、MQTT Clientとしてpaho-mqtt
を使っている。
なので、説明の通りインストールして、インポートできるか確認する。
# zsh
pip install paho-mqtt
# REPL
>>> from paho.mqtt import client as mqtt_client
無事、インポートできたので、Pub/Subするコードを書いていく。
接続方法には、TCPを使う方法と、SSL/TLSを使う方法がある。
今回は、SSL/TLSを使ってみる。
接続先の設定は、以下のようにすれば良いみたい。
broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)
Pubするコードを書いて実行してみたが、下記エラーが出てしまう。
client = mqtt_client.Client(client_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kenichiro/.pyenv/versions/3.11.7/lib/python3.11/site-packages/paho/mqtt/client.py", line 772, in __init__
raise ValueError(ValueError: Unsupported callback API version: version 2.0 added a callback_api_version, see docs/migrations.rst for details
paho-mqttのAPI仕様が変わっているみたい…
GitHubのリポジトリ(https://github.com/eclipse/paho.mqtt.python)を確認してみたところ、やっぱり変わっていた。
CallbackAPIVersion.VERSION2
を使わないとダメみたいなので、色々と書き直してみる。
書き直したコードは、以下の通り。
test_emqx_pub.py
import random
import time
from paho.mqtt import client as mqtt_client
broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)
def connect_mqtt():
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", reason_code)
# Set Connecting Client ID
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
# Set CA certificate
client.tls_set(ca_certs='./emqxsl-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
test_emqx_sub.py
import random
from paho.mqtt import client as mqtt_client
broker = (接続情報のところに書いてある接続先)
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = ("アクセス制御 -> 認証 -> 追加"で追加したユーザー)
password = ("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)
def connect_mqtt():
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", reason_code)
# Set Connecting Client ID
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
# Set CA certificate
client.tls_set(ca_certs='./emqxsl-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
作ったコードを別々のターミナルから実行してみる。
Pub/Subできているので、これで良いみたい。
1個目のターミナル (Pub)
❯ python test_emqx_pub.py
Connected to MQTT Broker!
Send `messages: 0` to topic `python/mqtt`
Send `messages: 1` to topic `python/mqtt`
Send `messages: 2` to topic `python/mqtt`
Send `messages: 3` to topic `python/mqtt`
Send `messages: 4` to topic `python/mqtt`
Send `messages: 5` to topic `python/mqtt`
Send `messages: 6` to topic `python/mqtt`
Send `messages: 7` to topic `python/mqtt`
Send `messages: 8` to topic `python/mqtt`
Send `messages: 9` to topic `python/mqtt`
Send `messages: 10` to topic `python/mqtt`
Send `messages: 11` to topic `python/mqtt`
Send `messages: 12` to topic `python/mqtt`
Send `messages: 13` to topic `python/mqtt`
Send `messages: 14` to topic `python/mqtt`
Send `messages: 15` to topic `python/mqtt`
2個目のターミナル (Sub)
❯ python test_emqx_sub.py
Connected to MQTT Broker!
Received `messages: 5` from `python/mqtt` topic
Received `messages: 6` from `python/mqtt` topic
Received `messages: 7` from `python/mqtt` topic
Received `messages: 8` from `python/mqtt` topic
Received `messages: 9` from `python/mqtt` topic
Received `messages: 10` from `python/mqtt` topic
Received `messages: 11` from `python/mqtt` topic
Received `messages: 12` from `python/mqtt` topic
Received `messages: 13` from `python/mqtt` topic
Received `messages: 14` from `python/mqtt` topic
Received `messages: 15` from `python/mqtt` topic
1個ずつPub/Subしているので、接続数は2になる。
結構簡単にPub/Subすることができたので、マイコンからPub/Subできるか試してみるつもり。
M5CoreS3を使って試してみたところ、少しだけハマったけどPub/Subすることができた。
試してみた限りでは、TCPは使えないらしい。
SSL/TLSが使えるマイコンは意外と少ないので、結構厳しい…
後程、やったことをメモする予定。
忘れないうちに、やったことをメモしていく。
M5CoreS3からPub/Subするとき、ここに書いてある情報が参考になった。
上で書いた通り、TCPは使えないようなので、"Connect over TLS/SSL Port"に書いてある情報を元に.inoファイルを作った。
開発環境は、Arduino IDE v2.2.1を使用した。
インストールしたボードと、ライブラリは以下の通り。
- ボード
- M5Stack 2.1.1
- ライブラリ
- M5CoreS3 1.0.0
- M5GFX 0.1.16
- M5Unified 0.1.16
- PubSubClient 2.8
"Connect over TLS/SSL Port"に書いてあるコードは、ESP8266用のものなので、M5CoreS3で動くように色々と変更した。変更箇所は、以下の通り。
- WiFi, MQTTブローカーの設定
- .inoファイルに直接書いても良いが、別ファイル(
arduino_secrets.h
)で管理するようにした
- .inoファイルに直接書いても良いが、別ファイル(
// arduino_secrets.h
// WiFi Settings
#define SECRET_SSID "(WiFiルーターのSSID)"
#define SECRET_PASS "(WiFiルーターの暗号化キー)"
// MQTT Broker Settings
#define MQTT_BROKER "(接続情報のところに書いてある接続先)"
#define MQTT_USER_NAME "("アクセス制御 -> 認証 -> 追加"で追加したユーザー)"
#define MQTT_USER_PASSWORD "("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)"
- BearSSLの部分
- M5CoreS3ではBearSSLは使えないので、WiFiClientSecureに変更する
- CA証明書をセットするところ、以下のように変更する
// 変更前
BearSSL::X509List serverTrustedCA(ca_cert);
// 変更後
espClient.setCACert(ca_cert);
- connectToMQTT()のMACアドレスを取得する部分
- Serial.printfのところは、コンパイルエラーが発生してしまうので、snprintfを使うようにした
// 変更前
String client_id = "esp8266-client-" + String(WiFi.macAddress());
Serial.printf("Connecting to MQTT Broker as %s.....\n", client_id.c_str());
// 変更後
char buf[64];
String client_id = "M5CoreS3-client-" + String(WiFi.macAddress());
snprintf(buf, 64, "Connecting to MQTT Broker as %s.....\n", client_id.c_str());
Serial.write(buf);
修正後のソースコード全文は、以下の通り。
test_emqx.ino
#include <M5CoreS3.h>
#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <time.h>
#include "arduino_secrets.h"
// WiFi settings
char ssid[] = SECRET_SSID; // your network SSID (name)
char pass[] = SECRET_PASS; // your network password (use for WPA, or use as key for WEP)
int status = WL_IDLE_STATUS; // the Wifi radio's status
// MQTT Broker settings
const int mqtt_port = 8883; // MQTT port (TLS)
const char *mqtt_broker = MQTT_BROKER; // EMQX broker endpoint
const char *mqtt_topic = "emqx/m5cores3"; // MQTT topic
const char *mqtt_username = MQTT_USER_NAME; // MQTT username for authentication
const char *mqtt_password = MQTT_USER_PASSWORD; // MQTT password for authentication
// NTP Server settings
const char *ntp_server = "pool.ntp.org"; // Default NTP server
// const char* ntp_server = "cn.pool.ntp.org"; // Recommended NTP server for users in China
const long gmt_offset_sec = 0; // GMT offset in seconds (adjust for your time zone)
const int daylight_offset_sec = 0; // Daylight saving time offset in seconds
// WiFi and MQTT client initialization
WiFiClientSecure espClient;
PubSubClient mqtt_client(espClient);
// SSL certificate for MQTT broker
// Load DigiCert Global Root G2, which is used by EMQX Public Broker: broker.emqx.io
static const char ca_cert[] PROGMEM = R"EOF(
-----BEGIN CERTIFICATE-----
(CA証明書の内容を貼り付ける)
-----END CERTIFICATE-----
)EOF";
void connectToWiFi() {
WiFi.begin(ssid, pass);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("Connecting to WiFi...");
}
Serial.println("Connected to WiFi");
}
void syncTime() {
configTime(gmt_offset_sec, daylight_offset_sec, ntp_server);
Serial.print("Waiting for NTP time sync: ");
while (time(nullptr) < 8 * 3600 * 2) {
delay(1000);
Serial.print(".");
}
Serial.println("Time synchronized");
struct tm timeinfo;
if (getLocalTime(&timeinfo)) {
Serial.print("Current time: ");
Serial.println(asctime(&timeinfo));
} else {
Serial.println("Failed to obtain local time");
}
}
void mqttCallback(char *topic, byte *payload, unsigned int length) {
Serial.print("Message received on topic: ");
Serial.print(topic);
Serial.print("]: ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
void connectToMQTT() {
espClient.setCACert(ca_cert);
char buf[64];
while (!mqtt_client.connected()) {
String client_id = "M5CoreS3-client-" + String(WiFi.macAddress());
snprintf(buf, 64, "Connecting to MQTT Broker as %s.....\n", client_id.c_str());
Serial.write(buf);
if (mqtt_client.connect(client_id.c_str(), mqtt_username, mqtt_password)) {
Serial.println("Connected to MQTT broker");
mqtt_client.subscribe(mqtt_topic);
// Publish message upon successful connection
mqtt_client.publish(mqtt_topic, "Hi EMQX I'm M5CoreS3 ^^");
} else {
Serial.print("Failed to connect to MQTT broker, rc=");
Serial.println(mqtt_client.state());
delay(5000);
}
}
}
void setup() {
M5.begin();
M5.Power.begin();
Serial.begin(115200);
while (!Serial);
connectToWiFi();
syncTime(); // X.509 validation requires synchronization time
mqtt_client.setServer(mqtt_broker, mqtt_port);
mqtt_client.setCallback(mqttCallback);
connectToMQTT();
}
void loop() {
if (!mqtt_client.connected()) {
connectToMQTT();
}
mqtt_client.loop();
}
arduino_secrets.h
// WiFi settings
#define SECRET_SSID "(WiFiルーターのSSID)"
#define SECRET_PASS "(WiFiルーターの暗号化キー)"
// MQTT Broker settings
#define MQTT_BROKER "(接続情報のところに書いてある接続先)"
#define MQTT_USER_NAME "("アクセス制御 -> 認証 -> 追加"で追加したユーザー)"
#define MQTT_USER_PASSWORD "("アクセス制御 -> 認証 -> 追加"で追加したユーザーのパスワード)"
上記ソースコードを、Arduino IDEを使ってM5CoreS3へ書き込む。
書き込んだ後、Serial Monitorを起動すると、下記のようなメッセージが出力される。
Connecting to WiFi...
Connecting to WiFi...
Connecting to WiFi...
Connected to WiFi
Waiting for NTP time sync: Time synchronized
Current time: Sat Jul 20 06:16:45 2024
Connecting to MQTT Broker as M5CoreS3-client-XX:XX:XX:XX:XX:XX.Connected to MQTT broker
Message received on topic: emqx/m5cores3]: Hi EMQX I'm M5CoreS3 ^^
デプロイしたMQTTブローカーとM5CoreS3が接続できているので、MQTTXを使ってSubしてみる。
Subするトピックをemqx/m5cores3
に設定し、M5CoreS3をリセットする。
リセット後、MQTTX上にメッセージが出力されているので、問題なくSubできている。
MQTTXからM5CoreS3に対してPubできるか確認するため、Plaintextで文字列を送ってみる。
下記メッセージが出力されているので、Pubの方も問題なく行えている。
MQTTX
Arduino IDE (Serial Monitor)
Connecting to MQTT Broker as M5CoreS3-client-48:27:E2:7A:43:54.Connected to MQTT broker
Message received on topic: emqx/m5cores3]: Hi EMQX I'm M5CoreS3 ^^
Message received on topic: emqx/m5cores3]: Hello M5CoreS3!!
EMQX PlatformとM5CoreS3を接続し、Pub/Subすることができたので、これから色々と試してみようと思う。