📌

Databricks上でBERTをファインチューニングしてサービングする

2024/02/26に公開

Databricks上でBERTのファインチューンからサービングまでの流れを記載します。
同様のドキュメントは複数あるのですが、なぜかサービングまでカバーしているものが見当たらなかったので、せっかくならばと筆を取りました。なお、今回ベースとして使うモデルはHuggingFaceに登録されているtohoku-nlp/bert-base-japanese-v3です。これを用いたテキスト分類器をlivedoor ニュースコーパスのデータセットを使用して作ろうと思います。

環境

前提

今回用いるデータセットlivedoor ニュースコーパスは、Rondhuitのサイトから入手できます。かつ、すでに事前のデータ処理が終了し、Unity Catalog内に以下のテーブル(Delta Table)として登録されているものとします。(フルのソースコードでは、このデータを作るところも含めてやっているのですが、今回の主旨ではないので割愛)

ちなみに、各カラムの意味はこちら。

  • label: 記事のカテゴリ(ラベル)
  • label_name: 記事のカテゴリ名
  • file_path: 記事が格納されていたファイルパス
  • text: 記事本文

データロードと加工

学習用データ(Delta Table)をロードし、HuggingFace Datasetsフォーマットに変換

Delta Tableからデータを読み、HuggingFaceのDatasetsフォーマットへ変換します。

dataset_df = spark.read.table(table_name_full_path)
(train_df, test_df) = dataset_df.persist().randomSplit([0.8, 0.2], seed=47)

import datasets
train_dataset = datasets.Dataset.from_spark(train_df, cache_dir="/dbfs/cache/train")
test_dataset = datasets.Dataset.from_spark(test_df, cache_dir="/dbfs/cache/test")

from_spark()メソッドを利用することで、簡単に変換できます。この際返り値として得られる、DatasetオブジェクトはArrowテーブルのラッパーで、データセット内の配列からPyTorch、TensorFlow、JAXテンソルを高速に読み込むことができます。また、Arrowテーブルはディスクからメモリマッピングされるため、 使用可能なRAMよりも大きなデータセットを読み込む ことができます。

加えて、ここではcache_dirパラメーターを付けて、DBFSのパスを指定しています。これをつけるとデータが当該パスにキャッシュされるので、次回以降from_spark()を読んだ際はそのキャッシュからデータが読まれます。このパスはドライバーノード、ワーカーノードともにアクセスできる場所が望ましいので、DBFSのパスを指定すると良いです。

テキスト列からトークン列を作成(最終的にテキスト列は削除)

データセットの中の「text」列の文章をトークン化し、それを新たな列として追加します。新たな列の名称が指定されていませんが、input_idstoken_type_idsattention_maskなどが数列がデフォルトで追加されます。かつ、「text」列は今後の処理では不要なので削除してしまいます。

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(base_model)

def tokenize_function(examples):
    return tokenizer(
        examples["text"], 
        max_length=max_length,
        padding='max_length', 
        truncation=True)

train_tokenized = train_dataset.map(tokenize_function, batched=True).remove_columns(["text"])
test_tokenized = test_dataset.map(tokenize_function, batched=True).remove_columns(["text"])
train_dataset = train_tokenized.shuffle(seed=47)
test_dataset = test_tokenized.shuffle(seed=47)

なお、こちらがトークン列(群)追加後のデータセットの中身です。(ランダムに1レコード取り出して表示しています)

{'label': 4,
 'label_name': 'movie-enter',
 'file_path': '/databricks/driver/text/movie-enter/movie-enter-6909318.txt',
 'input_ids': [2, 687, 8912, 32, 14414,・・・],
 'token_type_ids': [0, 0, 0, 0, 0,・・・],
 'attention_mask': [1, 1, 1, 1, 1,・・・]}

モデル学習(ファインチューニング)

いよいよここからモデルの学習です。今回はHuggigFaceのTrainerクラスを使用してファインチューニングしていきます。Trainerクラスのおかげで学習用コードの記述量がだいぶ削減できます。

評価メトリクスの設定

まずは評価メトリクスを定義します。今回はAccuracyを使用します。

import numpy as np
import evaluate

metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

学習用パラメーターの設定

続いて学習用パラメーターの設定です。TrainingArgumentsクラスにデフォルト値以外で使用したいパラメータをセットしていきます。パラメーターは膨大な種類があるので気をつけましょう。
今回は4つのパラメーターを変更しています。

  • Epoch数:デフォルト「3」→「5」
  • 学習時バッチサイズ:デフォルト「8」→「12」
  • 検証時バッチサイズ:デフォルト「8」→「32」
  • ログ書き込み用フォルダ:デフォルト「無し」→「適当なパス」
from transformers import TrainingArguments

training_output_dir = f"{tutorial_path}/bert_trainer"
training_args = TrainingArguments(
  output_dir=training_output_dir, 
  logging_dir = f"{tutorial_path}/logs",    # TensorBoard用にログを記録するディレクトリ
  evaluation_strategy="epoch",
  num_train_epochs=5)

# TrainingArgumentsオブジェクト作成後も以下のようにパラメータは変更可能
training_args.set_dataloader(train_batch_size=12, eval_batch_size=32)

モデルとトークナイザーのダウンロード

お馴染みですね。今回ベースモデルとして使用するモデルと対応するトークナイザーをダウンロードします。今回は8クラスの分類問題を解きたいのでnum_labels8 にセットします。また、ラベルのインデックスと名称を対応づけるマップとしてlabel2idid2labelもセットします。

from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained(base_model)

model = AutoModelForSequenceClassification.from_pretrained(
  base_model, 
  num_labels=8, 
  label2id=label2id, 
  id2label=id2label)

Trainerオブジェクトの作成

Trainerオブジェクトを作成します。ここで、data_collatorというデータロード時に何らかの加工を行うためのオブジェクトも用意します。ここでは、Pre-defineされているDataCollatorWithPaddingクラスを用います。これを用いると、ミニバッチロード時に、トークンの最大長に到達するまで、隙間をパディングしてくれます。正直、上のトークン化の処理でパディングも一緒やっているためあまり意味はないのですが、他にも処理があれば同様にDataCollatorを使用することになるので参考までに使ってみました。

from transformers import DataCollatorWithPadding
from transformers import Trainer

data_collator = DataCollatorWithPadding(tokenizer)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

学習開始&MLFlowでTracking

では、学習を開始します。
学習はMLFlowのTracking機能を使用して、パラメーターやメトリックを記録しながら進めます。今回パラメーターやメトリクスはTrainerオブジェクトに包含されているのですが、with mlflow.start_run() as run:を頭につけることにより、必要なデータは全て自動で記録されます。学習が終了したらモデルを一旦保存し、再度読み込んでからパイプライン化し、それをMLFlowのTransformersフレーバーを使用してMLFlow Trackingに記録していきます。

from transformers import pipeline

model_output_dir = f"{tutorial_path}/trained_model"
pipeline_output_dir = f"{tutorial_path}/trained_pipeline"

with mlflow.start_run() as run:
  
  # 学習開始。学習のメトリックが自動的にMLFLowにロギングされる
  trainer.train()

  # 学習終了後にモデルを保存
  trainer.save_model(model_output_dir)
  
  # 学習済みモデルを読み込んでパイプライン化して、更に保存。
  bert = AutoModelForSequenceClassification.from_pretrained(model_output_dir)

  pipe = pipeline(
    "text-classification", 
    model=bert, 
    batch_size=1, 
    tokenizer=tokenizer,
    device=0)
  pipe.save_pretrained(pipeline_output_dir)
  
  # MLFlow Trackingにパイプラインを記録する。
  mlflow.transformers.log_model(
    transformers_model=pipe, 
    artifact_path=model_artifact_path+"_CPU", 
    input_example=["これはサンプル1です。", "これはサンプル2です。"],
    pip_requirements=["torch", "transformers", "accelerate", "sentencepiece", "datasets", "evaluate", "fugashi", "ipadic", "unidic-lite"],
    model_config={ 
      "max_length": max_length, 
      "padding": "max_length", 
      "truncation": True 
    }
  )

が、しかし、実はこのまま後続の処理を続けて、モデルをGPUインスタンス上でエンドポイント化しても、推論時にGPUを使ってくれません(恐らくTransformersフレーバーの仕様だと思うのですが、詳細が分かり次第追記します)。
いずれにしてもGPUを使わせるためには、上記のコードを書き変える必要があります。以下のサンプルが参考になります。

https://github.com/ahdbilal/Databricks-GPU-Serving-Examples/blob/main/examples/measuring-GPU-utilization.py

GPUを使用するように書き直したコード
from transformers import pipeline

from mlflow.models.signature import ModelSignature
from mlflow.types import DataType, Schema, ColSpec

import mlflow
import torch

class TextClassificationPipelineModel(mlflow.pyfunc.PythonModel):
  
  def __init__(self, model, tokenizer):
    device = 0 if torch.cuda.is_available() else -1
    self.pipeline = pipeline(
      "text-classification", 
      model=model, 
      tokenizer=tokenizer,
      batch_size=1,
      device=device)
    self.tokenizer = tokenizer
    
  def predict(self, context, model_input): 
    messages = model_input["text"].to_list()
    answers = self.pipeline(messages, max_length=max_length, padding='max_length', truncation=True)

    label_list = []
    score_list = []
    for answer in answers:
      label_list.append(answer['label'])
      score_list.append(str(answer['score']))

    return {"label": label_list, "score": score_list}

model_output_dir = f"{tutorial_path}/trained_model"
pipeline_output_dir = f"{tutorial_path}/trained_pipeline"

with mlflow.start_run() as run:
  
  # 学習開始。学習のメトリックが自動的にMLFLowにロギングされる
  trainer.train()

  # 学習終了後にモデルを保存
  trainer.save_model(model_output_dir)
  
  # 学習済みモデルを読み込んでパイプライン化して、更に保存。
  bert = AutoModelForSequenceClassification.from_pretrained(model_output_dir)

  # エンドポイントの入力と出力のスキーマを定義
  input_schema = Schema([ColSpec(DataType.string, "text")])
  output_schema = Schema([ColSpec(DataType.string, "label"), ColSpec(DataType.double, "score")])
  signature = ModelSignature(inputs=input_schema, outputs=output_schema)

  # 入力データのサンプルを用意
  input_example = pd.DataFrame({"text": ["これはサンプル1です。", "これはサンプル2です。"]})
  
  # モデルをMLFlow Trackingに記録
  mlflow.pyfunc.log_model(
      artifact_path=model_artifact_path,
      python_model=TextClassificationPipelineModel(bert, tokenizer),
      pip_requirements=["torch", "transformers", "accelerate", "sentencepiece", "datasets", "evaluate", "fugashi", "unidic-lite"],
      input_example=input_example,
      signature=signature
  )

コードが長くなってしまいましたね。。。ただ、ご安心ください、そんなに難しいことはしておりません。
実装のポイントはmlflow.pyfunc.PythonModelクラスを継承したモデルのラッパークラスTextClassificationPipelineModelを作る点です。このクラス内でモデルを明示的にGPUへロードし、GPU上での推論を実行させます。そしてこのTextClassificationPipelineModelクラスをオブジェクト化したものをMLFlow Trackingへ記録します。

モデルのデプロイ

ここから先はモデルをエンドポイントとしてデプロイする手順です。実はこの手順はGUIでもできますが、再現性確保のためにコードベースのものをお見せします。

モデルをMLFlow Model Registryに登録

まず先ほどMLFlow Trackingに記録したモデルをMLFlow Model Registryに登録します。2024年2月26日現在、Model RegistryはワークスペーススコープのものとUnity Catalogスコープのものと2種類ありますが、今後はUnity Catalogが主流となるので、今回もUnity Catalog内のModel Registryへ登録します。

import mlflow
mlflow.set_registry_uri('databricks-uc') # Unity CatalogのModel Registryへ登録するための設定

result = mlflow.register_model(
    "runs:/"+run.info.run_id+f"/{model_artifact_path}",
    registered_model_full_path,
)

# MLFlow Model Registryへ登録されている特定のモデルのバージョンにAliasを設定
from mlflow import MlflowClient
client = MlflowClient()

client.set_registered_model_alias(
  name=registered_model_full_path, 
  alias="Champion", 
  version=result.version
)

Registry登録後はモデルをバージョン管理できるようになります。今回が初めての登録であれば自動でバージョン「1」が付与されます。そして、今後同じ名前(registered_model_full_path)でモデルが登録されれば、自動でバージョン番号がインクリメントされていきます。
加えて、特定のバージョンに適当なアライアスをつける事も可能です。このサンプルでは、最新バージョンへChampionというアライアスを付けています。他にもStagingProductionなどプロジェクトで定義された任意の名前をつけることができます。

モデルをエンドポイントとしてデプロイ

Model Registory内の任意のバージョンのモデルをサービングエンドポイントとしてデプロイします。サービングインフラはサーバーレスなので、細かいことを意識する必要はありませんが、workload_typeを"GPU_SMALL"(NVIDIA GPU T4ベース)、workload_sizeを"Small"(4コンカレンシー)くらいは指定する必要があります。20~30分待つとエンドポイントが作成完了し、ステータスが Ready となります。

databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

# サービングエンドポイントの作成または更新
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedModelInput

model_version = result  # mlflow.register_modelの返された結果

serving_endpoint_name = endpoint_name
latest_model_version = model_version.version
model_name = model_version.name

w = WorkspaceClient()
endpoint_config = EndpointCoreConfigInput(
    name=serving_endpoint_name,
    served_models=[
        ServedModelInput(
            model_name=model_name,
            model_version=latest_model_version,
            workload_type="GPU_SMALL",
            workload_size="Small",
            scale_to_zero_enabled=False
        )
    ]
)

existing_endpoint = next(
    (e for e in w.serving_endpoints.list() if e.name == serving_endpoint_name), None
)
serving_endpoint_url = f"{databricks_url}/ml/endpoints/{serving_endpoint_name}"
if existing_endpoint == None:
    print(f"Creating the endpoint {serving_endpoint_url}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.create_and_wait(name=serving_endpoint_name, config=endpoint_config)
else:
    print(f"Updating the endpoint {serving_endpoint_url} to version {latest_model_version}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.update_config_and_wait(served_models=endpoint_config.served_models, name=serving_endpoint_name)
    
displayHTML(f'Your Model Endpoint Serving is now available. Open the <a href="/ml/endpoints/{serving_endpoint_name}">Model Serving Endpoint page</a> for more details.')

エンドポイントをRESTで叩いてみる

最後にエンドポイントをRESTクライアントから叩いてみましょう。RESTに対応していればどんなプログラム言語からでも叩けます。ただし、ローカルPCや外部のサービスから叩く場合は環境変数として「DATABRICKS_HOST」と「DATABRICKS_TOKEN」をセットしてください。詳細はこちらを参照のこと。

import mlflow.deployments
deploy_client = mlflow.deployments.get_deploy_client("databricks")

response = deploy_client.predict(
  endpoint = endpoint_name, 
  inputs = {"inputs": inputs}
)

print(response)

まとめ

BERTモデルをDatabricks上でファインチューニングし、かつ、サービングまで行うサンプルを記載しました。最近はLLaMa2やGPTなどテキスト生成系モデルの話が多いですが、生成系のモデル単一での使用ではなく、BERT系やその他のモデルも組み合わせた複合AIシステム(Compound AI Systems)へのシフトが起こりつつあります。

https://bair.berkeley.edu/blog/2024/02/18/compound-ai-systems/

したがってこのブログでも生成系言語モデルは当然のことながらも、画像系、動画系、その他クラシカルなMLモデルも含めて様々なトピックを取り扱っていこうと思います。

BFN!

参考

https://docs.databricks.com/ja/machine-learning/train-model/huggingface/load-data.html

https://github.com/ahdbilal/Databricks-GPU-Serving-Examples/blob/main/examples/measuring-GPU-utilization.py

https://qiita.com/taka_yayoi/items/20d1690ea5906bc48ae9

https://qiita.com/m__k/items/2c4e476d7ac81a3a44af#trainerクラスの定義と実行

Discussion