Rayシリーズ:Actors
今回は前回に続いてRayのコア機能であるActorsについて調べてみました。前回のTasksについてはぜひ以下をご覧ください。
Ray Actorsとは?
Ray Actorsは、Ray APIを関数からクラスへと拡張する仕組みになります。前回紹介したRay Tasksは関数に対するデコレータとして実装していましたが、それのクラス版と解釈できます。Actorは基本的にステートフルなワーカー(またはサービス)で、新しいActorをインスタンス化すると、新しいワーカーを作成しそのワーカーにActorのメソッドをスケジュールします。Actorのメソッドは、そのワーカーの状態にアクセスしたり、状態を変更したりすることができます。
実際に使ってみる
今回もRay Actorsのページで紹介されているサンプルコードをベースにまとめてみます。
環境構築
uvを使って環境を構築します。
uv init ray_actors -p 3.12
cd ray_actors
uv add ray
カウンターの実装
それではまずはカウンターの作成を通してActorsに入門してみます。Ray Tasksでは状態管理を直接サポートしていないため、カウンターのように状態を管理するようなユースケースには不向きです(無理やりやろうとすればリファレンスを使えばできると思いますが、本質的な解決ではないです)。それではクラスを用いてActorを実装します。
import ray
@ray.remote
class Counter:
def __init__(self, init_count: int = 0):
self.count = init_count
def increment(self):
self.count += 1
def get_count(self) -> int:
return self.count
counter = Counter.remote()
init_value_ref = counter.get_count.remote()
init_value = ray.get(init_value_ref)
counter.increment.remote()
after_increment_value_ref = counter.get_count.remote()
after_increment_value = ray.get(after_increment_value_ref)
print(f"{init_value=}, {after_increment_value=}")
Actorを定義するにはTasksの時と同様に@ray.remote
デコレータをつけたクラスとして定義するだけです。また、インスタンス化や関数呼び出しは全てhoge.remote()
のようにremote()
を呼び出す形となります。こちらのコードを実行すると以下のようになります。
(uv run counter_actor.py) 2>&1 | tee /dev/tty | pbcopy
# 結果
2025-08-13 22:14:27,363 INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:14:27,370 INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
init_value=0, after_increment_value=1
結果を見るとinit_value=0, after_increment_value=1
となっているのでリモート関数を呼び出す前後で状態が変わっていることが確認できました。ちなみに、counter = Counter.remote(100)
のようにコンストラクタの値を指定することで初期値を変えることももちろんできます。実行すると以下のようになります。
(uv run counter_actor_init_100.py) 2>&1 | tee /dev/tty | pbcopy
# 結果
2025-08-13 22:16:27,541 INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:16:27,548 INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-13 22:16:27,555 INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_actors'.
2025-08-13 22:16:27,565 INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_1f6bfb7034ef69e6.zip' (0.21MiB) to Ray cluster...
2025-08-13 22:16:27,565 INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_1f6bfb7034ef69e6.zip'.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 65 packages in 184ms
(raylet) [2025-08-13 22:16:29,555 E 71950 968663] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-08-12_17-55-07_117017_71924 is over 95% full, available space: 42.5357 GB; capacity: 926.352 GB. Object creation will fail if spilling is required.
init_value=100, after_increment_value=101
Tasksへの引数にする
例えば先ほど定義したCounter
というActorをTasksの引数として与えることもできます。例えばTasksのなかで指定した回数incrementするようなTasksを作ってみます。
import ray
@ray.remote
class Counter:
def __init__(self, init_count: int = 0):
self.count = init_count
def increment(self):
self.count += 1
def get_count(self) -> int:
return self.count
@ray.remote
def increment(counter: Counter, increment_num: int):
for _ in range(increment_num):
counter.increment.remote()
counter = Counter.remote()
increment_task_ref = increment.remote(counter, 10)
ray.get(increment_task_ref)
count = ray.get(counter.get_count.remote())
print(f"{count=}")
それではこちらも実行してみましょう。結果を見ると、Taskにて10回インクリメントした結果、Counterの値は10になりその値を参照できるていることが確認できました。
(uv run increment_in_task.py) 2>&1 | tee /dev/tty | pbcopy
# 結果
2025-08-13 22:22:34,340 INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:22:34,346 INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
count=10
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
Actorの種類について
Actorにはいくつかの種類があります。今まで紹介してきたのはRegular Actorsに該当します。そのほかにAsync ActorsやThreaded Actorsがあります。詳細な使い分けについては記事が長くなってしまうので次回の記事で紹介しようと思います。
Async Actors
async/await構文を使って並行コードを書くことができます。Rayはasyncioとネイティブに統合されてい流ので、aiohttpやaioredisなどの非同期フレームワークと一緒に実装することができます。いわゆる非同期実行をすることができます。
Threaded Actors
こちらもAsync Actorsとにており、マルチスレッドに対応したActorの実装方法になります。複数スレッドを利用した処理の実行に向いています。
Named Actor
Actorには名前をつけることができ、名前をつけたActorはNamespaceないで参照することができるようになります。Namespaceは論理的なグループであり、ジョブやアクターを登録できます。今回は最もシンプルな方法を使ってみようと思います。先ほどのCounterについて、ネームスペースに登録した後に、あえてそれを参照しにいくことで利用してみます。
import ray
@ray.remote
class Counter:
def __init__(self, init_count: int = 0):
self.count = init_count
def increment(self):
self.count += 1
def get_count(self) -> int:
return self.count
counter_origin = Counter.options(name="counter").remote()
ray.get(counter_origin.increment.remote())
counter_origin_value = ray.get(counter_origin.get_count.remote())
print(f"{counter_origin_value=}")
counter_from_namespace = ray.get_actor("counter")
ray.get(counter_from_namespace.increment.remote())
counter_from_namespace_value = ray.get(counter_from_namespace.get_count.remote())
print(f"{counter_from_namespace_value=}")
この例では最初に初期化したCounterのActor側で一回分インクリメントした後、そのActorをあえてNamespaceから以下のようにして参照しています。
ray.get(counter_from_namespace.increment.remote())
このコードの結果としては、counter_origin
側の値取得の結果は1、counter_from_namespace
側の値はcounter_origin
を参照の上で1プラスするので2が得られると所望の動作となります。それでは実行してみましょう。
(uv run increment_in_task.py) 2>&1 | tee /dev/tty | pbcopy
# 結果
2025-08-13 22:51:37,609 INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-13 22:51:37,615 INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
(raylet) [2025-08-13 22:51:37,697 E 71950 968663] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-08-12_17-55-07_117017_71924 is over 95% full, available space: 42.4922 GB; capacity: 926.352 GB. Object creation will fail if spilling is required.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_actors/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
counter_origin_value=1
counter_from_namespace_value=2
結果を見ると、思っていたような動作になっていることがわかります。ネームスペースについてはもっと機能がありますが、こちらもここで解説するとボリュームが大きくなってしまうので改めて別の記事で紹介します。
まとめ
今回はRayのコア機能の一つであるRay Actorsを紹介しました。前回紹介したRay Tasksと基本的には同じような実装になりつつ、状態を管理できるという点が大きな違いになっています。ステートレスな実装はRay Tasks、ステートフルな実装はRay Actorsを使うと覚えてもらえるといいかなと思います!次回はRay Actorsの種類について解説しようと思います。
Discussion