🦔

Arcの仕組みを理解する - Rustの並行性とメモリ安全性の両立

に公開

はじめに

こんにちは、moriです。
FAST株式会社にてRustを用いてデスクトップアプリケーション、バックエンドの開発を行っています。

非同期やマルチスレッドプログラミングでは、ArcMutexが頻繁に使用されますが、
その内部動作を理解せずに使っている方も多いのではないでしょうか。
私自身、Arcの仕組みをより深く理解したいと思い、この記事を書くことにしました。
この記事では、Rustの所有権システムを土台に、Arcがどのように動作し、
なぜ必要なのかの基本的な概念と実装の要点を解説します。

TL;DR

  • Arcは複数のスレッド間で安全にデータを共有するためのスマートポインタ
  • 参照カウンター方式により、最後の所有者がドロップされた時に自動的にメモリを解放
  • Arc::cloneは軽量で、データそのものではなくポインターのコピーと参照カウンターを増やすだけ
  • Arc<Mutex<T>>パターンで共有可変データへの安全なアクセスを実現

想定読者

  • Rustの基本的な所有権と借用の概念を理解している方
  • 並列処理に興味がある方
  • Arcの仕組みに興味がある方

Arcとは何か

Arc(Atomic Reference Counting)は、Rustにおいて複数のスレッド間でデータを安全に共有するためのスマートポインタです。
通常、Rustの所有権システムでは、ある値は一つの所有者しか持てませんが、Arcを使用することで、複数の所有者がデータを共有できるようになります。

Arcの主な特徴:

  • 参照カウンタ方式:内部で参照カウンタを管理し、複数の所有者がいる間はデータを保持
  • スレッドセーフ:アトミック操作により、複数スレッドから安全にアクセス可能
  • 自動メモリ管理:最後の所有者がドロップされると自動的にメモリを解放
  • 不変共有Arc<T>自体は不変参照のみ提供(可変アクセスにはArc<Mutex<T>>等を使用)

ArcRc(Reference Counting)のスレッドセーフ版と考えることができます。
Rcは単一スレッド環境での参照カウント型スマートポインタですが、Arcはアトミック操作を使用することで、マルチスレッド環境でも安全に動作します。

なぜArcが必要か?

Arcの内部構造を見る前に、まずどのような場面でArcが活躍するのか、具体的なシナリオを通して見ていきましょう。
複数のスレッドで、クローンするにはコストが高いデータを共有したい、という状況を考えます。

Arcを使わずに、単純な参照を各スレッドに渡そうとするとどうなるでしょうか。

// コンパイルエラーになるコード
use std::thread;

fn main() {
    let data = String::from("Some large data");
    let data_ref = &data;

    thread::spawn(move || {
        // `data_ref`は`main`関数より長く生存できないため、
        // スレッドに所有権を渡すことができない。
        println!("{}", data_ref);
    }).join().unwrap();
}

以下のようなコンパイルエラーが発生します。

error[E0597]: `data` does not live long enough
  --> src/main.rs:5:20
   |
 4 |       let data = String::from("Some large data");
   |           ---- binding `data` declared here
 5 |       let data_ref = &data;
   |                      ^^^^^ borrowed value does not live long enough
...
 8 | /     thread::spawn(move || {
 9 | |         // `data_ref`は`main`関数より長く生存できないため、
10 | |         // スレッドに所有権を渡すことができない。
11 | |         println!("{}", data_ref);
12 | |     }).join().unwrap();
   | |______- argument requires that `data` is borrowed for `'static`
13 |   }
   |   - `data` dropped here while still borrowed

なぜなら、thread::spawnで生成された新しいスレッドは、現在のmain関数よりも長く生存する可能性があるからです。
しかし、data_refmain関数内のdataを指す参照なので、main関数が終了すれば無効になってしまいます。

Rustのコンパイラは、このような「ダングリングポインタ」の危険性をコンパイル時に検知し、エラーとします。
エラーメッセージは、クロージャに渡す変数が'staticライフタイムを持たないことを示しています。
これは、データがスレッドのライフタイム全体にわたって有効であることを保証できないためです。

この'staticの制約を満たしつつ、複数のスレッドで安全にデータを共有するための仕組みがArcです。

Arc構造体の定義

ここからはArcの定義を詳しく見ていきましょう。
Arcの定義は以下のようになっています(属性を省略しています)

pub struct Arc<T: ?Sized, A: Allocator = Global> {
    ptr: NonNull<ArcInner<T>>,
    phantom: PhantomData<ArcInner<T>>,
    alloc: A,
}
  • T: ?Sized: Arcが指すデータの型です。?Sizedは、Tがサイズ不定型(DST: Dynamically Sized Type)である可能性を示します。
    つまり、Arcはスライスやトレイトオブジェクトなど、サイズがコンパイル時に決まらない型も扱うことができます。
  • phantom: ArcInner<T>型のデータを所有していることをドロップチェッカーに伝えるために使用されます。
    • ptrですでにArcInner<T>を使っているのに、なぜPhantomDataが必要なのか疑問に思う方は、以下の公式の詳しい記事をご参照ください。
    • Generic parameters and drop-checking
  • alloc: 今回の記事では触れません。
  • ptr: ArcInner<T>型のデータを指すポインタです。
    • NonNullはnullでないことが保証されたポインタを表す型です。Arcは常に有効なデータを指すため、nullポインタを許容しません。
      最適化のためにNonNullを使用しています。
    • NonNullについて詳しく知りたい方はNonNullを参照してください。
  • ArcInner<T>は以下のように定義されています。
#[repr(C)]
struct ArcInner<T: ?Sized> {
    strong: Atomic<usize>,  // AtomicUsizeと同等(nightlyのgeneric_atomic機能)
    weak: Atomic<usize>,
    data: T,
}
  • #[repr(C)]: このアトリビュートは、ArcInnerのメモリレイアウトをC言語の構造体と同じにすることを指定します。
    これにより、フィールドの順序が保証され、オフセットの計算が容易になります。
  • strong: 強参照カウンターを保持します。Arcのインスタンスが存在する限り、このカウンターは1以上になります。
  • weak: 弱参照カウンターを保持します。今回の記事では触れません。
  • data: 実際のデータを保持します。T型のデータがここに格納されます。

ArcInnerにはstrongweakの二つの参照カウンターがあることがわかります。
weakは循環参照を回避するために使用される、Weak<T>型のためのカウンターです。
weakArcの具体的な実装全体を理解する必要があります。Allocatorも同様に、メモリ割り当ての詳細に関わるため、今回は省略します。

簡略化のために、WeakAllocatorを無視した本記事のArcの定義は以下のようになります。

pub struct Arc<T: ?Sized> {
    ptr: NonNull<ArcInner<T>>,
}

#[repr(C)]
pub struct ArcInner<T: ?Sized> {
    strong: Atomic<usize>,
    data: T,
}

上記の定義を基に、Arcの基本的な動作を理解していきましょう。

Arcの基本操作

Arcの内部構造を理解したところで、実際の使用例を通して基本操作を見ていきましょう。

use std::{sync::Arc, thread};

fn main() {
    let greeting = Arc::new(String::from("Hello, world!"));

    let mut threads = vec![];
    for _ in 0..10 {
        threads.push(thread::spawn({
            let greeting = Arc::clone(&greeting);
            move || {
                println!("{}", greeting);
            }
        }));
    }
    for t in threads {
        t.join().unwrap();
    }
}

このコードは、Arcを使って複数のスレッドで文字列を共有し、それぞれのスレッドでその文字列を出力する例です。
コードを実行すると、各スレッドが同じ文字列を出力します:

Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!
Hello, world!

上記のコードの要素を分解すると、以下のようになります:

  • Arc::new: Arc<String>を作成します。Stringと参照カウンターをヒープに確保し、Arcがそのポインタを保持します。
  • Arc::clone: Arcのクローンを作成します。
    重要: String自体をクローンするのではなく、参照カウンターをインクリメントして、ポインターをコピーして、同じデータを指す新しいArcを作成します。
  • thread::spawn: 新しいスレッドを作成します。moveキーワードにより、クロージャはArcの所有権を得ます。
  • println!("{}", greeting);: Derefトレイトにより、Arc<String>&Stringにderefされ、文字列が出力されます。
  • drop: クロージャがArcの所有権を持つため、クロージャが終了するとArcがドロップされ、参照カウンターが減少します。
  • JoinHandle::join: 各スレッドの終了を待ちます。最後のArcがドロップされ、参照カウンターが0になると、Stringが解放されます。

これらの動作を支えるArcの主要な要素をまとめると以下になります。それらを順に解説します。

  • Arc::new
  • Arc::clone
  • Derefトレイト
  • Dropトレイト

1. Arc::newの詳細

Arc::newは以下のように実装されています。

pub fn new(data: T) -> Arc<T> {
    let x: Box<_> = Box::new(ArcInner {
        strong: atomic::Atomic::new(1),
        data,
    });
    Self {
      ptr: Box::leak(x).into()
    }
}
  • Box::new: ArcInnerをヒープに確保します。Boxはヒープにデータを確保するスマートポインタです。
  • atomic::Atomic::new(1): 参照カウンターを1に初期化します。Arcが作成された時点で、1つの所有者が存在するためです。
  • Box::leak: Boxの所有権を放棄し、ヒープ上のデータへの&'static mut ArcInner<T>参照を取得します。
    • Arcは参照カウンターによって独自にメモリ管理を行うために、Boxのdropを実行せずにメモリを解放しないようにします。
  • into(): &mut ArcInner<T>NonNull<ArcInner<T>>に変換します。

つまり、Arc::newは、Boxの自動的なメモリ管理を無効化することで、
Arcが参照カウンターによってデータの寿命を管理できるようにしています。

2. Arc::cloneの詳細

Arc::cloneは以下のように実装されています(実際にはoverflowチェックなどの詳細がありますが、ここでは省略しています)

fn clone(&self) -> Arc<T> {
    // 参照カウンターを1増やす
    self.inner().strong.fetch_add(1, Relaxed);

    // 同じデータを指す新しいArcを作成
    Self {
        ptr: self.ptr
    }
}
  • self.inner(): ArcInner<T>への参照を取得します。
  • fetch_add(1, Relaxed): 参照カウンターを1増加させます。Relaxedメモリオーダリングを使用しています。
    • メモリオーダリング: アトミック操作における、他のスレッドとのメモリ同期の強さを指定します。
    • Relaxed: 最も制約が弱く高速。単一変数へのアトミック性のみ保証し、他の操作との順序関係は保証しません。
      クローン時は参照カウンターの単純な増加のみで、データへのアクセスや他のメモリ操作との順序関係を保証する必要がないため、Relaxedが最適です。
    • Release/Acquire: より強い同期を提供。Releaseはこの操作以前のすべての操作が、
      Acquireで読み込んだスレッドによるその後の操作から観測されることを保証します。
  • Self { ptr: self.ptr }: 新しいArcを作成します。ptrの値をコピーして、新しいArcが同じデータを指すようになります。

Arc::cloneは、データそのものをクローンするのではなく、ヒープ上のデータへのポインタをコピーし、参照カウンターを増やすだけです。
これにより、非常に軽量に「所有権の共有」を実現できます。

この様子を図にすると、以下のようになります。

          スタック (スレッド1)             スタック (スレッド2)
        +-----------------+           +-----------------+
arc1:   | ptr: 0x1234     |   arc2:   | ptr: 0x1234     |
        +-----------------+           +-----------------+
              |                               |
              +---------------+---------------+
                              |
                              v
           ヒープ (アドレス: 0x1234)
           +------------------------------------------+
           | ArcInner<String>                         |
           | +--------------------------------------+ |
           | | strong: Atomic(2)                    | |
           | | data: String("Hello, world!")        | |
           | +--------------------------------------+ |
           +------------------------------------------+

参照カウンターの変化を時系列で見ると:

時間    操作                   strongカウンター   メモリオーダリング
----    ------------------    ------------   ------------------
t1      Arc::new()            1              (初期化)
t2      Arc::clone() #1       2              Relaxed
t3      Arc::clone() #2       3              Relaxed
t4      drop(arc1)            2              Release
t5      drop(arc2)            1              Release
t6      drop(arc3)            0              Release
t7      fence(Acquire)        -              同期完了
t8      データ解放              -              (メモリ解放)

最後のドロップ時には、Releaseでカウンターを0にした後、Acquire fenceで他のスレッドのすべての操作を同期してから、安全にデータを解放します。

3. Derefトレイトの実装

ArcDerefトレイトを実装しており、Arcを通じて内部のデータにアクセスできるようにしています。

例えば先ほどの例ではprintln!("{}", greeting);の部分でgreetingArc<String>型ですが、
Derefトレイトのおかげで&Stringにderefされて、StringDisplayトレイトを使って文字列が出力されています。

impl<T: ?Sized> Deref for Arc<T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T {
        &self.inner().data
    }
}
  • type Target = T;: Derefトレイトの関連型TargetTに設定します。これにより、Arc<T>Tにderefされることを示します。
  • fn deref(&self) -> &T: derefメソッドを実装します。このメソッドは、Arcが指すデータへの参照を返します。
  • &self.inner().data: ArcInner<T>dataフィールドへの参照を返します。

Arcが有効である限り、dataは解放されていないので、常に有効な参照を返すことができます。

4. Dropトレイトの実装

ArcCloneの際に参照カウンターを増やすので、Arcがドロップされる際に参照カウンターを減らす必要があります。
ArcDropトレイトを実装しており、以下のようになっています。

fn drop(&mut self) {
    // 参照カウンターを1減らし、減少前の値を取得
    if self.inner().strong.fetch_sub(1, Release) != 1 {
        // 減少前が1でない = 他に所有者がいる
        return;
    }

    // このfence以前のReleaseの操作が、このfence以降で観測されることを保証
    atomic::fence(Acquire);

    // 減少前が1 = これが最後の所有者
    unsafe {
        // データのデストラクタを安全に呼び出す
        ptr::drop_in_place(&mut (*self.ptr.as_ptr()).data);
        // ArcInner全体のメモリを解放。
        // 実際は、Weak参照の管理も含むため、より複雑な処理になる
        // (実装は`std::sync::Weak`の`impl Drop`を参照)
        // deallocate(self.ptr.cast(), Layout::for_value_raw(self.ptr.as_ptr()));
    }
}
  • fetch_sub(1, Release): 参照カウンターを1減少させ、減少前の値を返します。
    • Releaseオーダリングにより、このスレッドでのすべての操作が、後続のAcquire操作から観測されることを保証します。
    • 戻り値が1の場合:これが最後のArcなので、メモリを解放します。
    • 戻り値が1以外の場合:他のArcが存在するので、何もせずリターンします。
  • atomic::fence(Acquire): これは重要なポイントです。
    • fetch_subReleaseと対になって、同期を実現します。最後のスレッドがカウンターを0にした後、
      このfenceにより他のすべてのスレッドでの操作が完了していることを保証します。
    • 具体的には、他のスレッドがReleaseでカウンターを減らす際に行ったすべての操作(データへのアクセスなど)が、このfence以降で観測可能になります。
    • この同期により、最後のArcを持つスレッドが、他のすべてのスレッドでの操作を観測してから解放処理を行えます。
    • 例えば、あるスレッドがArc<Mutex<T>>のデータを変更したとします。
      しかしこのfenceがなければ、最後のArcを持つスレッドが、変更前の古いデータを見てしまう可能性があります。
      Release/Acquireのペアにより、最後のスレッドはデータの最新状態を観測してから解放処理を行えます。
  • unsafe { ptr::drop_in_place(...) }: データのデストラクタを安全に呼び出します。

ここまでのまとめ

Arcの基本的な動作を理解できました:

  • Arc::newでデータをヒープに確保し、参照カウンターを初期化します。
  • Arc::cloneで参照カウンターを増やし、新しいArcを作成します。
  • Dropトレイトで参照カウンターを減らし、必要に応じてデータを解放します。
  • Derefトレイトで内部のデータにアクセスできるようにします。

このように考えると、とてもシンプルな構造であることがわかります。
それに、所有権によりDropが自動的に呼ばれますし、Derefによりスマートにデータにアクセスできます。

次に、Arcを使う上で知っておくべき周辺知識を解説します。

Arcを使う上で理解すべき重要な概念

ここで、もう一度解説対象のコードを見てみましょう。
Arcの動作に関するコメントも付随させます。

use std::{sync::Arc, thread};

fn main() {
    // 1. Arc::newでArc<String>を作成。参照カウンターは1。
    let greeting = Arc::new(String::from("Hello, world!"));

    let mut threads = vec![];
    for _ in 0..10 {
        threads.push(thread::spawn({
            // 2. Arc::cloneで参照カウンターを増やし、新しいArc<String>を作成。
            let greeting = Arc::clone(&greeting);
            // 3. move keywordでクロージャがgreetingの所有権を得る。
            move || {
                // 4. Derefトレイトにより&Stringにderefされ、文字列が出力される。
                println!("{}", greeting);
                // 5. クロージャが終了するとgreetingがドロップされ、参照カウンターが減る。
                // `drop(greeting)` が暗黙的に呼ばれる。
                // 6. 最後のArcがドロップされ参照カウンターが0になると、Stringが解放される。
            }
        }));
    }
    for t in threads {
        // 7. 各スレッドの終了を待つ。
        t.join().unwrap();
    }
}

上記のコードを理解するために、以下の重要な概念を解説します:

  1. Send/Syncトレイト - スレッド間でのデータ移動/共有
  2. T: 'static - ライフタイム制約
  3. Arc<Mutex<T>>パターン - 共有可変データへのアクセス

thread::spawnのシグネチャを見てみましょう。

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}
  • F: FnOnce() -> T: クロージャfは引数を取らず、T型の値を返す関数である必要があります。
  • F: Send + 'static & T: Send + 'static: クロージャfとその戻り値Tは、以下の条件を満たす必要があります。
    • Sendトレイトは、型が別のスレッドに移動できることを示します。
    • 'static lifetime boundは後ほど詳しく解説しますが、型が非静的な参照を含まないことを示します。
  • JoinHandle<T>: spawn関数は、新しいスレッドのハンドルを返します。
    このハンドルを使用して、スレッドの終了を待ったり、スレッドからの戻り値を取得したりできます。
    • 今回はjoinメソッドを使って、スレッドの終了を待っています。

1. SendSyncトレイト

Rustの型システムには、マーカートレイトと呼ばれる特殊なトレイトがあります。
マーカートレイトは、型に特定の性質を持たせるために使用されます。
SendSyncはその代表例です。
SendSyncは、Rustの並行性モデルにおいて非常に重要な役割を果たします。
基本的にほとんどの型が実装していますし、構造体を定義する際もフィールドがSendSyncを実装していれば、自動的に実装されます。
例外としては、Rc<T>はどちらも実装していませんし、Cell<T>RefCell<T>Syncを実装していません。

  • Sendトレイトは、型が別のスレッドに移動できることを示します。
    • Rc<T>はパフォーマンスのためにスレッドセーフではない参照カウンターを使用しているため、
      別のスレッドに移動するとデータ競合が発生する可能性があります。
  • Syncトレイトは、型が複数のスレッドから同時に参照できることを示します。
    • 例えば、TSyncを実装している場合、&TSyncを実装します。
    • i32StringSyncを実装していますが、Cell<T>RefCell<T>Syncを実装していません。

ArcがどのようにSendSyncを実装しているか確認してみましょう。

unsafe impl<T: ?Sized + Sync + Send, A: Allocator + Send> Send for Arc<T, A> {}
unsafe impl<T: ?Sized + Sync + Send, A: Allocator + Sync> Sync for Arc<T, A> {}
  • Arc<T>Sendであるためには、TSend + Syncである必要があります。
    • なぜSyncが必要?: Arc<T>を複数スレッドで共有する際、内部のTへの参照が同時にアクセスされる可能性があるためT: Syncが必要
    • なぜSendが必要?: 最後のArcを持つスレッドがTをドロップするため、Tは任意のスレッドに移動可能でなければならない
  • Arc<T>Syncであるためには、TSend + Syncである必要があります。
    • なぜ?: &Arc<T>を共有すると複数スレッドでArc::cloneが可能になり、結果的にArc<T>Sendである場合と同じ状況になるため

2. T: 'static - ライフタイム制約

'staticライフタイムは、Rustのライフタイムシステムにおける特別なライフタイムです。
'staticライフタイムを持つ参照は、プログラムの実行中ずっと有効であることを意味します。

例えば以下のようなコードがあります:

const GREETING: &str = "Hello, world!";
fn main() {
    let greeting: &'static str = GREETING;
    println!("{}", greeting);
}
  • GREETINGはconstで定義されており、constで定義された値はバイナリに埋め込まれ、
    値が変更されることや解放されることがないため、'staticライフタイムを持ちます。

&'static strは、文字列リテラルのようにプログラム全体で有効な参照を表すために使われます。
次に、T: 'staticのstatic lifetime boundの意味を考えてみましょう。

T: 'staticは、型Tが非静的な参照を含まないことを示します。
Tが参照を含まなければ、Tは常に'staticを満たします。
T&'a Uのような参照を含む場合、そのライフタイム'a'staticでなければなりません。
これにより、Tがスレッドを跨いだアクセスに対して安全であることが保証されます。
例えば、以下のようなコードはコンパイルエラーになります。

use std::sync::Arc;
use std::thread;

fn main() {
    let data = vec![1, 2, 3];
    let data_ref = &data; // data_refのライフタイムは'staticではない
    
    // コンパイルエラー: `data_ref`は`'static`でない参照を含む
    // let arc = Arc::new(data_ref);
    // thread::spawn(move || {
    //     println!("{:?}", arc);
    // }).join().unwrap();
    
    // 解決策1: 所有権を持つ型を使う
    let arc_owned = Arc::new(vec![1, 2, 3]);
    let arc_clone = Arc::clone(&arc_owned);
    thread::spawn(move || {
        println!("{:?}", arc_clone);
    }).join().unwrap();
    
    // 解決策2: 'staticな参照を使う
    let static_data: &'static str = "Hello";
    let arc_static = Arc::new(static_data);
    thread::spawn(move || {
        println!("{}", arc_static);
    }).join().unwrap();
}
  • data_refdataへの参照で、main関数のスコープに縛られています。
  • スレッドはmain関数より長く生きる可能性があるため、無効な参照となるリスクがあります。
  • Rustはコンパイル時にこれを検出し、エラーとします。

3. Arc<Mutex<T>>パターンの理解

Arc<Mutex<T>>は、複数のスレッドで共有される可変データに対して、安全に排他制御を行うための一般的なパターンです。

  • Arc<T>: 複数のスレッドでデータを共有(不変アクセスのみ)
  • Mutex<T>: データへの排他的な可変アクセスを提供
    • lockメソッドでロックを取得し、ロックが解放されるまで他のスレッドはアクセスできない
    • デッドロックなどの論理的な問題はプログラマが注意する必要がある
  • Arc<Mutex<T>>: 共有される可変データへの安全なアクセス

簡単な例を示します:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut threads = vec![];
    
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        threads.push(thread::spawn(move || {
            let mut num = counter_clone.lock().unwrap();
            *num += 1;
        }));
    }
    
    for t in threads {
        t.join().unwrap();
    }
    
    println!("Result: {}", *counter.lock().unwrap());
    // 出力: Result: 10
}

このパターンにより、複数のスレッドが同じデータを安全に読み書きできるようになります。

まとめ

Arcは、Rustの所有権システムと並行性モデルにおいて非常に重要な役割を果たします。
Arcを使うことで、複数のスレッドでデータを安全に共有できるようになります。
また、SendSyncなどのマーカートレイトや、'staticライフタイムなどの概念を理解することで、Arcの仕様に関する制約や注意点を把握できます。
さらに、Arc<Mutex<T>>のようなパターンを理解することで、より複雑な並行処理のシナリオにも対応できるようになります。

この記事を通じて、Arcの内部動作と使用例を理解し、Rustにおける並行処理への理解が深まったことを願っています。

参考資料

この記事は以下の資料を参考にしています。これらの記事を噛み砕いて、総合的にまとめたのが本記事です。

FAST Tech Blog

Discussion