😎

Rayシリーズ:Ray Dataへの入門 ~quickstart~

に公開

今回はRayでデータを取り扱うためのRay Dataについて、Quickstartを通して入門してみました。

Ray Dataとは?

Ray Dataは、Ray上に構築されたMLとAIワークロードのためのスケーラブルなデータ処理ライブラリです。 バッチ推論やデータ前処理、MLトレーニングのためのインジェストなど、AIワークロードを表現するための柔軟で高性能なAPIを提供してくれます。他の分散データシステムとは異なり、Ray Dataはストリーミング実行を特徴としており、大規模なデータセットを効率的に処理し、CPUとGPUの両方のワークロードで高い利用率を維持します。

Ray DataはAIのワークロードに向けた実装をされており、以下のような特徴があるとのことです。

  • ディープラーニングをより速く、より安く: CPUの前処理とGPUの推論/学習タスクの間でデータをストリーミングし、リソースの利用率を最大化し、GPUをアクティブに保つことでコストを削減
  • フレームワークフレンドリー: 一般的なAIフレームワーク(vLLM、PyTorchなど)と一般的なクラウドプロバイダー(AWS、Google Cloud、Azure)とのパフォーマンスとファーストクラスの統合
  • マルチモーダルデータをサポート: Apache ArrowとPandasを活用してParquet、Lance、画像、JSON、CSV、オーディオ、ビデオなどMLワークロードで使用される多くのデータ形式をサポート
  • デフォルトでスケーラブル: CPUやGPUマシンが異なる異種クラスター間で自動的にスケーリングできるようにRayで構築されており、コードは1台のマシンから数百TBのデータを処理する数百のノードまで変更なく実行

https://docs.ray.io/en/latest/data/data.html

早速使ってみる

それでは早速Quickstartを通して入門してみます。

https://docs.ray.io/en/latest/data/quickstart.html

環境構築

uvを利用して環境構築します。

uv init ray_data_quickstart -p 3.12
cd ray_data_quickstart
uv add "ray[data]"

データの読み込みと変換

それではirisデータの読み込みと変換を実装します。

load_and_transform_data.py
from typing import Dict
import numpy as np
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

ds.show(limit=10)

def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

transformed_ds = ds.map_batches(transform_batch)

print(transformed_ds.materialize())

まずはデータの読み込みです。irisデータはS3上で一般公開されているデータを参照するようにします。データの読み込みにはray.data.read_csvを呼び出すことでCSVを読み込めます。また、ds.showを利用することでデータを表示することができます。

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

ds.show(limit=10)

次にデータの変換を定義します。この例ではpetalの面積をlengthwidthから計算して新たなデータとして定義する変換を実装しています。

def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

データの変換を適用するにはds.map_batchesを利用します。これを利用することにより全てのデータに適用できます。

transformed_ds = ds.map_batches(transform_batch)

最後にtransformed_ds.materialize()についてです。materialize関数はすべての遅延変換を実行し、データセットをオブジェクトストアメモリにマテリアライズします。lazy処理のため、実行されたタイミングでデータの処理が実行されるようです。

print(transformed_ds.materialize()

それでは早速実行してみます。

uv run load_and_transform_data.py

# 結果
[dataset]: Run `pip install tqdm` to enable progress reporting.
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.7, 'sepal width (cm)': 3.2, 'petal length (cm)': 1.3, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.6, 'sepal width (cm)': 3.1, 'petal length (cm)': 1.5, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 5.0, 'sepal width (cm)': 3.6, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 5.4, 'sepal width (cm)': 3.9, 'petal length (cm)': 1.7, 'petal width (cm)': 0.4, 'target': 0}
{'sepal length (cm)': 4.6, 'sepal width (cm)': 3.4, 'petal length (cm)': 1.4, 'petal width (cm)': 0.3, 'target': 0}
{'sepal length (cm)': 5.0, 'sepal width (cm)': 3.4, 'petal length (cm)': 1.5, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.4, 'sepal width (cm)': 2.9, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.9, 'sepal width (cm)': 3.1, 'petal length (cm)': 1.5, 'petal width (cm)': 0.1, 'target': 0}
MaterializedDataset(
   num_blocks=16,
   num_rows=150,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64,
      petal area (cm^2): double
   }
)

結果をみると、まずds.show(limit=10)の実行によりデータのうち10サンプルが表示されています。

データの取得

それでは先ほどのコードを編集して、データをバッチ取得する実装を追加してみます。

get_batch.py
from typing import Dict
import numpy as np
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

transformed_ds = ds.map_batches(transform_batch)
print(transformed_ds.take_batch(batch_size=3))

先ほどとの変更点ですが、以下のようにしてデータをバッチで取得する部分の追加になります。batch_sizeを指定することで各絡むごとに指定したデータを抽出できます。

print(transformed_ds.take_batch(batch_size=3))

これを実行すると以下のようになります。

uv run get_batch.py

# 結果
[dataset]: Run `pip install tqdm` to enable progress reporting.
{'sepal length (cm)': array([5.1, 4.9, 4.7]), 'sepal width (cm)': array([3.5, 3. , 3.2]), 'petal length (cm)': array([1.4, 1.4, 1.3]), 'petal width (cm)': array([0.2, 0.2, 0.2]), 'target': array([0, 0, 0]), 'petal area (cm^2)': array([0.28, 0.28, 0.26])}

結果をみると絡むごとにarray([...])のような形式で取得できており、例えばsepal length (cm)array([5.1, 4.9, 4.7])のように3つのデータになっています。

データの保存

それでは変換されたデータをparquetファイルで保存します。

save_as_parquet.py
from typing import Dict
import numpy as np
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

ds.show(limit=10)

def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

transformed_ds = ds.map_batches(transform_batch)
transformed_ds.write_parquet("./iris")

transformed_ds.write_parquetを利用することでparquetファイルで保存できます。引数にはデータを保存するディレクトリを指定します。それではこちらを実行してみましょう。

uv run save_as_parquet.py

実行してirisフォルダを確認すると、以下のように複数のファイルに分割して保存されていることが確認できます。

ll iris 

# 結果
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000000_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000001_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000002_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000003_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000004_000000-0.parquet
.rw-r--r--@ 2.4k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000005_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000006_000000-0.parquet
.rw-r--r--@ 2.4k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000007_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000008_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000009_000000-0.parquet
.rw-r--r--@ 2.4k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000010_000000-0.parquet
.rw-r--r--@ 2.4k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000011_000000-0.parquet
.rw-r--r--@ 2.4k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000012_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000013_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000014_000000-0.parquet
.rw-r--r--@ 2.3k user 25 Oct 21:01 2_e1420b89875e4dc783b8db1081a37ee8_000015_000000-0.parquet

まとめ

今回はRay DataのQuickstartを通して、Rayでデータを扱う方法をみてみました。使い勝手はpandasを使うような感じで扱えると思います。RayでMLを分散学習する時などはデータを取り扱うことも多くなるので、Ray Dataをどんどん利用できるようにキャッチアップ進めようと思います。

Discussion