PythonのSocket通信を利用したprobe-rsデバッグ
1. はじめに
マイコン開発においては「開発環境の構築」「デバッグ機能の利用」「実機動作のリアルタイム確認」が重要です。一般的には、各メーカー製のIDEを使うことで、それらがすべてパッケージ化されているため、とても便利にデバッグが行えます。しかし一方で、そうした商用ツールのライセンス要件や各種ツールの連携のしやすさ、そしてOS・プラットフォームを跨いだ開発(Windows, macOS, Linuxなど)といった柔軟性の観点で課題も存在します。
これらの課題解決案として、今回の記事のポイントとなるのが、「Socket通信を使って外部からプローブにアクセスしてRTT通信する」 というアイデアです。Rust
組み込み開発環境+probe-rs
単体でも標準入力/標準出力を使ってRTT送受信が可能ですが、それだけだとローカルPC内で閉じてしまいます。
そこで、「RTTツールをサブプロセスで起動するPythonスクリプト」を作り、そのスクリプトがローカルホスト上のTCPソケット(あるいは実運用ではLAN/WAN上のアドレスでも可)を介して外部と通信を中継する形をとります。
この構成を実現するためのSocket通信サンプルとして、以下の2つのファイルを用意しています。
-
socket_server.py
: サーバ側スクリプト。probe_rs_rtt_cmd
(Rustバイナリ) をサブプロセスとして起動し、RTT入出力とネットワーク越しのクライアントを中継します。 -
socket_client.py
: クライアントスクリプト。ユーザがコマンドを入力してサーバに送信し、RTT経由でターゲットに届けられ、ターゲットからのRTT出力もサーバ経由で受け取ることができます。
2. サンプルコードの構成
サンプルコードは大きく3つに分かれています。
-
Rust実行ファイル (
probe_rs_rtt_cmd
)- Cargoプロジェクトとして
probe_rs_rtt_cmd.rs
とCargo.toml
を用意。
- Cargoプロジェクトとして
- Pythonサーバ (
socket_server.py
) - Pythonクライアント (
socket_client.py
)
このRust実行ファイル (probe_rs_rtt_cmd
) は、プローブ(CMISIS-DAP)を介してターゲットにRTTで接続し、標準入力→ターゲット、ターゲット→標準出力というパイプ的な動作をします。そしてこのRustバイナリをPythonサーバ側がサブプロセス起動して、ソケットクライアントからのコマンドをターゲットに送信し、ターゲットからのログをソケットクライアントに返す、という仕組みです。
以下、それぞれの役割をもう少し詳しく見ていきます。
probe_rs_rtt_cmd.rs
3. Rust側のサンプル: Pythonスクリプトから呼び出されるサブプロセスとしてのRustバイナリです。Cargoプロジェクトとして動作するために、**Cargo.toml
**には次のように記載しておきます。
3.1 Cargo.toml の例
[package]
name = "probe_rs_rtt_cmd"
version = "0.1.0"
edition = "2021"
[dependencies]
probe-rs = "0.25"
probe-rs
のバージョンは記事執筆時点で0.25
を指定しています。
この上で、src/main.rs
もしくは単一ファイル名(probe_rs_rtt_cmd.rs
)として以下のように書きます。
3.2 RTTを使った基本的な入出力
use probe_rs::Permissions;
use probe_rs::probe::list::Lister;
use probe_rs::rtt::Rtt;
use std::{
io::{self, Write},
thread,
time::Duration,
};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// --- プローブのリストを取得 ---
let lister = Lister::new();
let probes = lister.list_all();
if probes.is_empty() {
eprintln!("No probes found!");
return Ok(());
}
// 最初に見つかったプローブを使用
let probe_info = &probes[0];
let probe = probe_info.open()?;
// セッションを開始
// "ATSAME54P20A" の部分は、使用するターゲットMCUに合わせて書き換える
let mut session = probe.attach("ATSAME54P20A", Permissions::default())?;
// RTT をアタッチ
let mut core = session.core(0)?;
let mut rtt = Rtt::attach(&mut core)?;
println!("RTT communication on ch0 started!");
let stdin = io::stdin();
loop {
// (1) ターゲット->ホスト: UpChannelを読む
{
let up_channels = rtt.up_channels();
let up_channel = up_channels.get_mut(0).ok_or("Up channel not found")?;
let mut buffer = [0u8; 1024];
if let Ok(count) = up_channel.read(&mut core, &mut buffer) {
if count > 0 {
let received = String::from_utf8_lossy(&buffer[..count]);
println!("(RTT from MCU): {}", received);
}
}
}
// (2) ユーザ入力(標準入力)を取得
print!("Enter message to MCU (RTT down ch0) > ");
io::stdout().flush()?;
let mut user_input = String::new();
stdin.read_line(&mut user_input)?;
let trimmed_input = user_input.trim();
// (3) ホスト->ターゲット: DownChannelに書く
if !trimmed_input.is_empty() {
let down_channels = rtt.down_channels();
let down_channel = down_channels.get_mut(0).ok_or("Down channel not found")?;
if let Ok(bytes_written) = down_channel.write(&mut core, trimmed_input.as_bytes()) {
println!("(RTT to MCU): Sent {} bytes: {}", bytes_written, trimmed_input);
}
}
// ポーリング間隔
thread::sleep(Duration::from_millis(100));
}
}
-
probe.attach("ATSAME54P20A", ...)
の箇所は、実際に使用するMCUに合わせて文字列を変更してください。 -
Rtt::attach()
を呼ぶことで、RTTチャネルへのアクセス(アップチャネル/ダウンチャネル)が可能になります。 - コマンドライン上で実行すると、標準入力でコマンドを受け付け、それをMCUに送信し、MCUからのログを表示するサンプルです。
- このプログラム単体でも実行すればCLIで動作します。まずは単体でRTT通信できるかをお試しください。
- この標準入出力の部分を、次に紹介するPythonソケット経由で中継すれば、リモート接続した状態で同様の入出力ができます。
socket_server.py
3.3 Socket Sverver # socket_server.py
import socket
import subprocess
import threading
import sys
import select
HOST = '127.0.0.1'
PORT = 50000
def main():
# 1) Rust製RTTツールをサブプロセスで起動
rtt_process = subprocess.Popen(
['./probe_rs_rtt_cmd'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=0
)
# 2) ソケットサーバを立ち上げる
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)
print(f"Socket server listening on {HOST}:{PORT}")
while True:
# クライアントからの接続待ち
client_conn, client_addr = server_socket.accept()
print(f"Client connected: {client_addr}")
# 新たなスレッドを立てて、RTTプロセスとの中継を担当
relay_thread = threading.Thread(
target=relay_client_and_rtt,
args=(client_conn, rtt_process),
daemon=True
)
relay_thread.start()
def relay_client_and_rtt(client_conn, rtt_process):
# RTTツール(サブプロセス)の標準出力 -> クライアントへ中継する
def rtt_stdout_reader():
while True:
output_line = rtt_process.stdout.readline()
if not output_line:
print("RTT process ended.")
break
client_conn.sendall(output_line.encode('utf-8'))
stdout_thread = threading.Thread(target=rtt_stdout_reader, daemon=True)
stdout_thread.start()
# クライアント -> RTTツール(サブプロセス標準入力) への転送
rtt_stdin = rtt_process.stdin
try:
while True:
data = client_conn.recv(1024)
if not data:
print("Client disconnected.")
break
rtt_stdin.write(data.decode('utf-8') + '\n')
rtt_stdin.flush()
except Exception as e:
print(f"Exception in relay loop: {e}")
finally:
client_conn.close()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("Server shutting down (KeyboardInterrupt).")
sys.exit(0)
-
subprocess.Popen([...])
でprobe_rs_rtt_cmd
を起動しています。 - メインループでソケット待ち受けを行い、クライアントが接続してくれば
relay_client_and_rtt()
で中継を行います。 -
relay_client_and_rtt()
内部では- Rustの標準出力が出るたびにクライアントへ
sendall()
で転送 - クライアントから受信したデータをRustバイナリの標準入力へ書き込む
- Rustの標準出力が出るたびにクライアントへ
といった具合に、「標準入出力」と「ソケット」の双方向の橋渡しをしています。
socket_client.py
3.4 Socket Client # socket_client.py
import socket
import sys
HOST = '127.0.0.1'
PORT = 50000
def main():
try:
with socket.create_connection((HOST, PORT)) as s:
print(f"Connected to server {HOST}:{PORT}")
print("Enter commands to send to MCU via RTT. Type 'exit' or Ctrl+C to quit.")
while True:
user_input = input(">> ")
if user_input.lower() in ("exit", "quit"):
print("Exiting client.")
break
# RTT経由でターゲットに送りたいコマンドをサーバへ送信
s.sendall((user_input + "\n").encode('utf-8'))
# サーバ(Rustバイナリ)からの応答1行を受け取って表示
data = s.recv(1024)
if not data:
print("Server closed connection.")
break
print(f"MCU says: {data.decode('utf-8', errors='replace')}", end='')
except KeyboardInterrupt:
print("\nClient interrupted. Exiting.")
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
main()
- ソケットでサーバに接続し、ユーザの入力を送信。
- 受信データがあれば表示し続ける。
-
"exit"
と入力すれば終了する。
ポイントは、ターゲットへのコマンド送信とターゲットからの文字列表示をソケット越しに行っているだけです。
4. 実行イメージとワークフロー
実行の全体像は下記のようになります。
- マイコンとデバッグプローブ(ST-LinkやCMSIS-DAP規格のプローブなど)を接続し、マイコンがRTTを使ってコマンド受付と結果の出力できるようファームウェアを書き込んでおきます。
-
probe_rs_rtt_cmd
(Rustバイナリ) のビルドを行い、同じディレクトリに配置しておきます。 -
socket_server.py
を起動$ python socket_server.py
- すると、
probe_rs_rtt_cmd
がサブプロセスで動き、RTTにアタッチされる状態になります。
- すると、
-
socket_client.py
を別のターミナルから起動$ python socket_client.py
- これでクライアントが
socket_server.py
に接続します。
- これでクライアントが
-
クライアント画面でコマンドを入力
- 入力文字列がRTTのダウンチャネルに書き込まれ、マイコン上のファームウェアで受信処理。
- MCユーザプログラムが応答をRTTのアップチャネルに書き込む。
- Rustバイナリがそれを標準出力に表示し、
socket_server.py
がソケットクライアントに転送し、クライアント画面に出力されます。
ローカルホスト(127.0.0.1)だけでなく、LANやVPN経由でsocket_server.py
が待ち受けるポートにアクセスできる状態にすれば、離れた拠点からでもMCUのRTTやデバッグコマンドを読み書きできる環境が実現できます。今回は、uart通信をSWD経由のRTT通信で再現する簡単なコードですが、probe-rsには様々な機能がありますので、今回のサンプルコードをベースにより高度なデバッグツールを作成してくことが可能になると思います。
Discussion