🍓

RP2350MQTTクライアントをRustで実装したい。

に公開

ゴール

rp2350(Raspberry pi pico 2 w)をMQTTクライアントとして実装する。

環境

  • Mac Book pro m3
  • raspberry pi pico 2 w
  • open wrt router(GLiNET)
    • Mosquitto(MQTTブローカー)

方針

現時点で、Wifi接続ができる状態が完成しています。

まずはMQTTのロジックについて再確認して、最小限のpub subができることを確認したいと思います。

MQTT

https://qiita.com/kbys-fumi/items/3ebb31a94fd3f9cc0b7a

こちらのサイトを参考にしています。

  • メッセージ
    • MQTTはメッセージ単位で情報をやり取り。
  • トピック
    • メッセージ送受信のキー
    • ブローカーがどのサブスクライバーに受信させるかを判断
  • パブリッシュ/サブスクライブ
    • イベント駆動型
    • メッセージを送信するクライアントをパブリッシャー
      • パブリッシャーはサブスクライバーを限定せずに情報を発信→複数のサブスクライバーはトピック選択して受信
    • メッセージを受信するクライアントをサブスクライバー
      • 知りたいものだけサブスクライブ
    • 仲介するのがブローカー
      • 全てのメッセージを受信、フィルタリング、トピックに関心を持つデバイスの決定、メッセージの送信

ブローカーにはMosquittoを使用します。

rp2350への実装

現状

まずは現状のコードを示します。

//! This example tests the RP Pico 2 W onboard LED.
//!
//! It does not work with the RP Pico 2 board. See `blinky.rs`.

#![no_std]
#![no_main]

use cyw43_pio::{PioSpi, RM2_CLOCK_DIVIDER};
use defmt::*;
use embassy_executor::Spawner;
use embassy_rp::bind_interrupts;
use embassy_rp::gpio::{Level, Output};
use embassy_rp::peripherals::{DMA_CH0, PIO0};
use embassy_rp::pio::{InterruptHandler, Pio};
use embassy_time::{Duration, Timer};
use static_cell::StaticCell;
use {defmt_rtt as _, panic_probe as _};
use cyw43::JoinOptions;

// グローバルロガー(defmt→RTT)
use defmt_rtt as _;
// panic時のバックトレース&停止(probe-rs 連携)
use panic_probe as _;

// ✅ defmtのパニックハンドラを提供(_defmt_panic を満たす)
#[defmt::panic_handler]
fn panic() -> ! {
    panic_probe::hard_fault()
}

// Program metadata for `picotool info`.
// This isn't needed, but it's recommended to have these minimal entries.
#[unsafe(link_section = ".bi_entries")]
#[used]
pub static PICOTOOL_ENTRIES: [embassy_rp::binary_info::EntryAddr; 4] = [
    embassy_rp::binary_info::rp_program_name!(c"Blinky Example"),
    embassy_rp::binary_info::rp_program_description!(
        c"This example tests the RP Pico 2 W's onboard LED, connected to GPIO 0 of the cyw43 \
        (WiFi chip) via PIO 0 over the SPI bus."
    ),
    embassy_rp::binary_info::rp_cargo_version!(),
    embassy_rp::binary_info::rp_program_build_attribute!(),
];

bind_interrupts!(struct Irqs {
    PIO0_IRQ_0 => InterruptHandler<PIO0>;
});

#[embassy_executor::task]
async fn cyw43_task(
    runner: cyw43::Runner<'static, Output<'static>, PioSpi<'static, PIO0, 0, DMA_CH0>>,
) -> ! {
    runner.run().await
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
    let p = embassy_rp::init(Default::default());
    let fw = include_bytes!("../../cyw43-firmware/43439A0.bin");
    let clm = include_bytes!("../../cyw43-firmware/43439A0_clm.bin");

    // To make flashing faster for development, you may want to flash the firmwares independently
    // at hardcoded addresses, instead of baking them into the program with `include_bytes!`:
    //     probe-rs download ../../cyw43-firmware/43439A0.bin --binary-format bin --chip RP235x --base-address 0x10100000
    //     probe-rs download ../../cyw43-firmware/43439A0_clm.bin --binary-format bin --chip RP235x --base-address 0x10140000
    //let fw = unsafe { core::slice::from_raw_parts(0x10100000 as *const u8, 230321) };
    //let clm = unsafe { core::slice::from_raw_parts(0x10140000 as *const u8, 4752) };

    let pwr = Output::new(p.PIN_23, Level::Low);
    let cs = Output::new(p.PIN_25, Level::High);
    let mut pio = Pio::new(p.PIO0, Irqs);
    let spi = PioSpi::new(
        &mut pio.common,
        pio.sm0,
        // SPI communication won't work if the speed is too high, so we use a divider larger than `DEFAULT_CLOCK_DIVIDER`.
        // See: https://github.com/embassy-rs/embassy/issues/3960.
        RM2_CLOCK_DIVIDER,
        pio.irq0,
        cs,
        p.PIN_24,
        p.PIN_29,
        p.DMA_CH0,
    );

    static STATE: StaticCell<cyw43::State> = StaticCell::new();
    let state = STATE.init(cyw43::State::new());
    let (_net_device, mut control, runner) = cyw43::new(state, pwr, spi, fw).await;
    spawner.spawn(cyw43_task(runner)).unwrap();

    control.init(clm).await;
    control
        .set_power_management(cyw43::PowerManagementMode::PowerSave)
        .await;

    const SSID: &str = "OpenWrt";
    const PASS: &str = "-Xzqz889TL*C";

    // 念のため LED を消灯して開始
    control.gpio_set(0, false).await;

    loop {
        // --- 接続チャレンジ中の点滅(200ms * 4 回) ---
        for _ in 0..4 {
            control.gpio_set(0, true).await; // ON
            Timer::after_millis(200).await;
            control.gpio_set(0, false).await; // OFF
            Timer::after_millis(200).await;
        }

        // --- 実際の接続試行 ---
        match control.join(SSID, JoinOptions::new(PASS.as_bytes())).await {
            Ok(_) => {
                // info!("joined Wi-Fi: {}", SSID);
                // 成功: LED を点灯して終了
                control.gpio_set(0, true).await;
                break ;
            }
            Err(_e) => {
                // info!("join failed: status = {}", e.status);
                // 失敗: 小休止して次の点滅→再試行へ
                Timer::after_millis(500).await;
            }
        }
    }
}

wifiルーターに接続するまでは点滅を繰り返し、完了次第点灯する実装です。

コードの内容は異なりますが、環境構築についてはこちらをご覧ください。

MQTTライブラリの選定

既存のRustプロジェクトの中で、MQTTを実装しているものはいくつかあり、最もメジャーなのはrumqttなのかな?と考えています。

しかしながら、rumqttはtokioを前提としているため、今回のnostd環境には実装ができません。

いくつかの候補から選定してみたいと思います。

  • minimq
    • embedded-nalベースでどのTCPスタックにも載せやすい。embassy-netとの相性が良い
  • mqttrust
    • セキュア接続を意識した軽量クライアント
  • embassy-net
    • raspi pico 2 wに搭載されているcyw43にembassy-net用Net driverが用意されており、Wifi接続後にそのままMQTTのTCP下層として利用できる。

現在のWifi接続までできているコードからも、embassy系の機能で統一した方が良いと思われますので、embassy-net + minimqの組み合わせで実装する方針でいこうと思います。

  1. embassy-netのスタックをのせる
  2. TcpClientを作成
  3. minimqを挿入

以上のような流れでMQTT Connectまで実装してみたいと思います。

と考えていたのですが、以下のようなリポジトリを発見しました。

https://github.com/mountainlizard/mountain-mqtt

ここにはrp2040向けのexampleが存在しています。

これを参考にrp2350向けのMQTTクライアントコードを書こうと思います。

MQTTクライアントの作成

ファイルの構成はexampleと同じです。

Cargo.tomlとmain.rsの中身のみ、下記のように変えています。

[package]
edition = "2024"
name = "embassy-rp2350-examples"
version = "0.1.0"

publish = false

[dependencies]
embassy-embedded-hal = { version = "0.3.0", features = ["defmt"] }
embassy-sync = { version = "0.6.2", features = ["defmt"] }
embassy-executor = { version = "0.7.0", features = [
  "arch-cortex-m",
  "executor-thread",
  "executor-interrupt",
  "defmt",
  "task-arena-size-131072",
] }
embassy-time = { version = "0.4.0", features = [
  "defmt",
  "defmt-timestamp-uptime",
] }
embassy-rp = { version = "0.4.0", default-features = false, features = [
  "defmt",
  "unstable-pac",
  "time-driver",
  "critical-section-impl",
  "rp235xa",
  "binary-info",
] }
embassy-usb = { version = "0.4.0", features = ["defmt"] }
embassy-net = { version = "0.7.0", features = [
  "defmt",
  # "icmp",
  "tcp",
  "udp",
  "raw",
  "dhcpv4",
  "medium-ethernet",
  "dns",
  # "proto-ipv4",
  # "proto-ipv6",
  # "multicast",
] }
embassy-net-wiznet = { version = "0.2.0", features = ["defmt"] }
embassy-futures = { version = "0.1.1" }
embassy-usb-logger = { version = "0.4.0" }
cyw43 = { version = "0.3.0", features = ["defmt", "firmware-logs"] }
cyw43-pio = { version = "0.4.0", features = ["defmt"] }

defmt = "1.0.1"
defmt-rtt = "1.0.0"
fixed = "1.23.1"
fixed-macro = "1.2"

# for web request example
reqwless = { version = "0.13.0", features = ["defmt"] }
serde = { version = "1.0.209", default-features = false, features = ["derive"] }
serde-json-core = "0.6.0"

#cortex-m = { version = "0.7.6", features = ["critical-section-single-core"] }
cortex-m = { version = "0.7.6", features = ["inline-asm"] }
cortex-m-rt = "0.7.0"
critical-section = "1.1"
panic-probe = { version = "1.0.0", features = ["print-defmt"] }
byte-slice-cast = { version = "1.2.0", default-features = false }
smart-leds = "0.4.0"
heapless = "0.8"
usbd-hid = "0.8.1"

embedded-hal-1 = { package = "embedded-hal", version = "1.0" }
embedded-hal-async = "1.0"
embedded-hal-bus = { version = "0.1", features = ["async"] }
embedded-io-async = { version = "0.6.1", features = ["defmt-03"] }
embedded-storage = { version = "0.3" }
static_cell = "2.1"
portable-atomic = { version = "1.5", features = ["critical-section"] }
log = "0.4"
rand = { version = "0.8.5", default-features = false }
embedded-sdmmc = "0.7.0"

mountain-mqtt = { version = "0.2.0", default-features = false, features = [
  "embedded-io-async",
  "embedded-hal-async",
  "defmt",
] }

mountain-mqtt-embassy = { version = "0.2.0"}
rand_core = "0.6"

[profile.release]
# Enable generation of debug symbols even on release builds
debug = true

[package.metadata.embassy]
build = [
  { target = "thumbv8m.main-none-eabihf", artifact-dir = "out/examples/rp235x" }
]

[[bin]]
name = "blinky_wifi"
path = "src/bin/blinky_wifi.rs"
test = false
doctest = false
bench = false

[[bin]]
name = "wifi_connect"
path = "src/bin/wifi_connect.rs"
test = false
doctest = false
bench = false

[[bin]]
name = "tcp_client"
path = "src/bin/tcp_client.rs"
test = false
doctest = false
bench = false

[[bin]]
name = "mqtt_client"
path = "src/bin/mqtt_client.rs"
test = false
doctest = false
bench = false

//! This example tests the RP Pico 2 W onboard LED.
//!
//! It does not work with the RP Pico 2 board. See `blinky.rs`.

#![no_std]
#![no_main]
#![allow(async_fn_in_trait)]

use core::fmt::Write as _;
use cyw43::JoinOptions;
use cyw43_pio::{PioSpi, RM2_CLOCK_DIVIDER};
use defmt::*;
use embassy_executor::Spawner;
use embassy_net::Ipv4Address;
// use embassy_net::dns::DnsSocket;
// use embassy_net::tcp::client::{TcpClient, TcpClientState};
use embassy_net::{Config, StackResources};
use embassy_rp::bind_interrupts;
use embassy_rp::clocks::RoscRng;
// use embassy_rp::flash::Async;
use embassy_rp::gpio::{Level, Output};
use embassy_rp::peripherals::{DMA_CH0, PIO0};
use embassy_rp::pio::{InterruptHandler, Pio};
use embassy_rp2350_examples::action::Action;
use embassy_rp2350_examples::channels::{ActionChannel, EventChannel};
use embassy_rp2350_examples::event::Event;
use embassy_rp2350_examples::example_mqtt_manager;
use embassy_rp2350_examples::ui::ui_task;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::pubsub::PubSubChannel;
use embassy_time::{Duration, Timer};
use heapless::String;
use rand_core::RngCore;
use static_cell::StaticCell;
use {defmt_rtt as _, panic_probe as _};

// グローバルロガー(defmt→RTT)
use defmt_rtt as _;
// panic時のバックトレース&停止(probe-rs 連携)
use panic_probe as _;

// ✅ defmtのパニックハンドラを提供(_defmt_panic を満たす)
#[defmt::panic_handler]
fn panic() -> ! {
    panic_probe::hard_fault()
}

// Program metadata for `picotool info`.
// This isn't needed, but it's recommended to have these minimal entries.
#[unsafe(link_section = ".bi_entries")]
#[used]
pub static PICOTOOL_ENTRIES: [embassy_rp::binary_info::EntryAddr; 4] = [
    embassy_rp::binary_info::rp_program_name!(c"Blinky Example"),
    embassy_rp::binary_info::rp_program_description!(
        c"This example tests the RP Pico 2 W's onboard LED, connected to GPIO 0 of the cyw43 \
        (WiFi chip) via PIO 0 over the SPI bus."
    ),
    embassy_rp::binary_info::rp_cargo_version!(),
    embassy_rp::binary_info::rp_program_build_attribute!(),
];

bind_interrupts!(struct Irqs {
    PIO0_IRQ_0 => InterruptHandler<PIO0>;
});

const SSID: &str = "SSID";
const PASS: &str = "PASS";

// const FLASH_SIZE: usize = 2 * 1024 * 1024;

const MQTT_HOST: &str = "";
const MQTT_PORT: &str = "";

static UID: StaticCell<String<64>> = StaticCell::new();
static CHIP_ID: StaticCell<String<64>> = StaticCell::new();
static EVENT_CHANNEL: StaticCell<EventChannel> = StaticCell::new();
static ACTION_CHANNEL: StaticCell<ActionChannel> = StaticCell::new();

#[embassy_executor::task]
async fn cyw43_task(
    runner: cyw43::Runner<'static, Output<'static>, PioSpi<'static, PIO0, 0, DMA_CH0>>,
) -> ! {
    runner.run().await
}

#[embassy_executor::task]
async fn net_task(mut runner: embassy_net::Runner<'static, cyw43::NetDriver<'static>>) -> ! {
    runner.run().await
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
    let p = embassy_rp::init(Default::default());

    // --- Read 64-bit chip unique ID (public device ID) from OTP (CHIPID0..3) ---
    // Reference: RP2350 datasheet, Chapter 13 (OTP). The ECC read data window is at 0x4013_0000,
    // and the *guarded* ECC read alias is at 0x4013_8000. CHIPID0..3 are at offsets 0x000..0x003.
    #[inline(always)]
    fn read_chip_uid() -> u64 {
        const OTP_DATA_GUARDED_BASE: usize = 0x4013_8000; // ECC, guarded
        unsafe {
            // Ensure the OTP USER (data) interface is enabled: USR.DCTRL=1 at 0x4012_0128 bit 0
            let usr = 0x4012_0128 as *mut u32;
            let v = core::ptr::read_volatile(usr);
            if (v & 1) == 0 {
                core::ptr::write_volatile(usr, v | 1);
            }

            // Read four 16-bit words: CHIPID0..3 (least to most significant)
            let p = OTP_DATA_GUARDED_BASE as *const u16;
            let w0 = core::ptr::read_volatile(p.add(0)) as u64; // CHIPID0, bits 15:0
            let w1 = core::ptr::read_volatile(p.add(1)) as u64; // CHIPID1, bits 31:16
            let w2 = core::ptr::read_volatile(p.add(2)) as u64; // CHIPID2, bits 47:32
            let w3 = core::ptr::read_volatile(p.add(3)) as u64; // CHIPID3, bits 63:48
            (w3 << 48) | (w2 << 32) | (w1 << 16) | w0
        }
    }

    let chip_uid = read_chip_uid();

    // Format and store into global strings
    let chip_id_handle = CHIP_ID.init(String::new());
    let uid_handle = UID.init(String::new());
    // 16 hex digits, zero-padded
    let _ = core::write!(chip_id_handle, "{:016X}", chip_uid);
    let _ = core::write!(uid_handle, "embassy-example-{}", chip_id_handle);

    let mut rng = RoscRng;
    let fw = include_bytes!("../../cyw43-firmware/43439A0.bin");
    let clm = include_bytes!("../../cyw43-firmware/43439A0_clm.bin");

    // To make flashing faster for development, you may want to flash the firmwares independently
    // at hardcoded addresses, instead of baking them into the program with `include_bytes!`:
    //     probe-rs download ../../cyw43-firmware/43439A0.bin --binary-format bin --chip RP235x --base-address 0x10100000
    //     probe-rs download ../../cyw43-firmware/43439A0_clm.bin --binary-format bin --chip RP235x --base-address 0x10140000
    //let fw = unsafe { core::slice::from_raw_parts(0x10100000 as *const u8, 230321) };
    //let clm = unsafe { core::slice::from_raw_parts(0x10140000 as *const u8, 4752) };

    let pwr = Output::new(p.PIN_23, Level::Low);
    let cs = Output::new(p.PIN_25, Level::High);
    let mut pio = Pio::new(p.PIO0, Irqs);
    let spi = PioSpi::new(
        &mut pio.common,
        pio.sm0,
        // SPI communication won't work if the speed is too high, so we use a divider larger than `DEFAULT_CLOCK_DIVIDER`.
        // See: https://github.com/embassy-rs/embassy/issues/3960.
        RM2_CLOCK_DIVIDER,
        pio.irq0,
        cs,
        p.PIN_24,
        p.PIN_29,
        p.DMA_CH0,
    );

    static STATE: StaticCell<cyw43::State> = StaticCell::new();
    let state = STATE.init(cyw43::State::new());
    let (net_device, mut control, runner) = cyw43::new(state, pwr, spi, fw).await;
    spawner.spawn(cyw43_task(runner)).unwrap();

    control.init(clm).await;
    control
        .set_power_management(cyw43::PowerManagementMode::PowerSave)
        .await;

    let config = Config::dhcpv4(Default::default());

    let seed = rng.next_u64();

    static RESOURCES: StaticCell<StackResources<5>> = StaticCell::new();
    let (stack, runner) = embassy_net::new(
        net_device,
        config,
        RESOURCES.init(StackResources::new()),
        seed,
    );

    spawner.spawn(net_task(runner)).unwrap();

    // 念のため LED を消灯して開始
    control.gpio_set(0, false).await;

    loop {
        // --- 接続チャレンジ中の点滅(200ms * 4 回) ---
        for _ in 0..4 {
            control.gpio_set(0, true).await; // ON
            Timer::after_millis(200).await;
            control.gpio_set(0, false).await; // OFF
            Timer::after_millis(200).await;
        }

        // --- 実際の接続試行 ---
        match control.join(SSID, JoinOptions::new(PASS.as_bytes())).await {
            Ok(_) => {
                // info!("joined Wi-Fi: {}", SSID);
                // 成功: LED を点灯して終了
                control.gpio_set(0, true).await;
                break;
            }
            Err(_e) => {
                // info!("join failed: status = {}", e.status);
                // 失敗: 小休止して次の点滅→再試行へ
                Timer::after_millis(500).await;
            }
        }
    }

    stack.wait_link_up().await;

    stack.wait_config_up().await;

    let event_channel = EVENT_CHANNEL.init(PubSubChannel::<NoopRawMutex, Event, 16, 4, 2>::new());
    let event_pub_mqtt = event_channel.publisher().unwrap();
    let event_sub_ui = event_channel.subscriber().unwrap();

    let action_channel =
        ACTION_CHANNEL.init(PubSubChannel::<NoopRawMutex, Action, 16, 4, 4>::new());
    let action_pub_ui = action_channel.publisher().unwrap();
    let action_sub = action_channel.subscriber().unwrap();

    let host = MQTT_HOST.parse::<Ipv4Address>().unwrap();
    let port = MQTT_PORT.parse::<u16>().unwrap();

    unwrap!(spawner.spawn(ui_task(event_sub_ui, action_pub_ui, p.PIN_12, control)));

    example_mqtt_manager::init(
        &spawner,
        stack,
        uid_handle,
        event_pub_mqtt,
        action_sub,
        host,
        port,
    )
    .await;

    loop {
        Timer::after(Duration::from_secs(5)).await;
    }
}

変更している箇所はユニークなID(MQTT用)を生成する際にrp2040に依存している箇所を書き換えているくらいです。

それでは書き込んでみます。

cargo build --bin mqtt_client --release
picotool load -u -v -x -t elf target/thumbv8m.main-none-eabihf/release/mqtt_client

Mosquittoとの接続

現在の実装では接続チャレンジ中にLEDが点滅し、接続完了時にLEDが点灯します。

電源を入れたところ、無事に点灯しました。

OpenWRTの管理画面からも、無事に接続が確認できます。

Mosquittoで該当するembassy-example-rp2350wのトピックをサブスクライブし、Buttonの動作をチェックしてみます。

mosquitto_sub -h 192.168.110.1 -p 1883 -t embassy-example-rp2035w-button -v
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false
embassy-example-rp2035w-button true
embassy-example-rp2035w-button false

良い感じです。

続いてLEDのトピックにパブリッシュしてみます。

mosquitto_pub -h 192.168.110.1 -p 1883 -t embassy-example-rp2035w-led -m false

無事にLEDが消灯しました。

これにて目標達成です。

Discussion