🐕

Rayシリーズ:Actors

に公開

今回は前回に続いてRayのコア機能であるActorsについて調べてみました。前回のTasksについてはぜひ以下をご覧ください。

https://zenn.dev/akasan/articles/f5be2708dc34def

Ray Actorsとは?

Ray Actorsは、Ray APIを関数からクラスへと拡張する仕組みになります。前回紹介したRay Tasksは関数に対するデコレータとして実装していましたが、それのクラス版と解釈できます。Actorは基本的にステートフルなワーカー(またはサービス)で、新しいActorをインスタンス化すると、新しいワーカーを作成しそのワーカーにActorのメソッドをスケジュールします。Actorのメソッドは、そのワーカーの状態にアクセスしたり、状態を変更したりすることができます。

https://docs.ray.io/en/latest/ray-core/actors.html

実際に使ってみる

今回もRay Actorsのページで紹介されているサンプルコードをベースにまとめてみます。

環境構築

uvを使って環境を構築します。

uv init ray_actors -p 3.12
cd ray_actors
uv add ray

カウンターの実装

それではまずはカウンターの作成を通してActorsに入門してみます。Ray Tasksでは状態管理を直接サポートしていないため、カウンターのように状態を管理するようなユースケースには不向きです(無理やりやろうとすればリファレンスを使えばできると思いますが、本質的な解決ではないです)。それではクラスを用いてActorを実装します。

counter_actor.py
import ray

@ray.remote
class Counter:
    def __init__(self, init_count: int = 0):
        self.count = init_count

    def increment(self):
        self.count += 1

    def get_count(self) -> int:
        return self.count


counter = Counter.remote()
init_value_ref = counter.get_count.remote()
init_value = ray.get(init_value_ref)
counter.increment.remote()
after_increment_value_ref = counter.get_count.remote()
after_increment_value = ray.get(after_increment_value_ref)

print(f"{init_value=}, {after_increment_value=}")

Actorを定義するにはTasksの時と同様に@ray.remoteデコレータをつけたクラスとして定義するだけです。また、インスタンス化や関数呼び出しは全てhoge.remote()のようにremote()を呼び出す形となります。こちらのコードを実行すると以下のようになります。

(uv run counter_actor.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-13 22:14:27,363	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:14:27,370	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
init_value=0, after_increment_value=1

結果を見るとinit_value=0, after_increment_value=1となっているのでリモート関数を呼び出す前後で状態が変わっていることが確認できました。ちなみに、counter = Counter.remote(100)のようにコンストラクタの値を指定することで初期値を変えることももちろんできます。実行すると以下のようになります。

(uv run counter_actor_init_100.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-13 22:16:27,541	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:16:27,548	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-13 22:16:27,555	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_actors'.
2025-08-13 22:16:27,565	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_1f6bfb7034ef69e6.zip' (0.21MiB) to Ray cluster...
2025-08-13 22:16:27,565	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_1f6bfb7034ef69e6.zip'.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 65 packages in 184ms
(raylet) [2025-08-13 22:16:29,555 E 71950 968663] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-08-12_17-55-07_117017_71924 is over 95% full, available space: 42.5357 GB; capacity: 926.352 GB. Object creation will fail if spilling is required.
init_value=100, after_increment_value=101

Tasksへの引数にする

例えば先ほど定義したCounterというActorをTasksの引数として与えることもできます。例えばTasksのなかで指定した回数incrementするようなTasksを作ってみます。

increment_in_task.py
import ray


@ray.remote
class Counter:
    def __init__(self, init_count: int = 0):
        self.count = init_count

    def increment(self):
        self.count += 1

    def get_count(self) -> int:
        return self.count


@ray.remote
def increment(counter: Counter, increment_num: int):
    for _ in range(increment_num):
        counter.increment.remote()


counter = Counter.remote()
increment_task_ref = increment.remote(counter, 10)
ray.get(increment_task_ref)
count = ray.get(counter.get_count.remote())
print(f"{count=}")

それではこちらも実行してみましょう。結果を見ると、Taskにて10回インクリメントした結果、Counterの値は10になりその値を参照できるていることが確認できました。

(uv run increment_in_task.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-13 22:22:34,340	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:22:34,346	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
count=10
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead

Actorの種類について

Actorにはいくつかの種類があります。今まで紹介してきたのはRegular Actorsに該当します。そのほかにAsync ActorsやThreaded Actorsがあります。詳細な使い分けについては記事が長くなってしまうので次回の記事で紹介しようと思います。

Async Actors

async/await構文を使って並行コードを書くことができます。Rayはasyncioとネイティブに統合されてい流ので、aiohttpやaioredisなどの非同期フレームワークと一緒に実装することができます。いわゆる非同期実行をすることができます。

https://docs.ray.io/en/latest/ray-core/actors/async_api.html#async-actors

Threaded Actors

こちらもAsync Actorsとにており、マルチスレッドに対応したActorの実装方法になります。複数スレッドを利用した処理の実行に向いています。

https://docs.ray.io/en/latest/ray-core/actors/async_api.html#threaded-actors

Named Actor

Actorには名前をつけることができ、名前をつけたActorはNamespaceないで参照することができるようになります。Namespaceは論理的なグループであり、ジョブやアクターを登録できます。今回は最もシンプルな方法を使ってみようと思います。先ほどのCounterについて、ネームスペースに登録した後に、あえてそれを参照しにいくことで利用してみます。

named_counter.py
import ray

@ray.remote
class Counter:
    def __init__(self, init_count: int = 0):
        self.count = init_count

    def increment(self):
        self.count += 1

    def get_count(self) -> int:
        return self.count


counter_origin = Counter.options(name="counter").remote()
ray.get(counter_origin.increment.remote())
counter_origin_value = ray.get(counter_origin.get_count.remote())
print(f"{counter_origin_value=}")

counter_from_namespace = ray.get_actor("counter")
ray.get(counter_from_namespace.increment.remote())
counter_from_namespace_value = ray.get(counter_from_namespace.get_count.remote())
print(f"{counter_from_namespace_value=}")

この例では最初に初期化したCounterのActor側で一回分インクリメントした後、そのActorをあえてNamespaceから以下のようにして参照しています。

  ray.get(counter_from_namespace.increment.remote())

このコードの結果としては、counter_origin側の値取得の結果は1、counter_from_namespace側の値はcounter_originを参照の上で1プラスするので2が得られると所望の動作となります。それでは実行してみましょう。

(uv run increment_in_task.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-13 22:51:37,609	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:51:37,615	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) [2025-08-13 22:51:37,697 E 71950 968663] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-08-12_17-55-07_117017_71924 is over 95% full, available space: 42.4922 GB; capacity: 926.352 GB. Object creation will fail if spilling is required.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
counter_origin_value=1
counter_from_namespace_value=2

結果を見ると、思っていたような動作になっていることがわかります。ネームスペースについてはもっと機能がありますが、こちらもここで解説するとボリュームが大きくなってしまうので改めて別の記事で紹介します。

まとめ

今回はRayのコア機能の一つであるRay Actorsを紹介しました。前回紹介したRay Tasksと基本的には同じような実装になりつつ、状態を管理できるという点が大きな違いになっています。ステートレスな実装はRay Tasks、ステートフルな実装はRay Actorsを使うと覚えてもらえるといいかなと思います!次回はRay Actorsの種類について解説しようと思います。

Discussion