🦀

RustのtokioでオレオレTCP relay serverを作ってみる

2021/12/14に公開

この記事はRust Advent Calendar 2021 (カレンダー2)の14日目の記事です。

作ったもの

https://github.com/higumachan/tcp-tunnel-rs

ngrokというサービスに触発されて、自分でも似たようなもの作ってみようという動機で作ってみました。

出来ることは、TCPのrelayするためのオレオレプロトコルのrelay serverとclient(agent)になります。

思い立つ

野暮用により、RaspberryPi(以下RPi)を短い期間に遠隔で動すということが必要になったことがありました。
その際にメンテナンス用にsshを行いたいと画策して、最初はOpenVPNとかSoftEatherみたいなVPN案件かなと思っていたのですが、ngrokを見つけて使ってみたところすごく簡単に使えたので感動しました。

そこで、自分でも作って見ようかと考えてみました。

ngrokでTCP tunnelを利用する方法

  1. ngrokに登録する

  2. ngrokのダウンロード

https://ngrok.com/download

  1. authtokenを設定する
ngrok authtoken <YOUR_AUTHTOKEN>
  1. 特定のコマンドを打つ
ngrok tcp 22

https://ngrok.com/docs#tcp-examples

たったこれだけで、たまに使うssh環境が出来ます。

⚠ 基本的に全世界に公開することになるので長く運用する場合はRaspberryPi側のssh serverは適切な設定で運用してください。

方法を考える

あとから調べてみたら、やること自体はTURNだったみたいです。

今回は楽しさ優先で自分で考えてみました。

目標

同じLANに無いraspberry piにPCからsshで接続することを目標とします。

なぜ普通にSSHで繋げないのか

なぜ出来ないのかというと、NAT超えが出来ないからです。
かなりアバウトに言うと、一般的なLAN内に配置されてる機器にはグローバルIPが割り当てられていないのでLANの外からIPを利用してパケットを送ることが出来ないということです。

この状況を図に書くとこんな感じになります。

中継サーバを置くことで解決を試みる

このように(GlobalIPを持った)中継サーバを置くことで解決することが出来ます。

NAT越しでも中継サーバはGlobal IPを持っているので、
PC => 中継サーバ
RPi => 中継サーバ
という方向で通信を開始することが出来るのでその通信を保っておけば良い感じです。

登場人物

agent

RPi側で動くRustで記述されたプログラム
serverの制御ポートとやりとりしながらRPiのssh server(本当はTCPであれば何でも良い)とやり取りする

server

中継サーバで動くRustで記述されたプログラム
agentの制御ポート、agentの通信ポート、clientとやり取りする。

client

TCPのclientになるものであれば何でも良い。
今回はssh clientで行う。

シーケンス図

良い感じに複数のTCP portを作りながらクライアントとagentをつなぐシーケンスを考えていきます。

新しいエージェントが追加されたときのシーケンス図

新しいクライアントが登録されたときのシーケンス図

実装を考える

ここからやっと、Rustの話が出てきます。

Rustを選択した理由

  1. 勉強
  2. 非同期処理が比較的に得意そうだったから

非同期ランタイムとしては安定のtokioを利用しました。

プロトコルの実装

シーケンス図で定義した独自のプロトコルを定義します。

各メッセージをenumとして定義して、serde +bincodeで(デ)シリアライズしてTokioのAsyncRead/Writeに対して読み書きをするコードを書きます。

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum ProtocolSeverToAgent {
    NewClientRequest { address: String },
    NewAgentResponse { address: String },
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum ProtocolAgentToSever {
    NewClientResponse,
}

pub async fn read_protocol<R: AsyncRead + std::marker::Unpin, P>(
    mut reader: impl DerefMut<Target = R>,
) -> Result<P, anyhow::Error>
where
    P: for<'de> Deserialize<'de>,
{
    let mut buf_protocol = [0; 128];
    let n = reader.deref_mut().read(&mut buf_protocol).await?;

    Ok(bincode::deserialize(&buf_protocol[0..n])?)
}

pub async fn write_protocol<W: AsyncWrite + AsyncWriteExt + std::marker::Unpin, P: Serialize>(
    mut writer: impl DerefMut<Target = W>,
    protocol: &P,
) -> Result<(), anyhow::Error> {
    Ok(writer
        .deref_mut()
        .write_all(&bincode::serialize(protocol)?)
        .await?)
}

Rustのenumは強力なのでオレオレプロトコルを作るのは簡単ですね。(どのようにバイト列に落ちるのかを整合性取る必要が無いので)

agentを実装する

agent.rs
use std::net::SocketAddr;
use std::sync::Arc;
use structopt::StructOpt;
use tcp_tunnel_rs::{read_protocol, write_protocol, ProtocolAgentToSever, ProtocolSeverToAgent};
use tokio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};

const BUFFER_SIZE: usize = 32 * 1024;

#[derive(Debug, StructOpt)]
struct Opt {
    #[structopt(short = "c", long = "control_address", parse(try_from_str))]
    control_address: SocketAddr,
    #[structopt(short = "p", long = "target_port")]
    target_port: u16,
}

async fn agent_main(mut control_server_stream: TcpStream, target_port: u16) -> anyhow::Result<()> {
    loop {
        let protocol = { read_protocol(&mut control_server_stream).await.unwrap() }; 

        match protocol {
            ProtocolSeverToAgent::NewClientRequest { address } => {
                let mut tunnel_server_stream = TcpStream::connect(address.clone()).await.unwrap();  # 通知された通信用ポートに接続する
                println!("connect {}", &address);
                let mut target_server_stream =
                    TcpStream::connect(format!("127.0.0.1:{}", target_port))
                        .await
                        .unwrap();  # clientと接続するポートを開く
                println!("connect target");
                write_protocol(
                    &mut control_server_stream,
                    &ProtocolAgentToSever::NewClientResponse,
                )
                .await
                .unwrap();
                println!("send request");
                tokio::spawn(async move {
                    let mut buf_tunnel = [0; BUFFER_SIZE];
                    let mut buf_target = [0; BUFFER_SIZE];

                    # serverから来たデータをclientに流して、clientに来たデータをserverに流す。
                    loop {
                        tokio::select! {
                            n = tunnel_server_stream.read(&mut buf_tunnel) => {
                                let n = match n {
                                    Ok(n) if n == 0 => {
                                        println!("close tunnel server");
                                        return;
                                    },
                                    Ok(n) => n,
                                    Err(e) => {
                                        eprintln!("failed to read from socket; err = {:?}", e);
                                        return;
                                    }
                                };
                                println!("tunnel -> target {}", n);
                                target_server_stream.write_all(&buf_tunnel[0..n]).await.unwrap();
                            }
                            n = target_server_stream.read(&mut buf_target) => {
                                let n = match n {
                                    Ok(n) if n == 0 => {
                                        println!("close target server");
                                        return;
                                    },
                                    Ok(n) => n,
                                    Err(e) => {
                                        eprintln!("failed to read from socket; err = {:?}", e);
                                        return;
                                    }
                                };
                                println!("target -> tunnel {}", n);
                                tunnel_server_stream.write_all(&buf_target[0..n]).await.unwrap();
                            }
                        }
                    }
                });
            }
            ProtocolSeverToAgent::NewAgentResponse { address } => {
                println!("tunnel server address {}", address);  # サーバのaddressとポートを画面に表示する
            }
            _ => {
                unreachable!()
            }
        };
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let opt = Opt::from_args();

    loop {
        println!("try connect control sever");
        if let Ok(mut control_server_stream) = TcpStream::connect(opt.control_address).await # serverの制御ポートに接続する。
	{
            agent_main(control_server_stream, opt.target_port).await?;
        } else {
            println!("fail connect control server.");
            sleep(Duration::from_secs(1)).await;
        }
    }
}

serverを実装する

server.rs
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use structopt::StructOpt;
use tcp_tunnel_rs::{read_protocol, write_protocol, PortAssigner, PortRange};
use tcp_tunnel_rs::{ProtocolAgentToSever, ProtocolSeverToAgent};
use tokio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::RwLock;

const BUFFER_SIZE: usize = 32 * 1024;

#[derive(Debug, StructOpt)]
struct Opt {
    #[structopt(short = "c", long = "control_address", parse(try_from_str))]
    control_address: SocketAddr,
    #[structopt(
        short = "C",
        long = "client_port_range",
        parse(try_from_str),
        default_value = "10000:19999"
    )]
    client_port_range: PortRange,
    #[structopt(
        short = "a",
        long = "agent_port_range",
        parse(try_from_str),
        default_value = "20000:29999"
    )]
    tunnel_port_range: PortRange,
    #[structopt(short = "g", long = "myglobal_ip_address", parse(try_from_str))]
    myglobal_ip_address: IpAddr,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let opt = Opt::from_args();

    let listener_for_agent_control = TcpListener::bind(opt.control_address.clone()).await?;  # 制御ポートを開く
    let tunnel_port_assigner = Arc::new(RwLock::new(PortAssigner::new(opt.tunnel_port_range)));
    let client_port_assigner = Arc::new(RwLock::new(PortAssigner::new(opt.client_port_range)));

    println!("start server");
    loop {
        let tunnel_port_assigner = tunnel_port_assigner.clone();
        let client_port_assigner = client_port_assigner.clone();
        let mut socket_agent_control = listener_for_agent_control.accept().await?.0; # 新しいagentが制御ポートに接続されるのを待つ
        println!("connect agent");
        let new_client_port = client_port_assigner.write().await.next(); # 新しいポート番号を発行する
        let listener_for_client = TcpListener::bind(format!("0.0.0.0:{}", new_client_port)).await?;
        let myglobal_ip_address = opt.myglobal_ip_address.clone();

        write_protocol(
            &mut socket_agent_control,
            &ProtocolSeverToAgent::NewAgentResponse {
                address: format!("{}:{}", &myglobal_ip_address, new_client_port),
            },
        )
        .await
        .unwrap(); # agentに登録完了のレスポンスを送る

        tokio::spawn(async move {
            loop {
                let (mut socket_client, _) = listener_for_client.accept().await.unwrap();  # 新しいclientが接続されるのを待つ
                println!("connect client");

                let new_port = tunnel_port_assigner.write().await.next(); #tunnel用のポートをアサインする
                let new_address = format!("{}:{}", myglobal_ip_address.clone(), new_port);
                let listener_for_agent = TcpListener::bind(format!("0.0.0.0:{}", new_port))
                    .await
                    .unwrap();
                println!("bind {}", new_address);
                write_protocol(
                    &mut socket_agent_control,
                    &ProtocolSeverToAgent::NewClientRequest {
                        address: new_address,
                    },
                )
                .await
                .unwrap();  # agentに新しいクライアントが接続されたことを通知する

                println!("send protocol");
                let mut socket_agent = listener_for_agent.accept().await.unwrap().0;
                println!("accept");

                let protocol: ProtocolAgentToSever =
                    read_protocol(&mut socket_agent_control).await.unwrap();  # agentが新しい準備が出来たことを確認する

                assert_eq!(protocol, ProtocolAgentToSever::NewClientResponse);
                println!("received new client response");

                tokio::spawn(async move {
                    let mut buf_client = [0; BUFFER_SIZE];
                    let mut buf_agent = [0; BUFFER_SIZE];

                    # 空のパケットが来るまで、clientから来たデータをagentに流して、agentから来たデータをclientに流す
                    loop {
                        tokio::select! {
                            n = socket_client.read(&mut buf_client) => {
                                let n = match n {
                                    Ok(n) if n == 0 => return,
                                    Ok(n) => n,
                                    Err(e) => {
                                        eprintln!("failed to read from socket; err = {:?}", e);
                                        return;
                                    }
                                };
                                socket_agent.write_all(&buf_client[0..n]).await.unwrap();
                            }
                            n = socket_agent.read(&mut buf_agent) => {
                                let n = match n {
                                    Ok(n) if n == 0 => return,
                                    Ok(n) => n,
                                    Err(e) => {
                                        eprintln!("failed to read from socket; err = {:?}", e);
                                        return;
                                    }
                                };
                                socket_client.write_all(&buf_agent[0..n]).await.unwrap();
                            }
                        };
                    }
                });
            }
        });
    }
}

という感じで実装が出来ます。

Portのアサインについて

Portのアサインに関しては

pub struct PortAssigner {
   port_range: PortRange,
   current: PortRange,
}

impl PortAssigner {
   pub fn new(port_range: PortRange) -> Self {
       Self {
           current: port_range.clone(),
           port_range: port_range.clone(),
       }
   }
   pub fn next(&mut self) -> u16 {
       if let Some(n) = self.current.range.next() {
           n
       } else {
           self.current = self.port_range.clone();
           self.current.range.next().unwrap()
       }
   }
}

このような感じで与えられたレンジに対してインクリメントしていくようなものを作っています。(今回はエラー処理を書いていないのでポートがぶつかると死にます)

PortAssignerはserver.rsの中で利用するのですが、tokioのタスクをまたいで利用してかつmutableであるため。
Arctokio::RwLockでwrapしてあげることでSend + Syncと内部可変性を与えることが出来ます。

    let tunnel_port_assigner = Arc::new(RwLock::new(PortAssigner::new(opt.tunnel_port_range)));
    let client_port_assigner = Arc::new(RwLock::new(PortAssigner::new(opt.client_port_range)));

複数のソケットからの受信を待つ

今回はserver.rs, agent.rsともに複数のソケットからの受信待ちを行う必要があります。

その場合はtokioのselect!を利用します。

    # 空のパケットが来るまで、clientから来たデータをagentに流して、agentから来たデータをclientに流す
    loop {
	tokio::select! {
	    n = socket_client.read(&mut buf_client) => {
		let n = match n {
		    Ok(n) if n == 0 => return,
		    Ok(n) => n,
		    Err(e) => {
			eprintln!("failed to read from socket; err = {:?}", e);
			return;
		    }
		};
		socket_agent.write_all(&buf_client[0..n]).await.unwrap();
	    }
	    n = socket_agent.read(&mut buf_agent) => {
		let n = match n {
		    Ok(n) if n == 0 => return,
		    Ok(n) => n,
		    Err(e) => {
			eprintln!("failed to read from socket; err = {:?}", e);
			return;
		    }
		};
		socket_client.write_all(&buf_agent[0..n]).await.unwrap();
	    }
	};
    }

このように

select! {
    x = 待ちたいFuture1 {
        # 待ちたいFuture1が返って来たときの処理
    }
    x = 待ちたいFuture2 {
        # 待ちたいFuture2が返って来たときの処理
    }
}

というふうにFutureを並べていくと同時に待ち、どちらかが返ってきたタイミングで各々の処理を行うことが出来ます。

実行

本当はRPiでssh serverを動かしてtunnelを試したかったところなのですが、この記事を書いている時点で僕のRPiがお亡くなりになっていたため、ローカルのecho-serverでやっていこうかと思います。

普通にecho serverを実行する

asciicast

relayサーバを経由してecho serverに接続する

asciicast

という感じで、中継サーバ越しにecho serverに接続することが出来ました。(ちょっと迫力に書けますね😅)

感想

今回は思いつきでオレオレTCP relayプロトコルを設計して実装してみました。

Rustで書いた感想としては、昔にC言語で通信周りを書いたときに比べると格段に実行エラーと通信プログラムあるあるの全然パケット来ないじゃんとか、非同期処理の周りがうまく行かないという問題にほとんどエンカウントしなかった気がします。

エラー処理などが入っていないですが、プログラムの行数的にも以下のようになっており、Pythonとかで書いてもこれくらいの行数になりそうだなっていう感じの規模感でコンパクトに実装できました。(tokioのおかげがデカイですが)

===================================================================================================================================================================================================================
 Language                                                                                                                                                Files        Lines         Code     Comments       Blanks
===================================================================================================================================================================================================================
 Rust                                                                                                                                                        4          358          319            0           39
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ./src/bin/echo_server.rs                                                                                                                                                39           32            0            7
 ./src/bin/server.rs                                                                                                                                                    126          113            0           13
 ./src/lib.rs                                                                                                                                                            89           77            0           12
 ./src/bin/agent.rs                                                                                                                                                     104           97            0            7
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 TOML                                                                                                                                                        1           13           10            1            2
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ./Cargo.toml                                                                                                                                                            13           10            1            2
===================================================================================================================================================================================================================
 Total                                                                                                                                                       5          371          329            1           41
===================================================================================================================================================================================================================

Rustを使うとこのくらいの通信を行うプログラムだったらサクッとかつバグが起きづらい形で実装できるのでみなさんもぜひ試してみてください!

Discussion