Rayシリーズ:Actorsの種類について
今回は前回紹介したActorsについて、どのような種類があるかを解説しようと思います。
Named Actors
こちらはActorsの種類というより、Namespaceの概念を用いてActorsを取得するための機能になります。Actorsのインスタンスを作成した後にNamespaceに登録することで、その名前を参照して別の場所からActorsを利用することができます。公式サンプルを添付すると、以下のようなコードになります。
import ray
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
def get_counter(self) -> int:
return self.count
# Create an actor with a name
counter = Counter.options(name="some_name").remote()
# Retrieve the actor later somewhere
counter = ray.get_actor("some_name")
Counter.options(name="some_name")
とすることで、Namespaceにsome_name
という名前で登録することができます。なお、Namespaceはいくつも作ることができ、例えば指定したNamespaceに登録したい場合は以下のように実装します。
ray.init(address="auto", namespace="new_namespace")
Counter.options(name="some_name", lifetime="detached").remote()
非同期Actors
先ほど実装していたCounterは同期的に動くものを想定していたものですが、非同期実装をすることができます。公式に提供されているサンプルをみてみましょう。
import ray
import asyncio
@ray.remote
class AsyncActor:
def __init__(self, expected_num_tasks: int):
self._event = asyncio.Event()
self._curr_num_tasks = 0
self._expected_num_tasks = expected_num_tasks
# Multiple invocations of this method can run concurrently on the same event loop.
async def run_concurrent(self):
self._curr_num_tasks += 1
if self._curr_num_tasks == self._expected_num_tasks:
print("All coroutines are executing concurrently, unblocking.")
self._event.set()
else:
print("Waiting for other coroutines to start.")
await self._event.wait()
print("All coroutines ran concurrently.")
actor = AsyncActor.remote(4)
refs = [actor.run_concurrent.remote() for _ in range(4)]
# Fetch results using regular `ray.get`.
ray.get(refs)
# Fetch results using `asyncio` APIs.
async def get_async():
return await asyncio.gather(*refs)
asyncio.run(get_async())
Actorsの実装は@ray.remote
をクラスにデコレータとしてつけることで実現でき、これは同期Actorsと同じです。今回の実装では同時に4つのタスクを非同期で実装するようにしています。self._event
を利用することで、全てのタスクの準備が完了したことを検知しています。試しにこのコードを実行してみると、結果は以下のようになりました。4つのタスクの準備ができるまではself._event.wait()
によってブロッキングされるためWaiting for other coroutines to start.
が表示されます。4つ目のタスクが登録された段階でAll coroutines are executing concurrently, unblocking.
が表示され、後続の処理が始まったことが確認できます。
(AsyncActor pid=28302) Waiting for other coroutines to start.
(AsyncActor pid=28302) Waiting for other coroutines to start.
(AsyncActor pid=28302) Waiting for other coroutines to start.
(AsyncActor pid=28302) All coroutines are executing concurrently, unblocking.
(AsyncActor pid=28302) All coroutines ran concurrently.
(AsyncActor pid=28302) All coroutines ran concurrently.
(AsyncActor pid=28302) All coroutines ran concurrently.
(AsyncActor pid=28302) All coroutines ran concurrently.
非同期実行数の制限
非同期Actorsを実装する際に、非同期グループの作成や実行数の制限を行うことができます。サンプルコードをみてみましょう。
import ray
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
def __init__(self):
pass
@ray.method(concurrency_group="io")
async def f1(self):
pass
@ray.method(concurrency_group="io")
async def f2(self):
pass
@ray.method(concurrency_group="compute")
async def f3(self):
pass
@ray.method(concurrency_group="compute")
async def f4(self):
pass
async def f5(self):
pass
a = AsyncIOActor.remote()
a.f1.remote() # executed in the "io" group.
a.f2.remote() # executed in the "io" group.
a.f3.remote() # executed in the "compute" group.
a.f4.remote() # executed in the "compute" group.
a.f5.remote() # executed in the default group.
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
のように指定することで、ioグループとcomputeグループを作り、それぞれ実行数を2、4に限定しています。グループごとの実行数を指定することで、どのようにタスクをスケーリングしていくかを指定することができます。
Out-of-bound通信
Actorsでは通常メソッドを呼び出すことで実行されますが、帯域外に向けた通信などを提供することもできます。例えば以下の例では、Actors内部でHTTPサーバを起動し、メソッドをHTTPメソッド経由でアクセスできるように実装しています。
import ray
import asyncio
import requests
from aiohttp import web
@ray.remote
class Counter:
async def __init__(self):
self.counter = 0
asyncio.get_running_loop().create_task(self.run_http_server())
async def run_http_server(self):
app = web.Application()
app.add_routes([web.get("/", self.get)])
app.add_routes([web.post("/", self.post)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", 25001)
await site.start()
async def get(self, request):
return web.Response(text=str(self.counter))
async def post(self, request):
self.counter += 1
return web.Response(text="Counter incremented")
async def increment(self):
self.counter = self.counter + 1
ray.init()
counter = Counter.remote()
[ray.get(counter.increment.remote()) for i in range(5)]
r = requests.get("http://127.0.0.1:25001/")
print(r.text)
requests.post("http://127.0.0.1:25001/")
r = requests.get("http://127.0.0.1:25001/")
print(r.text)
run_http_server
メソッド内部でHTTPサーバを起動しています。サーバを立てるにあたり、GETメソッドやPOSTメソッドをActorsのメソッドに紐づける実装をすることで、外部からGETやPOSTメソッドを呼び出すことでActorsメソッドを呼び出しています。
async def run_http_server(self):
app = web.Application()
app.add_routes([web.get("/", self.get)])
app.add_routes([web.post("/", self.post)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", 25001)
await site.start()
これを実行すると以下のような結果になります。
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 33 packages in 176ms
5
6
Actor Pool
Actorsはステートフルなタスク実行をすることができますが、用途によっては複数のActorをプールとしてまとめ、データに対してラウンドロビン的に適用したい場合があります。そのような用途のために、ActorPool
というものが提供されています。サンプルコードをみてみましょう。
import ray
from ray.util import ActorPool
@ray.remote
class Actor:
def double(self, n):
return n * 2
a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])
# pool.map(..) returns a Python generator object ActorPool.map
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
今回はActor
を二つ作成してプールを作成し、そのプールに対してリスト内の値を二倍にする機能を呼び出しています。以下の部分で、プール内のActorにリストの値を割り振り、結果をまとめています。
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
こちらのコードを実行すると、以下のように要素が2倍になった値が取得できます。
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 25 packages in 120ms
list(gen)=[2, 4, 6, 8]
まとめ
今回はActorsの種類、特に非同期Actorsについてみてみました。ステートフルなActorsを利用するにあたり様々な呼び出し時の制約の付与などができるため、利用する際はぜひ参考にしてみてください。
Discussion