🦁

Databricks上でAIモデルの分散バッチ推論を実行する ~OpenVINO編~

2023/10/31に公開

Databricks上でAIモデルの分散バッチ推論を実行する ~OpenVINO編~

Databricksの素晴らしい点は、OSSフレンドリーな点だと思います。
どういうことかというと、Databricks Runtimeには様々なライブライリーが事前インストールされているので、そのまますぐに使い始められますが、Runtimeに含まれていないライブラリーも大抵は簡単に導入できます。なので例えば、オンプレミスやクラウド上に既存のデータ分析環境やAI開発環境があり、その環境が様々なライブラリーで構成されているとしても、その既存環境をDatabricksへ移行するのは多くの場合そんなに難しくないです。まずは論より証拠ということで、今回はDatabricks Runtimeに2023年10月27日時点で事前インストールされていないライブラリの一つである、OpenVINO Toolkitを使って、AIモデルの分散バッチ処理を高速化していこうと思います。

なお、本記事にはこちらの記事の続きになります。一部、重複箇所は割愛しますので、必要に応じて前回記事をご参照ください。

https://zenn.dev/hiouchiy/articles/5285e405014c45

環境

本ブログではAzure Databricksを用いていますが、AWS版、GCP版のいずれでも再現可能かと思います。

  • Databricks Runtime: 13.3 LTS ML (includes Apache Spark 3.4.1, Scala 2.12, PyTorch 1.13.1 pre-installed)
  • ノードタイプ: Azure Standard D16s_V5(ドライバー1台、ワーカー4台)
  • 今回追加するライブラリー: OpenVINO Toolkit 2023.1

モデルおよびデータセット

前回と同様に以下の通りです。

  • モデル:Torchvision ResNet50
  • データセット:TensorFlowチームによるflowers dataset
    • Databricks Datasetsのdbfs:/databricks-datasets/flower_photosの下に格納されています。

検証に使用したソースコード

フルバージョンはこちらをご参照ください。

OpenVINO Toolkitとは

OpenVINOは「Open Visual Inference and Neural Network Optimization」の略で、インテル社によって開発されたオープンソースのツールキットです。このツールキットは、ディープラーニングモデルを最適化し、インテル・ハードウェア・プラットフォーム上でのデプロイを容易にする「一度書いて、どこでもデプロイ」のアプローチを採用しています。OpenVINOは機械学習ソリューションの開発を加速させ、特に畳み込みニューラルネットワーク(CNN)を利用して、インテル・ハードウェア(アクセラレータを含む)上でのパフォーマンスを最大化します。

では、Databricksノートブックからインストールをしていきます。

%pip install openvino

念のため、Pythonランタイムを再起動。

dbutils.library.restartPython()

シングルノード上での推論

ベースラインとして、シングルノード上(ノードタイプは"環境"に記載のものと同様)で推論処理を実行しました。

  1. Databricks-datasetからのデータのロード
dataset_dir = "/dbfs/databricks-datasets/flower_photos/"
output_file_path = "/tmp/predictions"
files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(dataset_dir) for f in filenames if os.path.splitext(f)[1] == '.jpg']
print(f'画像ファイルの総数は {len(files)} 枚です。')

ちなみに画像枚数は合計3,670枚です。

  1. モデル変換(モデルの最適化)
    OpenVINOの推論エンジン上でモデルを実行するには、モデルをOpenVINOの独自形式(IR)に変換する必要があります。今回のモデル(Resnet50)はPyTorchの形式なので、以下の様にして変換を実施します。
import openvino as ov

def get_model_for_eval():
  """Gets the broadcasted model."""
  model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
  model.eval()

  # OpenVINO Coreオブジェクトのインスタンスを作成する
  core = ov.Core()

  # モデルをopenvino.runtime.Modelオブジェクトに変換する
  ov_model = ov.convert_model(model)
  
  # デバイスにOpenVINOモデルをロードする
  compiled_model = core.compile_model(ov_model, 'CPU')
  
  return model
  1. 推論実行
    以下のコードで推論を実行していきます。
transform = transforms.Compose([
  transforms.Resize(224),
  transforms.CenterCrop(224),
  transforms.ToTensor(),
  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                      std=[0.229, 0.224, 0.225])
])

predictions = []
for image_path in files:
  image = default_loader(image_path)
  image = transform(image)
  batch = image.unsqueeze(0)

  prediction = model(batch)[0]
  class_id = prediction.argmax(axis=1)
  score = prediction[np.arange(prediction.shape[0]), class_id]
  predictions.append((class_id, score))

print(list(zip(files, predictions)))

総処理時間は私の環境で 2分17秒 でした。
前回が23分ほどなので、この時点で既に10倍ほど高速化しています。この辺りはさすがOpenVINOといったところですが、ひとまずこちらをベースラインとして、次はSparkクラスターで検証していきます。

ワーカーノード4台を使用して分散モデル推論 + Pandas UDFを使用

前回の検証では前振りとして、性能の出ないPython UDFを使いましたが、今回はいきなり本命のPandas UDFを使います。

改めて、Pandas UDFに関する詳細はぜひ以下のブログをご覧ください。
個人的には現時点で最も分かりやすくPandas UDFの優位点を記載してくれていると思います。

https://medium.com/@suffyan.asad1/an-introduction-to-pandas-udfs-in-pyspark-a0a512bd00e2

コードにいきましょう。

  1. (前回と同様に)明示的にArrowの使用をOnにする
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
  1. Pandas UDFに一度に渡すレコード数の最大値(バッチあたりの最大レコード数)を指定
    今回は512を設定しています。1024でも良いかと思います。なお、最大レコード数を大きくすると、レコードがメモリに収まる場合に限り、UDFを呼び出すためのI/Oオーバーヘッドを減らすことができます。
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "512")
  1. カスタム PyTorch データセットクラスを作成
    以下のドキュメントの通りですが、パフォーマンスチューニングのヒントとして、PyTorchであれば、データロード用にtorch.utils.data.DataLoaderの使用が推奨されているので、そのお作法に則ります。
class ImageDataset(Dataset):
  def __init__(self, paths, transform=None):
    self.paths = paths
    self.transform = transform
  def __len__(self):
    return len(self.paths)
  def __getitem__(self, index):
    image = default_loader(self.paths[index])
    if self.transform is not None:
      image = self.transform(image)
    return image
  1. モデル推論のためのPandas UDFを定義
    前述した通り、今回のサンプル画像データは全部で3670枚で、ワーカーノードの数と同じ4つのパーティションに分割しているので、各ワーカーノードが910〜920枚ほどの画像を処理します。その中から512枚の画像(正確には画像パス)を取り出してきて、Pandas UDFにpandas.Seriesデータとして入力します。Pandas UDF内では、その512個の画像パスからバッチサイズごとに画像パスを取り出し、当該画像ファイルをロードしてTensor化して、それをモデルで推論することを繰り返します。すべての画像パスの推論が終了したら、推論結果から欲しい情報を取り出した上で、それをPandas.Seriesにパックして、返します。
    つまり、各ワーカーノードにおいて、Pandas UDFが呼び出される回数は今回の場合、2回です。したがって、UDFの前半のモデルのロード処理などにかかる時間を極小化できます。
    (ちなみに、spark.sql.execution.arrow.maxRecordsPerBatchを1024などに設定すればUDFの呼び出し回数は一回で済みます。)
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(paths: pd.Series) -> pd.Series:
  transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                       std=[0.229, 0.224, 0.225])
  ])

  images = ImageDataset(paths, transform=transform)
  loader = torch.utils.data.DataLoader(images, batch_size=8, num_workers=8)
  model = get_model_for_eval()

  all_predictions = []
  with torch.no_grad():
    for batch in loader:
      predictions = model(batch)[0]
      class_id = predictions.argmax(axis=1)
      score = predictions[np.arange(predictions.shape[0]), class_id]

      for result in np.stack((class_id, score), axis=1):
        all_predictions.append(result)

  return pd.Series(all_predictions)
  1. 推論実行
predictions_df = files_df.withColumn('prediction', predict_batch_udf(col('path')))
display(predictions_df)

総処理時間は私の環境で 42秒 でした。
シングルノードに比べて約3倍ほど性能が向上しています。ノード数が4倍になったことからも、リーズナブルな結果ではないかと思います。(もう少しよくできるかも知れませんが。。)
なお、前回と同様に今回もモデル推論時のバッチサイズを、ワーカーノードの物理コア数と同じ8としています。試しにバッチサイズを1で実行したら、55秒 ほどにやや劣化しました。今回はバッチサイズを上げた方がより良い結果が得られています。

結果のまとめ

以下に結果をまとめます。

シングルノード ワーカー4台 + Pandas UDF (推論時のバッチサイズ=8)
2分17秒 42秒

今回はDatabricks上でPandas UDFsとOpenVINO Toolkitを組み合わせて、分散モデル推論の性能を検証してみました。
OpenVINOはさすがインテルCPU上だと速いなといった感じですが、こういったサードパーティのOSSを柔軟に組み合わせられるDatabricksの柔軟性と汎用性あってのものだと思います。ご参考にされてください。

BFN!

参考

Deep learning model inference workflow
Deep learning model inference performance tuning guide
An Introduction to Pandas UDFs in PySpark
Spark3.0における新機能: Pandas UDFとPython型ヒント
Install Intel® Distribution of OpenVINO™ Toolkit from PyPI Repository
Convert a PyTorch Model to OpenVINO™ IR

Databricks無料トライアル

https://databricks.com/jp/try-databricks

Discussion