Databricks上でAIモデルの分散バッチ推論を実行する ~OpenVINO編~
Databricks上でAIモデルの分散バッチ推論を実行する ~OpenVINO編~
Databricksの素晴らしい点は、OSSフレンドリーな点だと思います。
どういうことかというと、Databricks Runtimeには様々なライブライリーが事前インストールされているので、そのまますぐに使い始められますが、Runtimeに含まれていないライブラリーも大抵は簡単に導入できます。なので例えば、オンプレミスやクラウド上に既存のデータ分析環境やAI開発環境があり、その環境が様々なライブラリーで構成されているとしても、その既存環境をDatabricksへ移行するのは多くの場合そんなに難しくないです。まずは論より証拠ということで、今回はDatabricks Runtimeに2023年10月27日時点で事前インストールされていないライブラリの一つである、OpenVINO Toolkitを使って、AIモデルの分散バッチ処理を高速化していこうと思います。
なお、本記事にはこちらの記事の続きになります。一部、重複箇所は割愛しますので、必要に応じて前回記事をご参照ください。
環境
本ブログではAzure Databricksを用いていますが、AWS版、GCP版のいずれでも再現可能かと思います。
- Databricks Runtime: 13.3 LTS ML (includes Apache Spark 3.4.1, Scala 2.12, PyTorch 1.13.1 pre-installed)
- ノードタイプ: Azure Standard D16s_V5(ドライバー1台、ワーカー4台)
- 今回追加するライブラリー: OpenVINO Toolkit 2023.1
モデルおよびデータセット
前回と同様に以下の通りです。
- モデル:Torchvision ResNet50
- データセット:TensorFlowチームによるflowers dataset
- Databricks Datasetsの
dbfs:/databricks-datasets/flower_photos
の下に格納されています。
- Databricks Datasetsの
検証に使用したソースコード
フルバージョンはこちらをご参照ください。
OpenVINO Toolkitとは
OpenVINOは「Open Visual Inference and Neural Network Optimization」の略で、インテル社によって開発されたオープンソースのツールキットです。このツールキットは、ディープラーニングモデルを最適化し、インテル・ハードウェア・プラットフォーム上でのデプロイを容易にする「一度書いて、どこでもデプロイ」のアプローチを採用しています。OpenVINOは機械学習ソリューションの開発を加速させ、特に畳み込みニューラルネットワーク(CNN)を利用して、インテル・ハードウェア(アクセラレータを含む)上でのパフォーマンスを最大化します。
では、Databricksノートブックからインストールをしていきます。
%pip install openvino
念のため、Pythonランタイムを再起動。
dbutils.library.restartPython()
シングルノード上での推論
ベースラインとして、シングルノード上(ノードタイプは"環境"に記載のものと同様)で推論処理を実行しました。
- Databricks-datasetからのデータのロード
dataset_dir = "/dbfs/databricks-datasets/flower_photos/"
output_file_path = "/tmp/predictions"
files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(dataset_dir) for f in filenames if os.path.splitext(f)[1] == '.jpg']
print(f'画像ファイルの総数は {len(files)} 枚です。')
ちなみに画像枚数は合計3,670枚です。
- モデル変換(モデルの最適化)
OpenVINOの推論エンジン上でモデルを実行するには、モデルをOpenVINOの独自形式(IR)に変換する必要があります。今回のモデル(Resnet50)はPyTorchの形式なので、以下の様にして変換を実施します。
import openvino as ov
def get_model_for_eval():
"""Gets the broadcasted model."""
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model.eval()
# OpenVINO Coreオブジェクトのインスタンスを作成する
core = ov.Core()
# モデルをopenvino.runtime.Modelオブジェクトに変換する
ov_model = ov.convert_model(model)
# デバイスにOpenVINOモデルをロードする
compiled_model = core.compile_model(ov_model, 'CPU')
return model
- 推論実行
以下のコードで推論を実行していきます。
transform = transforms.Compose([
transforms.Resize(224),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
predictions = []
for image_path in files:
image = default_loader(image_path)
image = transform(image)
batch = image.unsqueeze(0)
prediction = model(batch)[0]
class_id = prediction.argmax(axis=1)
score = prediction[np.arange(prediction.shape[0]), class_id]
predictions.append((class_id, score))
print(list(zip(files, predictions)))
総処理時間は私の環境で 2分17秒 でした。
前回が23分ほどなので、この時点で既に10倍ほど高速化しています。この辺りはさすがOpenVINOといったところですが、ひとまずこちらをベースラインとして、次はSparkクラスターで検証していきます。
ワーカーノード4台を使用して分散モデル推論 + Pandas UDFを使用
前回の検証では前振りとして、性能の出ないPython UDFを使いましたが、今回はいきなり本命のPandas UDFを使います。
改めて、Pandas UDFに関する詳細はぜひ以下のブログをご覧ください。
個人的には現時点で最も分かりやすくPandas UDFの優位点を記載してくれていると思います。
コードにいきましょう。
- (前回と同様に)明示的にArrowの使用をOnにする
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
- Pandas UDFに一度に渡すレコード数の最大値(バッチあたりの最大レコード数)を指定
今回は512を設定しています。1024でも良いかと思います。なお、最大レコード数を大きくすると、レコードがメモリに収まる場合に限り、UDFを呼び出すためのI/Oオーバーヘッドを減らすことができます。
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "512")
- カスタム PyTorch データセットクラスを作成
以下のドキュメントの通りですが、パフォーマンスチューニングのヒントとして、PyTorchであれば、データロード用にtorch.utils.data.DataLoader
の使用が推奨されているので、そのお作法に則ります。
- https://docs.databricks.com/en/machine-learning/model-inference/dl-model-inference.html
- https://docs.databricks.com/en/machine-learning/model-inference/model-inference-performance.html
class ImageDataset(Dataset):
def __init__(self, paths, transform=None):
self.paths = paths
self.transform = transform
def __len__(self):
return len(self.paths)
def __getitem__(self, index):
image = default_loader(self.paths[index])
if self.transform is not None:
image = self.transform(image)
return image
- モデル推論のためのPandas UDFを定義
前述した通り、今回のサンプル画像データは全部で3670枚で、ワーカーノードの数と同じ4つのパーティションに分割しているので、各ワーカーノードが910〜920枚ほどの画像を処理します。その中から512枚の画像(正確には画像パス)を取り出してきて、Pandas UDFにpandas.Seriesデータとして入力します。Pandas UDF内では、その512個の画像パスからバッチサイズごとに画像パスを取り出し、当該画像ファイルをロードしてTensor化して、それをモデルで推論することを繰り返します。すべての画像パスの推論が終了したら、推論結果から欲しい情報を取り出した上で、それをPandas.Seriesにパックして、返します。
つまり、各ワーカーノードにおいて、Pandas UDFが呼び出される回数は今回の場合、2回です。したがって、UDFの前半のモデルのロード処理などにかかる時間を極小化できます。
(ちなみに、spark.sql.execution.arrow.maxRecordsPerBatch
を1024などに設定すればUDFの呼び出し回数は一回で済みます。)
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(paths: pd.Series) -> pd.Series:
transform = transforms.Compose([
transforms.Resize(224),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
images = ImageDataset(paths, transform=transform)
loader = torch.utils.data.DataLoader(images, batch_size=8, num_workers=8)
model = get_model_for_eval()
all_predictions = []
with torch.no_grad():
for batch in loader:
predictions = model(batch)[0]
class_id = predictions.argmax(axis=1)
score = predictions[np.arange(predictions.shape[0]), class_id]
for result in np.stack((class_id, score), axis=1):
all_predictions.append(result)
return pd.Series(all_predictions)
- 推論実行
predictions_df = files_df.withColumn('prediction', predict_batch_udf(col('path')))
display(predictions_df)
総処理時間は私の環境で 42秒 でした。
シングルノードに比べて約3倍ほど性能が向上しています。ノード数が4倍になったことからも、リーズナブルな結果ではないかと思います。(もう少しよくできるかも知れませんが。。)
なお、前回と同様に今回もモデル推論時のバッチサイズを、ワーカーノードの物理コア数と同じ8としています。試しにバッチサイズを1で実行したら、55秒 ほどにやや劣化しました。今回はバッチサイズを上げた方がより良い結果が得られています。
結果のまとめ
以下に結果をまとめます。
シングルノード | ワーカー4台 + Pandas UDF (推論時のバッチサイズ=8) |
---|---|
2分17秒 | 42秒 |
今回はDatabricks上でPandas UDFsとOpenVINO Toolkitを組み合わせて、分散モデル推論の性能を検証してみました。
OpenVINOはさすがインテルCPU上だと速いなといった感じですが、こういったサードパーティのOSSを柔軟に組み合わせられるDatabricksの柔軟性と汎用性あってのものだと思います。ご参考にされてください。
BFN!
参考
Deep learning model inference workflow
Deep learning model inference performance tuning guide
An Introduction to Pandas UDFs in PySpark
Spark3.0における新機能: Pandas UDFとPython型ヒント
Install Intel® Distribution of OpenVINO™ Toolkit from PyPI Repository
Convert a PyTorch Model to OpenVINO™ IR
Databricks無料トライアル
Discussion