🔌

〚Rust〛MCP ServerをE2Eテストする

に公開

背景

https://doc.rust-jp.rs/book-ja/ch11-03-test-organization.html

Rustでテストを書く場合、tests/下に配置される結合テストも通常はアプリケーション本体をライブラリとして参照する形になるはずだ。

しかしMCP serverは明確にインターフェースが定義されており、どうせならそのレイヤーでテストがしたい。

「完全に独立したアプリケーションとして動かす形での統合テスト」は、Rustの tests/ ディレクトリに書くのが最も自然で推奨される方法です。

github copilot(GPT-4.1)もこう言っている。統合テストというかE2Eテストかもしれない。

実装

バイナリの発見

当初claudeは次のようなコードを書いてきた(schedulerというのがMCP serverの名前):

use tokio::process::Command as TokioCommand;

impl McpClient {
    pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let binary_path = std::env::current_dir()
            .unwrap()
            .join("target/debug/scheduler");

        let mut child = TokioCommand::new(binary_path)
            .env("TZ", "UTC")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit()) // Show stderr for debugging
            .spawn()?;

        let stdin = child.stdin.take().expect("Failed to get stdin");
        let stdout = child.stdout.take().expect("Failed to get stdout");
        let stdout_reader = BufReader::new(stdout);
        let stdout_lines = stdout_reader.lines();

        // Wait for server startup message
        tokio::time::sleep(Duration::from_millis(500)).await;

        Ok(Self {
            child,
            stdin,
            stdout_lines,
            request_id: 1,
        })
    }
……

テスト前にバイナリが無ければわざわざビルドする。

/// Build the scheduler binary before running tests
fn ensure_binary_built() {
    let output = Command::new("cargo")
        .args(["build", "--bin", "scheduler"])
        .output()
        .expect("Failed to execute cargo build");

    if !output.status.success() {
        panic!(
            "Failed to build scheduler binary: {}",
            String::from_utf8_lossy(&output.stderr)
        );
    }
}

テスト実行時のカレントディレクトリは常に一定なのか? debugバイナリが必ずあるのか、位置は一定なのか? 何とも吹けば飛ぶようなコードだ。

https://github.com/rust-lang/cargo/issues/5758

こういうのは大体ちゃんと先人が困っているし、運が良ければ対処法も出来ている。
そして今回の場合運が良かった、つまりCARGO_BIN_EXE_<binary target name>で良い。

https://doc.rust-lang.org/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-crates

CARGO_BIN_EXE_<name> — The absolute path to a binary target’s executable. This is only set when building an integration test or benchmark. This may be used with the env macro to find the executable to run for testing purposes. The <name> is the name of the binary target, exactly as-is. For example, CARGO_BIN_EXE_my-program for a binary named my-program. Binaries are automatically built when the test is built, unless the binary has required features that are not enabled.

(「バイナリを発見する部分を https://github.com/rust-lang/cargo/pull/7697 によって改善できる?」と尋ねると特にfetchせずに「そうだね」とか言い出したが、知っていたなら最初からやってほしいところだ。)

よってパス取得はこれで良い:

        let binary_path = env!("CARGO_BIN_EXE_scheduler");

クライアント

サーバー実装は基本的にrmcpを使うことになるが、テストでクライアント側もrmcpを使うとやや簡単になる。

use rmcp::transport::TokioChildProcess;
use rmcp::{model::CallToolRequestParam, service::ServiceExt};
use std::process::Stdio;
use tokio::process::Command as TokioCommand;

fn start_server() -> TokioChildProcess {
    let binary_path = env!("CARGO_BIN_EXE_scheduler");

    let mut command = TokioCommand::new(binary_path);
    command
        .env("TZ", "UTC")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit()); // Show stderr for debugging

    TokioChildProcess::new(command).unwrap()
}

#[tokio::test]
async fn test_set_schedule_daily() {
    let service = ().serve(start_server()).await.unwrap();

    // Set a daily schedule
    let response = service
        .call_tool(CallToolRequestParam {
            name: "set_schedule".into(),
            arguments: Some(rmcp::object!({
                "name": "test_daily",
                "time": "09:00",
                "message": "Daily reminder"
            })),
        })
        .await
        .unwrap();

    assert_eq!(response.is_error, Some(false));
    assert_eq!(response.content.len(), 1);
    assert_eq!(response.content[0].raw.as_text().unwrap().text, "Succeeded");
}

CallToolRequestParam.argumentsの要求はMapなので、serde_json::json!を使うと地味に困る。ちゃんとrmcp::object!が用意されているのでこれを使う。

通知

resourceを使っている人間は全然いないのではないかという気がするが、ユーザー入力ではなくMCPサーバー側のタイミングで通知を飛ばせるので自律的に動作させたい場合は便利な筈だ。(コストコントロールの面では不確実性が増すが)

通知はサーバー側から飛んでくるので、クライアントはそれをどうにかして待つ。rmcpではrmcp::ClientHandlerトレイトにon_resource_updatedメソッドがあり、これを実装すると通知が受け取れる。メインスレッドと別に実行されるので、テストの場合以下のようにメッセージをメインスレッドへ更に送る必要がある。

use std::process::Stdio;

use rmcp::{
    ClientHandler,
    model::{CallToolRequestParam, ResourceUpdatedNotificationParam, SubscribeRequestParam},
    service::{NotificationContext, ServiceExt},
    transport::async_rw::AsyncRwTransport,
};
use tokio::process::Command as TokioCommand;
use tokio::sync::mpsc;

pub struct Client {
    notification_channel: mpsc::Sender<ResourceUpdatedNotificationParam>,
}

impl ClientHandler for Client {
    async fn on_resource_updated(
        &self,
        params: ResourceUpdatedNotificationParam,
        _context: NotificationContext<rmcp::RoleClient>,
    ) {
        let _ = self
            .notification_channel
            .send(params.clone())
            .await
            .unwrap();
    }
}

fn create_server_command() -> TokioCommand {
    let binary_path = env!("CARGO_BIN_EXE_scheduler");

    let mut command = TokioCommand::new(binary_path);
    command
        .env("TZ", "UTC")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit()); // Show stderr for debugging

    command
}

#[tokio::test]
async fn test_resource_subscription() {
    // Start the server process
    let mut child = create_server_command().spawn().unwrap();

    // Client transport using the child's stdin and stdout
    let stdin = child.stdin.take().expect("Failed to get stdin");
    let stdout = child.stdout.take().expect("Failed to get stdout");
    let client_transport = AsyncRwTransport::new(stdout, stdin);

    // Notification channel
    let (tx, mut rx) = mpsc::channel(8);

    // Create the client
    let client = Client {
        notification_channel: tx,
    }
    .serve(client_transport)
    .await
    .unwrap();

    // Subscribe to the fired_schedule resource
    client
        .subscribe(SubscribeRequestParam {
            uri: "fired_schedule://recent".into(),
        })
        .await
        .unwrap();

    // Set a schedule that will fire soon
    let time = chrono::Local::now();
    let response = client
        .call_tool(CallToolRequestParam {
            name: "set_schedule".into(),
            arguments: Some(rmcp::object!({
                "name": "test",
                "time": time.to_rfc3339(),
                "message": "Subscription test"
            })),
        })
        .await
        .unwrap();
    assert_eq!(response.is_error, Some(false));

    // Wait for the notification
    let notification = rx.recv().await.unwrap();
    assert_eq!(
        notification,
        rmcp::model::ResourceUpdatedNotificationParam {
            uri: "fired_schedule://recent".to_string(),
            title: "Subscription test".to_string()
        }
    );

    client.cancel().await.unwrap();
}

rmcp::transport::TokioChildProcessStdio::piped()を渡すと入出力を親プロセス(テストコード側)に渡すものと思うが、クライアントをまた別(多分スレッド)に動かす場合はそっちに繋ぐ必要がある。それでAsyncRwTransportを構築してあげるこのような形になる筈。

Discussion