😊

Rustの高度なジェネリクスと並行処理パターン

2025/03/02に公開

1. 基本的な並行処理パターン

Arc<T> - アトミック参照カウント

初めまして。カズマです。以下は自分用のメモ書きのようなものですが、投稿いたします。
Arc(Atomic Reference Counter)は、複数の所有者間でデータを安全に共有するためのスマートポインタです。

use std::sync::Arc;

// データを複数の所有者で共有
let data = Arc::new(vec![1, 2, 3]);
let data_clone = Arc::clone(&data); // 参照カウントを増やす
  • スレッド間で安全に共有できる
  • クローンしてもデータはコピーされず、参照カウントだけ増加
  • 最後のArcが破棄されるとデータも解放される
  • メモリレイアウト: スタック上にポインタ、ヒープ上に参照カウンタと実データ

Mutex<T> - 相互排他ロック

Mutexは、データへの排他的アクセスを提供するロック機構です。

use std::sync::Mutex;

let counter = Mutex::new(0);

// ロックを取得して値を変更
let mut value = counter.lock().unwrap();
*value += 1;
// ここでロックは自動的に解放される
  • 一度に一つのスレッドだけがデータにアクセス可能
  • .lock()でロックを取得し、返されたMutexGuardがスコープを抜けると自動的にロックを解放
  • デッドロックの可能性がある

2. 複合パターン

Arc<Mutex<T>> - 共有可変状態

最も一般的な共有可変状態の実装パターンです。

use std::sync::{Arc, Mutex};

// 複数スレッドで共有し、変更可能なデータ
let shared_data = Arc::new(Mutex::new(vec![1, 2, 3]));

// 別スレッドでの使用例
let data_clone = Arc::clone(&shared_data);
std::thread::spawn(move || {
    let mut data = data_clone.lock().unwrap();
    data.push(4);
});
  • Arcによって複数のスレッドからアクセス可能
  • Mutexによって変更時の排他制御を実現
  • アプリケーションの共有状態管理の基本パターン

Arc<RwLock<T>> - 読み書きロックによる最適化

読み取りが多く、書き込みが少ない場合に最適化されたパターン。

use std::sync::{Arc, RwLock};

let database = Arc::new(RwLock::new(HashMap::new()));

// 読み取り(複数スレッドが同時に可能)
let data = database.read().unwrap();
let value = data.get("key");

// 書き込み(排他的アクセス)
let mut data = database.write().unwrap();
data.insert("key", "value");
  • 複数のスレッドが同時に読み取り可能
  • 書き込み時のみ排他的ロックを取得
  • 読み取り中心のワークロードでMutexより高いパフォーマンス

3. 高度なパターニング

シャーディングによる細粒度ロック

データを複数の部分に分割し、それぞれに独立したロックを適用します。

struct ShardedMap<K, V> {
    shards: Vec<Mutex<HashMap<K, V>>>,
    shard_count: usize,
}

impl<K, V> ShardedMap<K, V> 
where 
    K: Hash + Eq,
{
    fn new(shard_count: usize) -> Self {
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashMap::new()));
        }
        Self { shards, shard_count }
    }
    
    fn get_shard(&self, key: &K) -> &Mutex<HashMap<K, V>> {
        let shard_index = self.hash(key) % self.shard_count;
        &self.shards[shard_index]
    }
    
    fn insert(&self, key: K, value: V) {
        let shard = self.get_shard(&key);
        let mut map = shard.lock().unwrap();
        map.insert(key, value);
    }
}
  • 複数のロックを使うことで競合を減らし、スループットを向上
  • 異なるキーへの操作が異なるロックを使用するため並列性が向上
  • ただし、シャード間のロジックが必要な操作が複雑になる

Actor Model(アクターモデル)

アクターが独自の状態を管理し、メッセージを通じて通信するパターン。

use tokio::sync::{mpsc, oneshot};

struct UserActor {
    state: Vec<User>,
    receiver: mpsc::Receiver<Command>,
}

enum Command {
    Create {
        user: User,
        respond_to: oneshot::Sender<Result<User, Error>>,
    },
    Get {
        id: String,
        respond_to: oneshot::Sender<Option<User>>,
    },
}

impl UserActor {
    async fn run(&mut self) {
        while let Some(command) = self.receiver.recv().await {
            match command {
                Command::Create { user, respond_to } => {
                    let result = self.create_user(user);
                    let _ = respond_to.send(result);
                },
                Command::Get { id, respond_to } => {
                    let user = self.state.iter()
                        .find(|u| u.id == id)
                        .cloned();
                    let _ = respond_to.send(user);
                }
            }
        }
    }
}

struct UserService {
    sender: mpsc::Sender<Command>,
}

impl UserService {
    async fn create_user(&self, user: User) -> Result<User, Error> {
        let (tx, rx) = oneshot::channel();
        self.sender.send(Command::Create {
            user,
            respond_to: tx,
        }).await?;
        rx.await?
    }
}
  • 明示的なロックの代わりにメッセージパッシングを使用
  • 各アクターは自身の状態のみを変更し、データ競合を排除
  • 複雑な共有状態の管理に適している

4. Lock-Free(ロックフリー)データ構造

アトミック操作を使ってロックを使わずに並行処理を実現します。

use std::sync::atomic::{AtomicUsize, Ordering};

struct Counter {
    count: AtomicUsize,
}

impl Counter {
    fn new() -> Self {
        Self { count: AtomicUsize::new(0) }
    }
    
    fn increment(&self) -> usize {
        self.count.fetch_add(1, Ordering::SeqCst)
    }
    
    fn get(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }
}

高度なロックフリーデータ構造の例(crossbeamライブラリを使用):

use crossbeam_skiplist::SkipMap;

let map = SkipMap::new();
map.insert("key", "value");
if let Some(entry) = map.get("key") {
    assert_eq!(*entry.value(), "value");
}
  • ロックを使わないので、スレッドの中断やデッドロックの問題がない
  • 非常に高いスケーラビリティを実現可能
  • 実装が複雑で、正確性の証明が難しい

5. 静的ディスパッチと動的ディスパッチ

Rustでは、一般的に二つの方法でポリモーフィズムを実現します:

静的ディスパッチ(ジェネリクス)

コンパイル時に具体的な実装が決定される方法です。

// ジェネリクスを使った静的ディスパッチ
fn process<T: Display>(item: T) {
    println!("{}", item);
}

// コンパイル時に別々のコードが生成される
process("hello"); // T=&str 用のコード
process(42);      // T=i32 用のコード

メリット:

  • オーバーヘッドがなく高速
  • コンパイラによる最適化の余地が大きい
  • 型安全性が高い

デメリット:

  • コード量が増える(コード爆発)
  • コンパイル時間が長くなる可能性がある
  • 型パラメータが増えると読みづらくなる

動的ディスパッチ(トレイトオブジェクト)

実行時に具体的な実装が決定される方法です。

// トレイトオブジェクトを使った動的ディスパッチ
fn process(item: &dyn Display) {
    println!("{}", item);
}

// 実行時にVテーブルを通じて適切なメソッドが呼ばれる
process(&"hello");
process(&42);

メリット:

  • コード量が少なくなる
  • 異なる型を同じコレクションで扱える
  • 実行時の柔軟性が高い

デメリット:

  • Vテーブル参照によるわずかなパフォーマンスオーバーヘッド
  • 実装できるのはObject Safeなトレイトのみ
  • サイズが静的に決まらないためBox等で包む必要がある

実用的な例:リポジトリパターン

静的ディスパッチを使用した例

trait UserRepository {
    fn find_by_id(&self, id: &str) -> Option<User>;
    fn save(&self, user: User) -> Result<(), Error>;
}

struct PostgresUserRepository {
    connection: PgConnection,
}

impl UserRepository for PostgresUserRepository {
    // 実装...
}

// ジェネリクスを使用したサービス
struct UserService<R: UserRepository> {
    repository: R,
}

impl<R: UserRepository> UserService<R> {
    fn get_user(&self, id: &str) -> Option<User> {
        self.repository.find_by_id(id)
    }
}

// 使用例
let repo = PostgresUserRepository::new(connection);
let service = UserService { repository: repo };

動的ディスパッチを使用した例

trait UserRepository {
    fn find_by_id(&self, id: &str) -> Option<User>;
    fn save(&self, user: User) -> Result<(), Error>;
}

struct PostgresUserRepository {
    connection: PgConnection,
}

impl UserRepository for PostgresUserRepository {
    // 実装...
}

// トレイトオブジェクトを使用したサービス
struct UserService {
    repository: Box<dyn UserRepository + Send + Sync>,
}

impl UserService {
    fn new(repository: Box<dyn UserRepository + Send + Sync>) -> Self {
        Self { repository }
    }
    
    fn get_user(&self, id: &str) -> Option<User> {
        self.repository.find_by_id(id)
    }
}

// 使用例
let repo = Box::new(PostgresUserRepository::new(connection));
let service = UserService::new(repo);

6. ライフタイムと参照

Rustの所有権システムの重要な部分であるライフタイムは、参照が有効である期間を表します。

基本的なライフタイム注釈

// 明示的なライフタイム注釈
fn longest<'a>(s1: &'a str, s2: &'a str) -> &'a str {
    if s1.len() > s2.len() { s1 } else { s2 }
}

このコードでは、'aというライフタイムパラメータが導入され、関数の引数と戻り値の関係を表現しています。

静的ライフタイム

'staticはプログラムの全実行期間を表す特別なライフタイムです。

// 文字列リテラルは'staticライフタイムを持つ
let s: &'static str = "Hello, world!";

トレイト境界としてのライフタイム

ライフタイムはトレイト境界の一部として使用することもできます。

// 型Tがライフタイム'a以上の期間有効であることを表す
fn process<'a, T: Display + 'a>(item: &'a T) {
    println!("{}", item);
}

7. 遅延初期化パターン

LazyCell と LazyLock

Rustでは、LazyCellLazyLockを使用して遅延初期化を実現できます。

use std::cell::LazyCell;
use std::lazy::LazyLock;

// 非スレッドセーフな遅延初期化
let lazy_value: LazyCell<String> = LazyCell::new(|| {
    println!("Initializing...");
    "Hello, world!".to_string()
});

// アクセス時に初期化される
println!("{}", lazy_value.get()); // "Initializing..." と "Hello, world!" が出力される

// スレッドセーフな静的変数の遅延初期化
static CONFIG: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
    let mut map = HashMap::new();
    map.insert("host".to_string(), "localhost".to_string());
    map.insert("port".to_string(), "8080".to_string());
    map
});

依存性注入との組み合わせ

LazyLock と動的ディスパッチを組み合わせた依存性注入パターン:

use std::lazy::LazyLock;
use std::sync::Arc;

trait UserRepository: Send + Sync {
    fn find_by_id(&self, id: &str) -> Option<User>;
}

// アプリケーション全体で共有されるリポジトリインスタンス
static USER_REPO: LazyLock<Arc<dyn UserRepository + Send + Sync>> = LazyLock::new(|| {
    if cfg!(test) {
        // テスト時はモックを使用
        Arc::new(MockUserRepository::new())
    } else {
        // 本番環境では実際の実装を使用
        Arc::new(PostgresUserRepository::new())
    }
});

// どこからでもアクセス可能
fn get_user(id: &str) -> Option<User> {
    USER_REPO.find_by_id(id)
}

8. gRPCとストリーミング

gRPCサービスにおけるストリーミング実装は、Rustの型システムと並行処理パターンの良い例です。

type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataResponse, Status>> + Send>>;

この型定義の分解:

  1. Stream<Item = Result<DataResponse, Status>> - 非同期にデータを生成するイテレータのような型
  2. dyn ... + Send - トレイトオブジェクト(動的ディスパッチ)で、スレッド間で安全に送信可能
  3. Box<...> - サイズが動的なトレイトオブジェクトをヒープに格納
  4. Pin<...> - メモリ内の特定の場所から移動しないことを保証(非同期処理で重要)

実装例:

use tonic::{Request, Response, Status};
use futures::Stream;
use std::pin::Pin;
use tokio::sync::mpsc;

#[tonic::async_trait]
impl StreamingService for MyStreamingService {
    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataResponse, Status>> + Send>>;
    
    async fn stream_data(
        &self,
        request: Request<DataRequest>,
    ) -> Result<Response<Self::StreamDataStream>, Status> {
        let (tx, rx) = mpsc::channel(4);
        
        tokio::spawn(async move {
            for i in 1..=10 {
                tx.send(Ok(DataResponse { 
                    message: format!("Message {}", i) 
                })).await.unwrap();
            }
        });
        
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx)
        )))
    }
}

この例では、クライアントにストリーミングでデータを送信するgRPCサービスを実装しています。

9. デリバティブ(自動導出トレイト)

Rustには#[derive(...)]属性を使って、多くの共通トレイトを自動的に実装する機能があります。

#[derive(Clone, Debug, PartialEq, Eq)]
struct User {
    id: String,
    name: String,
    email: String,
}

この例では:

  • Clone: user.clone()でオブジェクトのコピーを作成可能
  • Debug: println!("{:?}", user)でデバッグ表示可能
  • PartialEq, Eq: ==演算子でオブジェクトを比較可能

TypeScriptのimplements interfaceに似ていますが、自動的にコードが生成される点が異なります。

10. Rustとアーキテクチャパターン

クリーンアーキテクチャの実装

静的ディスパッチと動的ディスパッチを適切に組み合わせることで、保守性とパフォーマンスのバランスを取ったアーキテクチャを実現できます。

// ドメイン層 - エンティティとリポジトリインターフェース
mod domain {
    pub struct User {
        pub id: String,
        pub name: String,
    }

    pub trait UserRepository {
        fn find_by_id(&self, id: &str) -> Option<User>;
    }
}

// アプリケーション層 - ユースケース
mod application {
    use crate::domain::{User, UserRepository};

    pub struct GetUserUseCase<R: UserRepository> {
        repository: R,
    }

    impl<R: UserRepository> GetUserUseCase<R> {
        pub fn new(repository: R) -> Self {
            Self { repository }
        }

        pub fn execute(&self, id: &str) -> Option<User> {
            self.repository.find_by_id(id)
        }
    }
}

// インフラ層 - リポジトリ実装
mod infrastructure {
    use crate::domain::{User, UserRepository};
    
    pub struct PostgresUserRepository {
        // データベース接続情報
    }
    
    impl UserRepository for PostgresUserRepository {
        fn find_by_id(&self, id: &str) -> Option<User> {
            // 実装
        }
    }
}

// プレゼンテーション層 - コントローラ 
mod presentation {
    use crate::application::GetUserUseCase;
    use crate::domain::UserRepository;
    
    pub struct UserController<R: UserRepository> {
        get_user_usecase: GetUserUseCase<R>,
    }
    
    impl<R: UserRepository> UserController<R> {
        pub fn new(repository: R) -> Self {
            Self {
                get_user_usecase: GetUserUseCase::new(repository),
            }
        }
        
        pub fn get_user(&self, id: &str) -> String {
            match self.get_user_usecase.execute(id) {
                Some(user) => format!("User: {}", user.name),
                None => "User not found".to_string(),
            }
        }
    }
}

// 依存性の注入
fn main() {
    use crate::infrastructure::PostgresUserRepository;
    use crate::presentation::UserController;
    
    let repository = PostgresUserRepository::new();
    let controller = UserController::new(repository);
    
    println!("{}", controller.get_user("123"));
}

依存性注入パターンの代替案

Rustでは、依存性注入を実装する複数の方法があります:

  1. コンストラクタインジェクション(上記の例)
  2. トレイトオブジェクトを用いた動的ディスパッチ
pub struct UserService {
    repository: Arc<dyn UserRepository + Send + Sync>,
}

impl UserService {
    pub fn new(repository: Arc<dyn UserRepository + Send + Sync>) -> Self {
        Self { repository }
    }
}
  1. ファクトリパターン
trait RepositoryFactory {
    fn create_user_repository(&self) -> Box<dyn UserRepository + Send + Sync>;
}

struct UserService {
    repository: Box<dyn UserRepository + Send + Sync>,
}

impl UserService {
    fn new(factory: &dyn RepositoryFactory) -> Self {
        Self {
            repository: factory.create_user_repository(),
        }
    }
}

まとめ

Rustの型システムとメモリ安全性は、複雑なアプリケーションを構築する強力な基盤を提供します。静的ディスパッチと動的ディスパッチを適切に組み合わせることで、パフォーマンスと柔軟性のバランスを取りながら、保守性の高いコードを書くことができます。

並行処理パターンについては、Arc<Mutex<T>>Arc<RwLock<T>>といった基本的なパターンから、シャーディングやアクターモデル、ロックフリーデータ構造などの高度なテクニックまで、様々なアプローチがあります。これらを理解し適切に選択することで、安全で効率的な並行アプリケーションを構築できます。

Rustの型システムを活用したクリーンアーキテクチャの実装や、依存性注入パターンの適用により、テスト容易性と保守性を兼ね備えたシステム設計が可能になります。

Discussion