🌟

Mosaic AI Model Serving で 多様な日本語LLMと埋め込みモデルをサービングするためのガイド (2/3)

に公開

サービング手順

前の記事で各モデルのサービングパターンを決定したので、ここからは実際に Mosaic AI Model Serving へデプロイする手順を示します。前節でパターンの判定に用いた各モデルを継続して使用し、パターンごとの具体的な手順をサンプルコードとともに記載していきます。なお、重複したコードは最小限にとどめていますので、ご了承ください。

LLMのサービング手順

5 種類の LLM サービングパターンごとに代表モデルを使って手順を示します。

パターン① — "tokyotech-llm/Llama-3.1-Swallow-8B-Instruct-v0.3" をデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認

  • ランタイム:15.4 ML
  • インスタンスタイプ:Standard_D8ds_v5 (本作業ではGPUは必須ではありません)

1. ライブラリのインストール

MLflow 2.21.3 と Transformers 4.48 系を使用します(ML ランタイムにプリインストールされているものから少しバージョンを上げています)。

%pip install -U hf_transfer mlflow==2.21.3 transformers==4.48.1 torch==2.5.1 torchvision==0.20.1
dbutils.library.restartPython()

2. 変数を定義

続いて、プログラムで使用する変数をまとめて定義します。エンドポイント名は -endpoint-chat-pt を付与して ChatCompletions + Provisioned Throughput であることを示します。

import os
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" #HuggingFace Hubから高速ダウンロードするための設定

MODEL_PATH = "tokyotech-llm/Llama-3.1-Swallow-8B-Instruct-v0.3"
MODEL_NAME = "llama3_1_swallow_8b_instruct"
CATALOG = "hiroshi"
SCHEMA = "models"
registered_model_name = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
ENDPOINT_NAME = MODEL_NAME+"-endpoint-chat-pt"

3. モデル取得とパイプライン生成

モデルとトークナイザーを HuggingFace Hub から取得し、パイプラインを構築します。

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

model = AutoModelForCausalLM.from_pretrained(
  MODEL_PATH, 
  device_map="auto", 
  torch_dtype="auto"
)

pipe = pipeline(task="text-generation", model=model, tokenizer=tokenizer)

4. MLflow でロギング + Unity Catalog への登録

MLFlowの transformers フレーバーを使用して、モデルを MLFlow の実験(Experiment)にロギングし、同時に、Unity Catalogにデータを登録する方法を説明します。パターン①が ChatCompletions 形式であることから、mlflow.transformers.log_model 関数の引数として、task="llm/v1/chat" と明示的に指定し、input_example={"messages": messages} と JSON 形式の例を与えている点です。

import mlflow

messages = [
    {"role": "system", "content": "あなたは誠実で優秀な日本人のアシスタントです。"},
    {
        "role": "user",
        "content": "ただ、「はい」とだけ答えて下さい。",
    },
]

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run(run_name=MODEL_NAME) as run:
    model_info = mlflow.transformers.log_model(
        transformers_model=pipe,
        artifact_path=MODEL_NAME,
        task="llm/v1/chat",
        input_example={"messages": messages},
        registered_model_name=registered_model_name,
    )

5. 最適化可否を確認

登録したモデルが Mosaic AI Model Serving によって最適化可能か?、つまりDatabricksの独自エンジンで推論可能か?、さらに言うと、Provisioned Throughput でモデルをサービングできるか?、を判断します。モデルが登録された時点で、モデルの config などからシステムが自動的に判断するので、ここではその判断結果を確認します。

from databricks.sdk import WorkspaceClient
from datetime import timedelta

import requests
import json

model_name = registered_model_name
model_version = model_info.registered_model_version
endpoint_name = ENDPOINT_NAME

databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

headers = {"Context-Type": "text/json", "Authorization": f"Bearer {token}"}

response = requests.get(url=f"{databricks_url}/api/2.0/serving-endpoints/get-model-optimization-info/{model_name}/{model_version}", headers=headers)

print(json.dumps(response.json(), indent=4))

6. Provisioned Throughput でエンドポイント作成

モデルが最適化可能であることが分かれば、いよいよデプロイです。Provisioned Throughput でサービングする場合、デプロイ時にインフラのスペック(GPU SKU や枚数など)に関する指定は必要ありません。代わりにトークン・スループットの下限値と上限値を設定します。ただ、そのための基準値も前述の最適化可否の結果と一緒に取得されるので、何も考えずその基準値 throughput_chunk_size を上下限値として設定すればひとまずは十分です。この値はデプロイ後も変更可能です。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient

client = get_deploy_client("databricks")

endpoint = client.create_endpoint(
    name=endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": model_version,
                "min_provisioned_throughput": response.json()['throughput_chunk_size'],
                "max_provisioned_throughput": response.json()['throughput_chunk_size'],
            }
        ]
    },
)

print(json.dumps(endpoint, indent=4))

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=serving_endpoint_name, timeout=timedelta(minutes=120))

7. 動作確認

デプロイが成功したらエンドポイントを叩いてみます。エンドポイントは REST API を提供しているので、対応するクライアントであれば、どのようなものからもアクセス可能です。ここでは、Databricks も推奨している OpenAI クライアントと、Python の HTTP クライアントである requests ライブラリを使用する方法を紹介します。

まずは、OpenAI クライアントの例からですが、そもそもこのクライアントを使えることからも分かる通り、Provisioned Throughput のエンドポイントは OpenAI のエンドポイントと互換性性があります。かつ、パターン①は ChatCompletions 形式なので、OpenAI.chat.completions.create 関数を使ってエンドポイントへアクセスします。さらに、推論時のパラメータも OpenAI 互換であるため、temperature といった定番のものから、presence_penalty など OpenAI ならではのものも指定できます(ただしパラメータによっては設定値の制約があるのでご注意ください)。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
openai_client = w.serving_endpoints.get_open_ai_client()

response = openai_client.chat.completions.create(
    model=ENDPOINT_NAME,
    messages=[
      {
        "role": "system",
        "content": "あなたは誠実で優秀な日本人のアシスタントです。"
      },
      {
        "role": "user",
        "content": "箱根を舞台にした時代劇小説を書いて下さい。",
      }
    ],
    temperature=0.1,
    max_tokens=256
)

print(response)

続いて、Python の requests ライブラリーの例です。

import requests
import json

test_messages = [
    {"role": "system", "content": "あなたは誠実で優秀な日本人のアシスタントです。"},
    {
        "role": "user",
        "content": "箱根を舞台にした時代劇小説を書いて下さい。",
    },
]

data = {
  "messages": test_messages,
  "temperature": 0.1,
  "max_tokens": 2000,
}

databricks_host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"}

endpoint_name = ENDPOINT_NAME
response = requests.post(
    url=f"{databricks_host}/serving-endpoints/{endpoint_name}/invocations", 
    json=data, 
    headers=headers
)

print(response)

以上がパターン①の具体的な手順になります。

パターン② — "weblab-GENIAC/Tanuki-8B-dpo-v1.0" をデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム:15.4 ML
  • インスタンスタイプ:Standard_D8ds_v5 (本作業ではGPUは必須ではありません)

このモデルはカスタム・チャット・テンプレートを持つため Provisioned Throughput でデプロイすると ChatCompletions が破綻します。したがって Completions 形式での Provisioned Throughput サービング (パターン②) に切り替えます。コードはパターン①とほとんど同じなので、異なる部分のみ記載します。

1. ロギング時の差分

task="llm/v1/completions" とし、input_example にチャット・テンプレート適用済みテキストを渡します。

import numpy as np
from transformers import pipeline
import mlflow

messages = [
    {"role": "system", "content": "あなたは誠実で優秀な日本人のアシスタントです。"},
    {
        "role": "user",
        "content": "ただ、「はい」とだけ答えて下さい。",
    },
]

# プロンプトをChatCompletions -> Completionsに変換
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run(run_name=model_name) as run:
    model_info = mlflow.transformers.log_model(
        transformers_model=pipe,
        artifact_path="model",
        input_example={"prompt": prompt},   # Completionsを意識してプロンプトをテキストで入力
        task="llm/v1/completions",          # Completionsに変更
        registered_model_name=registered_model_name,
    )

2. 最適化判定

その後、デプロイまでパターン①と同じ手順になります。念の為、モデルの最適化判定 は {"optimizable":true} になります。

3. エンドポイントテスト

エンドポイントのテストも OpenAI クライアントを使うところは同じですが、エンドポイントのインターフェースが Completions 形式になるため、OpenAI.completions.create 関数に変える必要があります。パラメータに関しては、パターン①と同様に、OpenAI Completions API と互換になります。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
openai_client = w.serving_endpoints.get_open_ai_client()

response = client.completions.create(
    model=ENDPOINT_NAME,
    prompt=["### Input:\nアメリカの首都は?\n\n### Output:\n"],
    max_tokens=200,
    temperature=0.1,
    stream=False
)

print(response)

パターン③ — AXCXEPT/EZO-Common-9B-gemma-2-it をデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム:15.4 ML
  • インスタンスタイプ:Standard_D8ds_v5 (本作業ではGPUは必須ではありません)

1. 最適化判定

パターン③も、実はパターン①とほとんど同じです。異なるのは、このパターンは Databricks の最適化がサポートされていないモデルであるので、api/2.0/serving-endpoints/get-model-optimization-info を使って最適化可否を確認しても、"不可" と返ってきます。したがって、Provisioned Throughput ではなく、通常の CPU/GPU サービングでデプロイする必要があります。LLM は基本的に GPU サービングを選択することになりますが、モデルが極端に小さい場合など一定条件下ではCPUサービングもオプションとなり得ます。

# サービングエンドポイントの作成または更新
from databricks.sdk import WorkspaceClient
from datetime import timedelta

import requests
import json

model_name = registered_model_name
model_version = model_info.registered_model_version
endpoint_name = ENDPOINT_NAME

databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

headers = {"Context-Type": "text/json", "Authorization": f"Bearer {token}"}

response = requests.get(url=f"{databricks_url}/api/2.0/serving-endpoints/get-model-optimization-info/{model_name}/{model_version}", headers=headers)

print(json.dumps(response.json(), indent=4))

2. GPU Serving でデプロイ

この例では、通常のGPUサービングを使用します。故に、モデルのデプロイに当ってはトークン・スループットの指定はできず、GPU サイジング を明示的に設定する必要があります。具体的には、GPU タイプとサイズです。この例では、Azure Databricks のドキュメントに則り、GPU_LARGE(A100x1) を Small で指定しています。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient

client = get_deploy_client("databricks")

endpoint = client.create_endpoint(
    name=endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": model_version,
                "workload_type": "GPU_LARGE", 
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    },
)

print(json.dumps(endpoint, indent=4))

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=endpoint_name, timeout=timedelta(minutes=240))

3. エンドポイントのテスト

パターン③では、ChatCompletions 形式でモデルをサービングしているため、プロンプトは以下の例のようにJSONで作成できます。

import requests
import json

test_messages = [
    {"role": "system", "content": "あなたは誠実で優秀な日本人のアシスタントです。"},
    {
        "role": "user",
        "content": "箱根を舞台にした時代劇小説を書いて下さい。",
    },
]

data = {
  "messages": test_messages,
  "temperature": 0.1,
  "max_tokens": 2000,
}

databricks_host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"}

endpoint_name = ENDPOINT_NAME
response = requests.post(
    url=f"{databricks_host}/serving-endpoints/{endpoint_name}/invocations", 
    json=data, 
    headers=headers
)

print(response)

パターン④ — cyberagent/open-calm-1b をデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム:15.4 ML
  • インスタンスタイプ:Standard_D8ds_v5 (本作業ではGPUは必須ではありません)

1. ロギング

パターン④は、パターン②とほとんど同じですが、念の為ロギングと登録のサンプルコードを記載します。

import mlflow

mlflow.set_registry_uri("databricks-uc")

prompt = "AIによって私達の暮らしは、"

with mlflow.start_run(run_name=MODEL_NAME) as run:
    model_info = mlflow.transformers.log_model(
        transformers_model=pipe,
        artifact_path=MODEL_NAME,
        task="llm/v1/completions",
        input_example={"prompt": prompt},
        registered_model_name=registered_model_name,
    )

2. デプロイ

さらに、モデルが最適化対象ではないので、パターン③と同様にGPUのサイジングを指定する必要がありますが、モデルが1Bと小さいためGPUのタイプを GPU_LARGE → GPU_SMALL(T4x1) と変更してデプロイいただいても問題ありません。

テストもパターン②と同様に、プロンプトをテキストで指定ください。

import requests
import json

data = {
  "prompt": "AIによって私達の暮らしは、",
  "temperature": 0.1,
  "max_tokens": 2000,
}

databricks_host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"}

response = requests.post(
    url=f"{databricks_host}/serving-endpoints/{endpoint_name}/invocations", 
    json=data, 
    headers=headers
)

print(response)

以上がパターン④の具体的な手順になります。

パターン⑤ — {Qwen/QwQ-32B-AWQ} を vLLM を用いてサービング

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム:15.4 ML
  • インスタンスタイプ:Standard_NC24ads_A100_v4 (本作業では80GB以上のGPUメモリが必要です。)

パターン⑤は、これまでの手順より難しく感じると思います。そもそも、このパターンにする理由(せざるを得ないシナリオ)はいくつかありますが、その中でも現実的なのがモデルを vLLM などの独自エンジンでサービングしたい、または、モデルの構造を多分にカスタマイズしているなど、独自性やカスタム要素が多いものはこのパターンとして判断される可能性が高いです。

1. ライブラリのインストール

まずは、ライブラリのインストールから始めましょう。

%pip install -U hf_transfer mlflow==2.21.3 autoawq==0.2.5 transformers==4.51.3 accelerate==0.30.1 vllm==0.7.3 threadpoolctl==3.6.0
dbutils.library.restartPython()

2. 変数の定義

続いて、変数の定義です。

import os
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

HF_REPO_ID = "Qwen/QwQ-32B-AWQ"
MODEL_PATH = "/Volumes/hiroshi/models/models_from_hf/QwQ-32B-AWQ"
MODEL_NAME = "qwq-32b-awq"
CATALOG = "hiroshi"
SCHEMA = "models"
registered_model_name = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
ENDPOINT_NAME = f"{MODEL_NAME}-gpu-vllm-chat"

3. モデルを Unity Catalog Volume へ配置

HuggingFace Hub からモデルをダウンロードして、それを Unity Catalog Volume に保存し、その後は Volume からモデルを読み込みようにします。

from huggingface_hub import snapshot_download

snapshot_download(
  repo_id=HF_REPO_ID,
  local_dir="/local_disk0/hf")

!mkdir -p {MODEL_PATH}
!cp -L -R /local_disk0/hf/* {MODEL_PATH}

4. カスタム推論処理の実装

次が、このパターンの最大の特徴ですが、エンドポイント内での推論処理をカスタム実装していきます。基本的には mlflow.pyfunc.PythonModel を継承したサブクラスを作成し、load_context メソッドと predict メソッドをオーバーライドします。

import mlflow
import torch
from mlflow.types.llm import ChatCompletionResponse, ChatMessage, ChatParams, ChatChoice
from vllm import LLM, SamplingParams

class QwQ32BAWQModel(mlflow.pyfunc.PythonModel):
  
  def load_context(self, context): 

    # For generative models (task=generate) only
    self.llm = LLM(model=context.artifacts["model-path"], task="generate", tensor_parallel_size=torch.cuda.device_count())
    
  def predict(self, context, model_input, params=None):

    if isinstance(model_input, pd.DataFrame):
      model_input = model_input.to_dict()['messages']
      model_input_list = list(model_input.values())

    sampling_params = SamplingParams(
      temperature=params.get("temperature", 0.5), 
      top_p=params.get("top_p", 0.8), 
      top_k=params.get("top_k", 5), 
      max_tokens=params.get("max_new_tokens", 200), 
      presence_penalty=params.get("presence_penalty", 1.1))
    
    answer = self.llm.chat(
      model_input_list, 
      sampling_params=sampling_params, 
      use_tqdm=False)

    assistant_message = answer[0].outputs[0].text

    response = ChatCompletionResponse(
        choices=[
          ChatChoice(index=0, message=ChatMessage(role="assistant", content=assistant_message))
        ],
        usage={},
        model=HF_REPO_ID,
    )

    return response.to_dict()

5. ロギングと登録

作成したクラス(モデル)をMLFlowへロギングし、Unity Catalogへ登録します。ポイントは、signature というエンドポイントの入力と出力の形式定義です。

import numpy as np
import pandas as pd

import mlflow
from mlflow.models import infer_signature

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run(run_name="qwq-32b-instruct-vllm"):
  # 入出力スキーマの定義
  input_example = {
    "messages": [
        {
            "role": "user",
            "content": "日本の首都はどこ?",
        }
    ],
  }

  output_response = {
    'id': 'chatcmpl_e048d1af-4b9c-4cc9-941f-0311ac5aa7ab',
    'choices': [
      {
        'finish_reason': 'stop', 
        'index': 0,
        'logprobs': "",
        'message': {
          'content': '首都は東京です。',
          'role': 'assistant'
          }
        }
      ],
    'created': 1719722525,
    'model': 'dbrx-instruct-032724',
    'object': 'chat.completion',
    'usage': {'completion_tokens': 74, 'prompt_tokens': 803, 'total_tokens': 877}
  }

  params={
    "max_new_tokens": 512,
    "temperature": 0.1,
    "do_sample": True,
  }

  signature = infer_signature(
    model_input=input_example, 
    model_output=output_response, 
    params=params)
  
  logged_model_info = mlflow.pyfunc.log_model(
    python_model=QwQ32BAWQModel(),
    artifact_path="model",
    signature=signature, 
    input_example=input_example,
    example_no_conversion=True,
    extra_pip_requirements=[ 
      "autoawq==0.2.5", 
      "transformers==4.51.3", 
      "accelerate==0.30.1", 
      "vllm==0.7.3", 
      "threadpoolctl==3.6.0"],
    artifacts={'model-path': MODEL_PATH},
    registered_model_name=registered_model_name,
  )

6. デプロイ

続いて、モデルのデプロイです。ここは、通常の GPU サービングを用いるように各パラメータを設定ください。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient

client = get_deploy_client("databricks")

endpoint = client.create_endpoint(
    name=endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": model_version,
                "workload_type": "GPU_LARGE", 
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    },
)

print(json.dumps(endpoint, indent=4))

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=endpoint_name, timeout=timedelta(minutes=240))

7. エンドポイントのテスト

デプロイが成功すれば、以下のテストコードでエンドポイントを叩くことができます。

import requests
import json

data = {
  "messages": [{"role": "user", "content": "日本の首都はどこ?"}],
  "temperature": 0.1,
  "max_new_tokens": 10,
}

databricks_host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"}

response = requests.post(
    url=f"{databricks_host}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers
)

print(response.json())

以上がパターン⑤の具体的な手順になります。

おわりに

LLMのサービングパターンごとの具体例を記載しました。埋め込みモデルの同手順は次の記事で書きます。

Discussion