👩‍👦

PythonのSocket通信を利用したprobe-rsデバッグ

2024/12/31に公開

1. はじめに

マイコン開発においては「開発環境の構築」「デバッグ機能の利用」「実機動作のリアルタイム確認」が重要です。一般的には、各メーカー製のIDEを使うことで、それらがすべてパッケージ化されているため、とても便利にデバッグが行えます。しかし一方で、そうした商用ツールのライセンス要件各種ツールの連携のしやすさ、そしてOS・プラットフォームを跨いだ開発(Windows, macOS, Linuxなど)といった柔軟性の観点で課題も存在します。

これらの課題解決案として、今回の記事のポイントとなるのが、「Socket通信を使って外部からプローブにアクセスしてRTT通信する」 というアイデアです。Rust組み込み開発環境+probe-rs単体でも標準入力/標準出力を使ってRTT送受信が可能ですが、それだけだとローカルPC内で閉じてしまいます。

そこで、「RTTツールをサブプロセスで起動するPythonスクリプト」を作り、そのスクリプトがローカルホスト上のTCPソケット(あるいは実運用ではLAN/WAN上のアドレスでも可)を介して外部と通信を中継する形をとります。

この構成を実現するためのSocket通信サンプルとして、以下の2つのファイルを用意しています。

  • socket_server.py: サーバ側スクリプト。probe_rs_rtt_cmd (Rustバイナリ) をサブプロセスとして起動し、RTT入出力とネットワーク越しのクライアントを中継します。
  • socket_client.py: クライアントスクリプト。ユーザがコマンドを入力してサーバに送信し、RTT経由でターゲットに届けられ、ターゲットからのRTT出力もサーバ経由で受け取ることができます。

2. サンプルコードの構成

サンプルコードは大きく3つに分かれています。

  1. Rust実行ファイル (probe_rs_rtt_cmd)
    • Cargoプロジェクトとしてprobe_rs_rtt_cmd.rsCargo.tomlを用意。
  2. Pythonサーバ (socket_server.py)
  3. Pythonクライアント (socket_client.py)

このRust実行ファイル (probe_rs_rtt_cmd) は、プローブ(CMISIS-DAP)を介してターゲットにRTTで接続し、標準入力→ターゲット、ターゲット→標準出力というパイプ的な動作をします。そしてこのRustバイナリをPythonサーバ側がサブプロセス起動して、ソケットクライアントからのコマンドをターゲットに送信し、ターゲットからのログをソケットクライアントに返す、という仕組みです。

以下、それぞれの役割をもう少し詳しく見ていきます。


3. Rust側のサンプル: probe_rs_rtt_cmd.rs

Pythonスクリプトから呼び出されるサブプロセスとしてのRustバイナリです。Cargoプロジェクトとして動作するために、**Cargo.toml**には次のように記載しておきます。

3.1 Cargo.toml の例

[package]
name = "probe_rs_rtt_cmd"
version = "0.1.0"
edition = "2021"

[dependencies]
probe-rs = "0.25"

probe-rsのバージョンは記事執筆時点で0.25を指定しています。
この上で、src/main.rs もしくは単一ファイル名(probe_rs_rtt_cmd.rs)として以下のように書きます。


3.2 RTTを使った基本的な入出力

use probe_rs::Permissions;
use probe_rs::probe::list::Lister;
use probe_rs::rtt::Rtt;
use std::{
    io::{self, Write},
    thread,
    time::Duration,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // --- プローブのリストを取得 ---
    let lister = Lister::new();
    let probes = lister.list_all();

    if probes.is_empty() {
        eprintln!("No probes found!");
        return Ok(());
    }

    // 最初に見つかったプローブを使用
    let probe_info = &probes[0];
    let probe = probe_info.open()?; 

    // セッションを開始
    // "ATSAME54P20A" の部分は、使用するターゲットMCUに合わせて書き換える
    let mut session = probe.attach("ATSAME54P20A", Permissions::default())?;

    // RTT をアタッチ
    let mut core = session.core(0)?;
    let mut rtt = Rtt::attach(&mut core)?;

    println!("RTT communication on ch0 started!");

    let stdin = io::stdin();

    loop {
        // (1) ターゲット->ホスト: UpChannelを読む
        {
            let up_channels = rtt.up_channels();
            let up_channel = up_channels.get_mut(0).ok_or("Up channel not found")?;

            let mut buffer = [0u8; 1024];
            if let Ok(count) = up_channel.read(&mut core, &mut buffer) {
                if count > 0 {
                    let received = String::from_utf8_lossy(&buffer[..count]);
                    println!("(RTT from MCU): {}", received);
                }
            }
        }

        // (2) ユーザ入力(標準入力)を取得
        print!("Enter message to MCU (RTT down ch0) > ");
        io::stdout().flush()?;
        let mut user_input = String::new();
        stdin.read_line(&mut user_input)?;
        let trimmed_input = user_input.trim();

        // (3) ホスト->ターゲット: DownChannelに書く
        if !trimmed_input.is_empty() {
            let down_channels = rtt.down_channels();
            let down_channel = down_channels.get_mut(0).ok_or("Down channel not found")?;

            if let Ok(bytes_written) = down_channel.write(&mut core, trimmed_input.as_bytes()) {
                println!("(RTT to MCU): Sent {} bytes: {}", bytes_written, trimmed_input);
            }
        }

        // ポーリング間隔
        thread::sleep(Duration::from_millis(100));
    }
}
  • probe.attach("ATSAME54P20A", ...) の箇所は、実際に使用するMCUに合わせて文字列を変更してください。
  • Rtt::attach() を呼ぶことで、RTTチャネルへのアクセス(アップチャネル/ダウンチャネル)が可能になります。
  • コマンドライン上で実行すると、標準入力でコマンドを受け付け、それをMCUに送信し、MCUからのログを表示するサンプルです。
  • このプログラム単体でも実行すればCLIで動作します。まずは単体でRTT通信できるかをお試しください。
  • この標準入出力の部分を、次に紹介するPythonソケット経由で中継すれば、リモート接続した状態で同様の入出力ができます。

3.3 Socket Sverver socket_server.py

# socket_server.py
import socket
import subprocess
import threading
import sys
import select

HOST = '127.0.0.1'
PORT = 50000

def main():
    # 1) Rust製RTTツールをサブプロセスで起動
    rtt_process = subprocess.Popen(
        ['./probe_rs_rtt_cmd'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=0
    )

    # 2) ソケットサーバを立ち上げる
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((HOST, PORT))
    server_socket.listen(1)
    print(f"Socket server listening on {HOST}:{PORT}")

    while True:
        # クライアントからの接続待ち
        client_conn, client_addr = server_socket.accept()
        print(f"Client connected: {client_addr}")

        # 新たなスレッドを立てて、RTTプロセスとの中継を担当
        relay_thread = threading.Thread(
            target=relay_client_and_rtt,
            args=(client_conn, rtt_process),
            daemon=True
        )
        relay_thread.start()

def relay_client_and_rtt(client_conn, rtt_process):
    # RTTツール(サブプロセス)の標準出力 -> クライアントへ中継する
    def rtt_stdout_reader():
        while True:
            output_line = rtt_process.stdout.readline()
            if not output_line:
                print("RTT process ended.")
                break
            client_conn.sendall(output_line.encode('utf-8'))
    
    stdout_thread = threading.Thread(target=rtt_stdout_reader, daemon=True)
    stdout_thread.start()

    # クライアント -> RTTツール(サブプロセス標準入力) への転送
    rtt_stdin = rtt_process.stdin
    try:
        while True:
            data = client_conn.recv(1024)
            if not data:
                print("Client disconnected.")
                break
            rtt_stdin.write(data.decode('utf-8') + '\n')
            rtt_stdin.flush()
    except Exception as e:
        print(f"Exception in relay loop: {e}")
    finally:
        client_conn.close()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("Server shutting down (KeyboardInterrupt).")
        sys.exit(0)
  • subprocess.Popen([...])probe_rs_rtt_cmdを起動しています。
  • メインループでソケット待ち受けを行い、クライアントが接続してくればrelay_client_and_rtt()で中継を行います。
  • relay_client_and_rtt()内部では
    • Rustの標準出力が出るたびにクライアントへsendall()で転送
    • クライアントから受信したデータをRustバイナリの標準入力へ書き込む

といった具合に、「標準入出力」と「ソケット」の双方向の橋渡しをしています。


3.4 Socket Client socket_client.py

# socket_client.py
import socket
import sys

HOST = '127.0.0.1'
PORT = 50000

def main():
    try:
        with socket.create_connection((HOST, PORT)) as s:
            print(f"Connected to server {HOST}:{PORT}")
            print("Enter commands to send to MCU via RTT. Type 'exit' or Ctrl+C to quit.")

            while True:
                user_input = input(">> ")
                if user_input.lower() in ("exit", "quit"):
                    print("Exiting client.")
                    break

                # RTT経由でターゲットに送りたいコマンドをサーバへ送信
                s.sendall((user_input + "\n").encode('utf-8'))

                # サーバ(Rustバイナリ)からの応答1行を受け取って表示
                data = s.recv(1024)
                if not data:
                    print("Server closed connection.")
                    break
                print(f"MCU says: {data.decode('utf-8', errors='replace')}", end='')

    except KeyboardInterrupt:
        print("\nClient interrupted. Exiting.")
    except Exception as e:
        print(f"Error: {e}")

if __name__ == '__main__':
    main()
  • ソケットでサーバに接続し、ユーザの入力を送信。
  • 受信データがあれば表示し続ける。
  • "exit"と入力すれば終了する。

ポイントは、ターゲットへのコマンド送信ターゲットからの文字列表示ソケット越しに行っているだけです。


4. 実行イメージとワークフロー

実行の全体像は下記のようになります。

  1. マイコンとデバッグプローブ(ST-LinkやCMSIS-DAP規格のプローブなど)を接続し、マイコンがRTTを使ってコマンド受付と結果の出力できるようファームウェアを書き込んでおきます。
  2. probe_rs_rtt_cmd (Rustバイナリ) のビルドを行い、同じディレクトリに配置しておきます。
  3. socket_server.py を起動
    $ python socket_server.py
    
    • すると、probe_rs_rtt_cmdがサブプロセスで動き、RTTにアタッチされる状態になります。
  4. socket_client.py を別のターミナルから起動
    $ python socket_client.py
    
    • これでクライアントがsocket_server.pyに接続します。
  5. クライアント画面でコマンドを入力
    • 入力文字列がRTTのダウンチャネルに書き込まれ、マイコン上のファームウェアで受信処理。
    • MCユーザプログラムが応答をRTTのアップチャネルに書き込む。
    • Rustバイナリがそれを標準出力に表示し、socket_server.pyがソケットクライアントに転送し、クライアント画面に出力されます。

ローカルホスト(127.0.0.1)だけでなく、LANやVPN経由でsocket_server.pyが待ち受けるポートにアクセスできる状態にすれば、離れた拠点からでもMCUのRTTやデバッグコマンドを読み書きできる環境が実現できます。今回は、uart通信をSWD経由のRTT通信で再現する簡単なコードですが、probe-rsには様々な機能がありますので、今回のサンプルコードをベースにより高度なデバッグツールを作成してくことが可能になると思います。

Discussion