🍣

Rayシリーズ:Tasks

に公開

今回からRay解説シリーズを始めてみようと思います。分散コンピューティングライブラリとして絶対的な地位を気づいているであろうRayを使えるようになるために解説してみます。

Ray Tasksとは?

Ray TasksはRay Coreの一つとして置かれている機能の一つになります。任意の関数を別のPythonワーカー上で非同期的に実行することを可能にするための機能になります。Ray Tasksで扱われる関数はRayリモート関数と呼ばれ、その非同期呼び出しはRayタスクと呼ばれます、

https://docs.ray.io/en/latest/ray-core/tasks.html#ray-remote-functions

実際に使ってみる

今回は先ほど添付したRay Tasksのページで紹介されているサンプルコードをベースにまとめてみます。

環境構築

uvを使って環境を構築します。

uv init ray_tasks -p 3.12
cd ray_tasks
uv add ray

time.sleepを使った実験

それではまずはtime.sleepを使った実験をしてみます。実験では3秒間sleepする関数を4回呼び出すことを想定し、Ray Tasksを使う場合と使わない場合でどう違うか比較してみます。

Ray Tasksを使わない場合

ソースコードは以下のようになります。

slow_function.py
import time

def slow_function():
    time.sleep(3)

slow_function()
slow_function()
slow_function()
slow_function()

それでは実行してみましょう。

(time uv run slow_function.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
uv run slow_function.py  0.03s user 0.01s system 0% cpu 12.060 total

結果を見ると、、おおよそ12秒かかっており、3秒要する関数を4回実行した結果としては正しい結果に見えます。

Ray Tasksを使う場合

それでは次はRay Tasksを使ってみましょう。Rayでは@ray.remoteデコレータを使うことでリモート関数として定義できます。

slow_function_ray.py
import ray
import time


@ray.remote
def slow_function():
    time.sleep(3)


for _ in range(4):
    slow_function.remote()

これを実行してみましょう。

(time uv run slow_function_ray.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 13:16:18,054	INFO worker.py:1927 -- Started a local Ray instance.
2025-08-12 13:16:18,059	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 13:16:18,073	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_2efc12f91335c8ff.zip' (0.08MiB) to Ray cluster...
2025-08-12 13:16:18,073	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_2efc12f91335c8ff.zip'.
uv run slow_function_ray.py  1.58s user 0.56s system 29% cpu 7.216 total

結果を見ると先ほどの結果に比べると短くはなっていますが、7秒程度になっています。これは関数の実行に時間がかかっていると言うよりは、Rayの環境構築のための時間がかかっていることが挙げられると思います。しかし、並列実行がされているおかげで、圧倒的に短い実行時間になっています。また、threadingやmuliiprocessingを使わずにほとんど変更なしで並列計算をできているのはとても便利だと思います。

リソースの確保

@ray.remoteでは確保したいリソースを定義することができます。指定したリソースは確保されるまで実行が待機されるため、運用環境に応じて設定するh値雨ようがあります。例えば以下のようにすることでCPUコア数を4つ、GPUを2つ確保したいと言うことを伝えています。

@ray.remote(num_cpus=4, num_gpus=2)
def function():
    ...

関数の実行結果の取得

先ほどの例は単純にtime.sleepを呼び出しているだけでしたが、何かしらの計算をさせて値を取得することが多いと思います。@ray.remoteで定義された関数の戻り値は関数の戻り値自体ではなくそれに対する参照という形で戻されます。@ray.remoteの実際の結果を取得するにはray.get(ref)のような形で対応することができます。それではサンプルをみてみましょう。

calculate_add.py
import ray


@ray.remote
def add(x: int, y: int):
    return x + y


obj_refs = []

for i in range(4):
    obj_refs.append(
        add.remote(i, 1) 
    )

res = [ray.get(obj_ref) for obj_ref in obj_refs]
print(res)

これを実行してみます。

(uv run calculate_add.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 13:42:22,058	INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-12 13:42:22,066	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 13:42:22,078	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_7afd6deef098967e.zip' (0.21MiB) to Ray cluster...
2025-08-12 13:42:22,080	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_7afd6deef098967e.zip'.
(raylet) [2025-08-12 13:42:21,969 E 88540 476163] (raylet) file_system_monitor.cc:116: /tmp/ray/session_2025-08-12_13-42-20_285839_88523 is over 95% full, available space: 44.801 GB; capacity: 926.352 GB. Object creation will fail if spilling is required.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 65 packages in 3.63s
(raylet) error: Failed to install: py_spy-0.4.1-py2.py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl (py-spy==0.4.1)
(raylet)   Caused by: failed to rename file from /private/tmp/ray/session_2025-08-12_13-42-20_285839_88523/runtime_resources/working_dir_files/_ray_pkg_7afd6deef098967e/.venv/lib/python3.12/site-packages/.tmpwDPlA9/py-spy to /private/tmp/ray/session_2025-08-12_13-42-20_285839_88523/runtime_resources/working_dir_files/_ray_pkg_7afd6deef098967e/.venv/lib/python3.12/site-packages/py_spy-0.4.1.data/scripts/py-spy: Invalid argument (os error 22)
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
[1, 2, 3, 4]

結果を見ると、引数として与えた二つの数字(今回の場合は0~3と固定で1)の足した結果が格納されていることが確認できました。

リモート関数結果の受け渡し

例えば二つのリモート関数を定義して、一つ目のリモート関数の結果を受け取って二つ目の関数が実行される場合を想定します。リモート関数では、引数の引き渡しはリモート関数の戻り値であるリファレンスを受け入れるようになっています。つまり、ray.get(ref)をわざわざして受け渡さなくてもいいということになります。

argument_ref.py
import ray


@ray.remote
def first_function(x: int):
    return x * 10


@ray.remote
def second_function(x: int):
    return x / 2


obj_refs = []

for i in range(1, 5):
    first_ref = first_function.remote(i)
    second_ref = second_function.remote(first_ref)
    obj_refs.append(second_ref)

res = [ray.get(obj_ref) for obj_ref in obj_refs]
print(res)

それでは実行してみましょう。

(uv run argument_ref.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 13:50:57,951	INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-12 13:50:57,960	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 13:50:57,974	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_c5fe6038018ef02d.zip' (0.21MiB) to Ray cluster...
2025-08-12 13:50:57,976	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_c5fe6038018ef02d.zip'.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 65 packages in 2.28s
(raylet) error: Failed to install: ruff-0.12.8-py3-none-macosx_11_0_arm64.whl (ruff==0.12.8)
(raylet)   Caused by: failed to rename file from /private/tmp/ray/session_2025-08-12_13-50-56_164420_93246/runtime_resources/working_dir_files/_ray_pkg_c5fe6038018ef02d/.venv/lib/python3.12/site-packages/.tmpcac3St/ruff to /private/tmp/ray/session_2025-08-12_13-50-56_164420_93246/runtime_resources/working_dir_files/_ray_pkg_c5fe6038018ef02d/.venv/lib/python3.12/site-packages/ruff-0.12.8.data/scripts/ruff: Invalid argument (os error 22)
[5.0, 10.0, 15.0, 20.0]
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(raylet) Installed 65 packages in 5.12s
(raylet) error: Failed to install: ruff-0.12.8-py3-none-macosx_11_0_arm64.whl (ruff==0.12.8)
(raylet)   Caused by: failed to rename file from /private/tmp/ray/session_2025-08-12_13-50-56_164420_93246/runtime_resources/working_dir_files/_ray_pkg_c5fe6038018ef02d/.venv/lib/python3.12/site-packages/.tmpHL4m9l/ruff to /private/tmp/ray/session_2025-08-12_13-50-56_164420_93246/runtime_resources/working_dir_files/_ray_pkg_c5fe6038018ef02d/.venv/lib/python3.12/site-packages/ruff-0.12.8.data/scripts/ruff: Invalid argument (os error 22)
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead[32m [repeated 4x across cluster]

結果を見ると、1~4に対して10倍して半分にした数が結果として取得できていることが確認できました。

実行中および終了済みタスクの列挙

ray.getを利用すると指定されたリファレンスの結果が取得するまでブロッキングされます。ユースケースとしてはそれでいい場合もあると思いますが、中には実行完了したものと実行中のリファレンスを取得したいというケースもあるかもしれません。そのような場合に実行完了しているものと実行中のリファレンスを取得するにはray.waitを使うことで実現できます。

wait_sleep.py
import ray
import time


@ray.remote
def sleep(seconds: int):
    time.sleep(seconds)


sleep_seconds = [1, 3, 20, 30]
obj_refs = []

for sleep_second in sleep_seconds:
    obj_refs.append(sleep.remote(sleep_second))


ready_refs, remain_refs = ray.wait(obj_refs, num_returns=2, timeout=None)
print(f"{ready_refs=}")
print(f"{remain_refs=}")

以下の部分で完了したタスクと実行中のリファレンスを取得しています。num_returns=2にして完了すみのリファレンスが2つ揃うことを指定しtimeout=Noneにしているのでこの条件を満たすまではブロックされます。

(uv run wait_sleep.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 14:07:55,560	INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-12 14:07:55,568	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 14:07:55,579	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_6e889ee11095f0e5.zip' (0.21MiB) to Ray cluster...
2025-08-12 14:07:55,580	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_6e889ee11095f0e5.zip'.
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
(raylet) Using CPython 3.12.9
(raylet) Creating virtual environment at: .venv
(raylet) Installed 65 packages in 2.07s
(raylet) error: Failed to install: py_spy-0.4.1-py2.py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl (py-spy==0.4.1)
(raylet)   Caused by: failed to query metadata of file `/private/tmp/ray/session_2025-08-12_14-07-53_871455_22901/runtime_resources/working_dir_files/_ray_pkg_6e889ee11095f0e5/.venv/lib/python3.12/site-packages/py_spy-0.4.1.data/scripts/py-spy`: No such file or directory (os error 2)
(raylet)   Caused by: failed to rename file from /private/tmp/ray/session_2025-08-12_14-07-53_871455_22901/runtime_resources/working_dir_files/_ray_pkg_6e889ee11095f0e5/.venv/lib/python3.12/site-packages/.tmprAxYTs/ruff to /private/tmp/ray/session_2025-08-12_14-07-53_871455_22901/runtime_resources/working_dir_files/_ray_pkg_6e889ee11095f0e5/.venv/lib/python3.12/site-packages/ruff-0.12.8.data/scripts/ruff: Invalid argument (os error 22)
ready_refs=[ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000), ObjectRef(16310a0f0a45af5cffffffffffffffffffffffff0100000001000000)]
remain_refs=[ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000), ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000)]
(raylet) warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(raylet) Installed 65 packages in 4.80s
(raylet) error: Failed to install: ruff-0.12.8-py3-none-macosx_11_0_arm64.whl (ruff==0.12.8)

結果を見ると、以下のように完了済み、実行中それぞれの結果が取得できています。

ready_refs=[ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000), ObjectRef(16310a0f0a45af5cffffffffffffffffffffffff0100000001000000)]
remain_refs=[ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000), ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000)]

複数結果の戻り値の取り扱い

関数によっては戻り値が複数あるようなものもありますが、そのような場合にそれぞれに対応するような形でリファレンスに登録することができます。

multiple_returns.py
import ray


@ray.remote(num_returns=3)
def return_multiple():
    return 1, 2, 3


one_ref = return_multiple.remote()
print(ray.get(one_ref))

first_ref, second_ref, third_ref = return_multiple.remote()
print(ray.get(first_ref), ray.get(second_ref), ray.get(third_ref))

以下のように@ray.remotenum_returnsを設定することで、戻り値のリファレンスの個数を指定します。今回の例ではnum_returns=3にしているので3つの戻り値があるとしていますが、受け取るリファレンスは一つにまとめても3つそれぞれにしてもどちらでも大丈夫です。

@ray.remote(num_returns=3)

それでは実行してみます。

(uv run multiple_returns.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 14:14:51,219	INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-12 14:14:51,228	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 14:14:51,240	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_e3f57d7d767b63f8.zip' (0.21MiB) to Ray cluster...
2025-08-12 14:14:51,241	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_e3f57d7d767b63f8.zip'.
[33m(raylet)[0m warning: `VIRTUAL_ENV=/Users/user/Documents/Blog/blog_materials/ray/ray_tasks/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
[33m(raylet)[0m Using CPython 3.12.9
[33m(raylet)[0m Creating virtual environment at: .venv
[33m(raylet)[0m Installed 65 packages in 181ms
[1, 2, 3]
1 2 3

最後の部分を見ると、全てまとめた場合はリストとして受け取っていますが、三つに分けた場合はそれぞれに値が一つずつ含まれるようになっています。この使い方をすることで、後続処理に受け渡す情報を選別することができるので便利だと思います。

ジョブのキャンセル

ユースケースとして、ジョブを途中で終了させたい時があるかと思います。その場合、リファレンスに対してray.cancelを実行することで対応できます。

cancel_job.py
import ray
import time


@ray.remote
def too_long_sleep():
    time.sleep(10e6)


obj_ref = too_long_sleep.remote()
ray.cancel(obj_ref)

try:
    res = ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")

以下のようにしてray.cancel(obj_ref)を呼び出すことで実行をキャンセルします。実行がキャンセルされたリファレンスに対して結果を取得しようとしてもエラーになるため、try/exceptを利用してそれを検知しています。

ray.cancel(obj_ref)

try:
    res = ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")

それでは実行してみましょう。

(uv run cancel_job.py) 2>&1 | tee /dev/tty | pbcopy

# 結果
2025-08-12 14:20:13,896	INFO worker.py:1918 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
2025-08-12 14:20:13,903	INFO packaging.py:588 -- Creating a file package for local module '/Users/user/Documents/Blog/blog_materials/ray/ray_tasks'.
2025-08-12 14:20:13,912	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_ee23390dcb83bcb6.zip' (0.21MiB) to Ray cluster...
2025-08-12 14:20:13,913	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_ee23390dcb83bcb6.zip'.
Object reference was cancelled.

まとめ

今回はRay Tasksについて説明しました。Rayのコアコンセプトの一つしとて定義されているTasksを利用することで、簡単に並列計算を実現できるだけでなく、実行結果の取得の受け渡しの便利さなども恩恵を受けることができます。みなさんもぜひ調べてみてください。

Discussion