【Tauri】Rust から低遅延にフロントエンドにデータをストリーミングする
はじめに
個人開発で、OSC で送られてきた値を Rust で受信し、それを低遅延でフロントエンドに転送してリアルタイムに更新したい要件がありました。公式ドキュメントを確認したところ、目的に合う Channel API があったため、学習を兼ねて記事にまとめました。
OSCとは
OSC(Open Sound Control)は、MIDIの後継として設計された、音楽やメディア制御に特化した高速かつ柔軟な通信プロトコルです。シンプルな構造で低遅延性があり、音楽制作・インタラクティブアート・IoTなど幅広い分野で使われています。
進め方
Rust と JavaScript 間でデータをやり取りするために、最初に両言語で型を定義し、その型に基づいて実装を進めていきます。
データ型を定義する
データは source (識別のアドレス) と packet (送られた値) だけの最小限の入出力を想定しています。
{
"event": "message",
"data": {
"source": "127.0.0.1:xxxxx",
"packet": "/target/3"
}
Rustでの定義
#[derive(Clone, Serialize)]
#[serde(rename_all = "camelCase", tag = "event", content = "data")]
enum OscEvent {
Message { source: String, packet: String },
}
derive とは
属性マクロの一種で、構造体や列挙体に対して標準的なトレイト(機能)を自動で実装してくれる仕組み
#[derive(Clone, Serialize)]
Clone
実際に値が来た時に所有権を移さずに複製する処理の自動化
Serialize
シリアライズ(特定のフォーマットに変換)できることを静的に保証
serde とは
データをシリアライズ/デシリアライズするための標準的なライブラリです。
#[serde(rename_all = "camelCase", tag = "event", content = "data")]
rename_all = “camelCase”
フィールド名(ここだとsource
packet
)をシリアライズ時に キャメルケース に変換する指定
tag = “event”
バリアント名(ここだと Message
)を JSON にするときに "event"
というキーに入れる指定
content = “data”
データ(ここだと { source: String, packet: String }
)を "data"
というキーに入れる指定
フロントエンドでの定義
type OscEvent = {
event: "message";
data: {
source: string;
packet: string;
};
};
上で定義したフォーマットに合わせて、型定義を作成するだけです。
チャンネルとOSC受信のロジックを作成
// グローバル変数: OSC受信用のリスナー状態を保持するシングルトン関数
fn global_osc_listener() -> &'static SharedListener {
// OnceLock: 一度だけ初期化されるグローバルな箱
// Mutex: 読み書きともに同時に1スレッドだけアクセスできる排他ロック
static STATE: OnceLock<SharedListener> = OnceLock::new();
STATE.get_or_init(|| Mutex::new(None))
}
// グローバル変数: フロントエンドとの通信チャネルを保持するシングルトン関数
fn current_channel() -> &'static RwLock<Option<Channel<OscEvent>>> {
// OnceLock: 一度だけ初期化されるグローバルな箱
// RwLock: 複数スレッドからの同時読み取り可、書き込みは1スレッドだけ
static CH: OnceLock<RwLock<Option<Channel<OscEvent>>>> = OnceLock::new();
CH.get_or_init(|| RwLock::new(None))
}
// 指定アドレスにバインドしたUDPソケットを作成
fn make_socket() -> Result<UdpSocket, String> {
let socket = UdpSocket::bind(BIND_ADDR).map_err(|err| err.to_string())?;
Ok(socket)
}
// 起動時に常駐リスナーを起動
fn start_listener() -> Result<(), String> {
// グローバルに共有されているリスナー状態を取得 (最初は何も入っていない箱が渡される)
let mut slot = global_osc_listener().lock().unwrap();
// 指定アドレスにバインドしたUDPソケットを作成
let socket = make_socket()?;
// UDPパケットを無限ループで受信し、フロントにイベント送信するスレッドを生成
let osc_listener_thread = std::thread::spawn(move || {
// 固定サイズのバッファを用意し、UDPパケット受信に備える
let mut buf = [0u8; BUF_SIZE];
loop {
// socket.recv_from はUDPパケットが到着するまでブロックする
match socket.recv_from(&mut buf) {
Ok((size, src)) =>
// 受信した生データからOSCパケットをデコードする
match decoder::decode_udp(&buf[..size]) {
Ok((_, packet)) => {
// チャネルをロックし、接続されているか確認する
if let Some(ch) = current_channel().read().unwrap().as_ref() {
// OSCパケットを文字列表現に変換する
let packet_repr = format!("{:?}", packet);
// フロントエンドにイベントを送信
if ch
.send(OscEvent::Message {
source: src.to_string(),
packet: packet_repr,
})
.is_err()
{
// 送信失敗は無視(フロント未接続など)
}
}
}
Err(err) => {
eprintln!("OSC decode error: {}", err);
continue;
}
},
Err(err) => {
eprintln!("UDP receive error: {}", err);
continue;
}
}
}
});
// グローバル状態を起動済みのスレッドに差し替える(= OnceLock に値がセットされる)
*slot = Some(osc_listener_thread);
Ok(())
}
// フロントエンド側が受け取りたい「イベントの受信口」。
#[tauri::command]
fn listen_osc(_app: AppHandle, on_event: Channel<OscEvent>) -> Result<(), String> {
{
// グローバルに共有されているリスナー状態を取得 (最初は何も入っていない箱が渡される)
let mut ch = current_channel().write().unwrap();
// 複数回呼ばれる可能性があるため、セットされいたらスキップ (webviewのリロードなど)
let _ = ch.replace(on_event).is_some();
Ok(())
}
}
起動時に UDP の受信スレッドを常駐起動し、フロントが listen_osc を呼んだタイミングで Channel を登録します。以後は受信した OSC をイベント化して、その Channel にプッシュし続けます(遅延よりも即時性を優先)。
ファイル全体
use rosc::decoder;
use serde::Serialize;
use std::{
net::UdpSocket,
sync::{
Mutex, RwLock, OnceLock,
},
};
use std::thread::JoinHandle;
use tauri::{ipc::Channel, AppHandle, Builder};
type Listener = JoinHandle<()>;
type SharedListener = Mutex<Option<Listener>>;
const BIND_ADDR: &str = "0.0.0.0:9000";
const BUF_SIZE: usize = 2048;
#[derive(Clone, Serialize)]
#[serde(rename_all = "camelCase", tag = "event", content = "data")]
enum OscEvent {
Message { source: String, packet: String },
}
// グローバル変数: OSC受信用のリスナー状態を保持するシングルトン関数
fn global_osc_listener() -> &'static SharedListener {
// OnceLock: 一度だけ初期化されるグローバルな箱
// Mutex: 読み書きともに同時に1スレッドだけアクセスできる排他ロック
static STATE: OnceLock<SharedListener> = OnceLock::new();
STATE.get_or_init(|| Mutex::new(None))
}
// グローバル変数: フロントエンドとの通信チャネルを保持するシングルトン関数
fn current_channel() -> &'static RwLock<Option<Channel<OscEvent>>> {
// OnceLock: 一度だけ初期化されるグローバルな箱
// RwLock: 複数スレッドからの同時読み取り可、書き込みは1スレッドだけ
static CH: OnceLock<RwLock<Option<Channel<OscEvent>>>> = OnceLock::new();
CH.get_or_init(|| RwLock::new(None))
}
// 指定アドレスにバインドしたUDPソケットを作成
fn make_socket() -> Result<UdpSocket, String> {
let socket = UdpSocket::bind(BIND_ADDR).map_err(|err| err.to_string())?;
Ok(socket)
}
// 起動時に常駐リスナーを起動
fn start_listener() -> Result<(), String> {
// グローバルに共有されているリスナー状態を取得 (最初は何も入っていない箱が渡される)
let mut slot = global_osc_listener().lock().unwrap();
// 指定アドレスにバインドしたUDPソケットを作成
let socket = make_socket()?;
// UDPパケットを無限ループで受信し、フロントにイベント送信するスレッドを生成
let osc_listener_thread = std::thread::spawn(move || {
// 固定サイズのバッファを用意し、UDPパケット受信に備える
let mut buf = [0u8; BUF_SIZE];
loop {
// socket.recv_from はUDPパケットが到着するまでブロックする
match socket.recv_from(&mut buf) {
Ok((size, src)) =>
// 受信した生データからOSCパケットをデコードする
match decoder::decode_udp(&buf[..size]) {
Ok((_, packet)) => {
// チャネルをロックし、接続されているか確認する
if let Some(ch) = current_channel().read().unwrap().as_ref() {
// OSCパケットを文字列表現に変換する
let packet_repr = format!("{:?}", packet);
// フロントエンドにイベントを送信
if ch
.send(OscEvent::Message {
source: src.to_string(),
packet: packet_repr,
})
.is_err()
{
// 送信失敗は無視(フロント未接続など)
}
}
}
Err(err) => {
eprintln!("OSC decode error: {}", err);
continue;
}
},
Err(err) => {
eprintln!("UDP receive error: {}", err);
continue;
}
}
}
});
// グローバル状態を起動済みのスレッドに差し替える(= OnceLock に値がセットされる)
*slot = Some(osc_listener_thread);
Ok(())
}
// フロントエンド側が受け取りたい「イベントの受信口」。
#[tauri::command]
fn listen_osc(_app: AppHandle, on_event: Channel<OscEvent>) -> Result<(), String> {
{
// グローバルに共有されているリスナー状態を取得 (最初は何も入っていない箱が渡される)
let mut ch = current_channel().write().unwrap();
// 複数回呼ばれる可能性があるため、セットされいたらスキップ (webviewのリロードなど)
let _ = ch.replace(on_event).is_some();
Ok(())
}
}
pub fn run() {
Builder::default()
.setup(|_app| {
start_listener()?;
Ok(())
})
.invoke_handler(tauri::generate_handler![listen_osc])
.run(tauri::generate_context!())
.expect("error while running tauri application");
}
フロントエンドから呼び出してみる
import { useEffect, useRef, useState } from "react";
import { Channel, invoke } from "@tauri-apps/api/core";
type OscEvent = {
event: "Message";
data: {
source: string;
packet: string;
};
};
export function useOsc() {
const [connected, setConnected] = useState(false);
const [latest, setLatest] = useState<OscEvent | null>(null);
const channelRef = useRef<Channel<OscEvent> | null>(null);
useEffect(() => {
// Channel を作成し、受信時の処理を登録
const ch = new Channel<OscEvent>((evt) => {
setLatest(evt);
});
channelRef.current = ch;
// Rust 側のコマンドを呼んで Channel を登録
invoke("listen_osc", { onEvent: ch })
.then(() => setConnected(true))
.catch(() => setConnected(false));
// アンマウント時に参照を解放
return () => {
channelRef.current = null;
};
}, []);
return { connected, latest };
}
このフックで、起動時にRust側へチャネルを登録し、OSCメッセージをリアルタイムに受信します。受信内容はlatestで参照でき、接続状態はconnectedで確認できて、UIから簡単にOSCイベントを扱っています。
実際に動かしてみる
手前がPure Dataで作成したOSCを9000番に送信している処理で、奥のTauriのViewにリアルタイムに送信された値が表示・更新されています。
まとめ
どれくらいの遅延があるのか、予想もつかないまま実装を始めましたが、結果的にほぼ体感できないくらいの遅延で済んだのは衝撃です。
最低限の処理しかないので、ここから色々な機能を追加したいと思いました。
Discussion