😸

Mosaic AI Model Serving で 多様な日本語LLMと埋め込みモデルをサービングするためのガイド (3/3)

に公開

サービング手順

前の記事で各モデルのサービングパターンを決定したので、ここからは実際に埋め込みモデルを Mosaic AI Model Serving へデプロイする手順を示します。前節でパターンの判定に用いた各モデルを継続して使用し、パターンごとの具体的な手順をサンプルコードとともに記載していきます。なお、重複したコードは最小限にとどめていますので、ご了承ください。

埋め込みモデルのサービング手順

前章で示した 4 種類のパターン をもとに、日本語対応の埋め込みモデルを Mosaic AI Model Serving へデプロイする流れを示します。埋め込みモデルは LLM ほどバリエーションは多くないですが、エンジン(HF/Sentence Transformers)と GPU サイジングは事前に確認してください。

パターン① — Alibaba-NLP/gte-large-en-v1.5 のデプロイ

前述した通り、本パターンに分類される日本語特化の埋め込みモデルが現時点で確認できない点と、GTE v1.5 と BGE v1.5 に関しては公式サンプルが公開されているため、本ガイドでは割愛させていただきます。

パターン② — cl-nagoya/ruri-large-v2 のデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム: サーバーレスノートブック

1. ライブラリをインストール

必要なライブラリをインストールします。

%pip install mlflow[databricks]==2.21.3 torch==2.5.1 torchvision==0.20.1
dbutils.library.restartPython()

2. 変数を定義

変数をセットします。

HF_MODEL_REPO_ID = "cl-nagoya/ruri-large-v2"
CATALOG = "hiroshi"
SCHEMA = "models"
MODEL_NAME = "ruri-large-v2"
registered_model_name = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
ENDPOINT_NAME = MODEL_NAME + "-embedding"

3. モデルをロードしてパイプ生成

HuggingFace Hub からモデルをダウンロードして、ロードします。

from transformers import AutoTokenizer, AutoModel, pipeline

tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_REPO_ID)
model = AutoModel.from_pretrained(HF_MODEL_REPO_ID)

pipe = pipeline("feature-extraction", model=model, tokenizer=tokenizer)

4. MLflow ロギング + 登録

MLFlow の transformers フレーバーで MLFlow にロギング、および、Unity Catalog への登録を行います。

from transformers import pipeline
import mlflow

mlflow.set_registry_uri('databricks-uc')

with mlflow.start_run():
    logged_model = mlflow.transformers.log_model(
        transformers_model=pipe,
        artifact_path="model",
        task="llm/v1/embeddings",
        metadata={"task": "llm/v1/embeddings"},
        registered_model_name=registered_model_name
   )

5. 最適化可能か確認

このモデルは、Databricks の最適化対象ではないのですが、一応確認しておきます。

import requests
import json

model_name = registered_model_name
model_version = latest_version

API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"}

response = requests.get(url=f"{API_ROOT}/api/2.0/serving-endpoints/get-model-optimization-info/{model_name}/{model_version}", headers=headers)

print(json.dumps(response.json(), indent=4))

6. GPU Serving でデプロイ

続いて、モデルのデプロイです。ここは、通常のGPUサービングを用いるように各パラメータを設定ください。Ruri-v2 は 400 MB ほどのため GPU_SMALL (T4×1) で十分ですし、レイテンシーの要件が緩い場合は CPU サービングも可能です。その際は、workload_type を CPU と指定ください。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient

serving_endpoint_name = ENDPOINT_NAME
latest_model_version = logged_model.registered_model_version
model_name = logged_model.registered_model_name

client = get_deploy_client("databricks")
endpoint = client.create_endpoint(
    name=serving_endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": latest_model_version,
                "workload_type": "GPU_SMALL",
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    }
)

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=serving_endpoint_name, timeout=timedelta(minutes=120))

7. エンドポイントのテスト

デプロイが成功すれば、以下のテストコードでエンドポイントを叩くことができます。

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")

embeddings_response = client.predict(
    endpoint=ENDPOINT_NAME,
    inputs={
        "inputs": ["クエリ: 日本の首都はどこですか?"]
    }
)
print(embeddings_response)

パターン③ — intfloat/multilingual-e5-large のデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム: サーバーレスノートブック

1. ライブラリをインストール

ライブラリをインストールします。

%pip install mlflow[databricks]==2.21.3 torch==2.5.1 sentence_transformers==4.0.2
dbutils.library.restartPython()

[!INFO]
sentence_transformersのバージョンは2.2.2以上、4.0.2以下にすること

MLFlowのドキュメントによると、mlflow.sentence_transformersフレーバーを使用する際は、sentence-transformersのバージョンを2.2.2以上、4.0.2以下にするよう明記されている。

2. 変数を定義

変数を定義します。

CATALOG = "hiroshi"
SCHEMA = "models"
MODEL_NAME = "multilingual-e5-large"
ENDPOINT_NAME = MODEL_NAME + "-embedding"

3. モデルをロードしてパイプ生成

HuggingFace Hub からモデルをダウンロードし、SentenceTransformer を使ってロードします。

from sentence_transformers import SentenceTransformer
model_name = "intfloat/multilingual-e5-large"

model = SentenceTransformer(model_name)

4. MLflow ロギング + 登録

MLFlow の sentence_transformers フレーバーで MLFlow にロギング、および、Unity Catalog へ登録します。

import mlflow
mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run() as run:  
    mlflow.sentence_transformers.log_model(
      model, 
      artifact_path=MODEL_NAME, 
      signature=signature,
      input_example=sentences,
      extra_pip_requirements=[
        "torchvision==0.20.1", 
        "sentence_transformers==4.0.2"
      ],
      registered_model_name=registered_model_name,
    )

5. デプロイ

続いて、モデルのデプロイです。ここは、通常の GPU サービングを用いるように各パラメータを設定ください。この例では、GPU_SMALL(T4x1)を使用していますが、CPU サービングも可能です。その際は、workload_type を CPU と指定ください。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient

client = get_deploy_client("databricks")
endpoint = client.create_endpoint(
    name=serving_endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": latest_model_version,
                "workload_type": "GPU_SMALL",
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    }
)

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=serving_endpoint_name, timeout=timedelta(minutes=120))

6. エンドポイントのテスト

デプロイが完了すれば、以下のコードでテストできます。

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")

embeddings_response = client.predict(
    endpoint=serving_endpoint_name,
    inputs={
        "inputs": ["query: おはようございます"]
    }
)

print(embeddings_response)

パターン④ — intfloat/multilingual-e5-large のデプロイ

本手順は Azure Databricks の以下のランタイムとクラスターで正常に動作することを確認しています。

  • ランタイム: サーバーレスノートブック

埋め込みモデルをパターン④でサービングすることはあまり多くないとは思いますが、この例では、Sentence Transformers 4.1.x で追加された auto-device map を利用したいケースを想定します。バージョン 4.1.x は MLflow が公式にサポートするレンジ外のため、パターン④ (カスタム) を選択します。

1. ライブラリをインストール

まずは、ライブラリのインストールです。

%pip install hf_transfer mlflow[databricks]==2.21.3 torch==2.5.1 sentence_transformers==4.1.0
dbutils.library.restartPython()

2. 変数を定義

変数を定義します。

import os
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0"

HF_MODEL_REPO_ID = "intfloat/multilingual-e5-large"
CATALOG = "hiroshi"
SCHEMA = "models"
MODEL_NAME = "multilingual-e5-large"
MODEL_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/models_from_hf/{MODEL_NAME}"
ENDPOINT_NAME = MODEL_NAME + "-embedding"
registered_model_name = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"

3. モデルを Unity Catalog Volume に配置

HuggingFace Hub からモデルをダウンロードし、Unity Catalog Volume に保存します。

from huggingface_hub import snapshot_download

snapshot_download(
  repo_id=HF_MODEL_REPO_ID,
  local_dir="/tmp/hf")

!mkdir -p {MODEL_PATH}
!cp -L -R /tmp/hf/* {MODEL_PATH}

4. カスタム PythonModel 実装

エンドポイント内での推論処理を実装していきます。

import mlflow
import torch
from sentence_transformers import SentenceTransformer
import uuid

class MultilingualE5LargeModel(mlflow.pyfunc.PythonModel):
  
  def load_context(self, context): 
    self.model = SentenceTransformer(context.artifacts["model-path"])
    
  def predict(self, context, model_input, params=None):
    if isinstance(model_input, pd.DataFrame):
      model_input = model_input.to_dict()['input'] # 入力の整形方法を変更
    
    answer = self.model.encode(sentences=[model_input], normalize_embeddings=True)

    response = {
      "id": str(uuid.uuid4()),
      "object": "list",
      "model": "intfloat/multilingual-e5-large",
      "data": [
        {
          "index": 0,
          "object": "embedding",
          "embedding": answer[0].tolist()
        }
      ],
      "usage": {
        "prompt_tokens": None,
        "total_tokens": None
      }
    }

    return response

5. MLflow ロギング

作成したクラス(モデル)を MLFlow へロギングし、Unity Catalog へ登録します。Signature として、入力と出力の形式を OpenAI Embedding API 互換にしています。

import numpy as np
import pandas as pd

import mlflow
from mlflow.models import infer_signature

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run(run_name=MODEL_NAME):
  # 入出力スキーマの定義
  input_example = {
    "input": "こんにちは",
  }

  output_response = {
    "id": "37f4f110-e518-40cf-bcc9-fe8518621c64",
    "object": "list",
    "model": HF_MODEL_REPO_ID,
    "data": [
      {
        "index": 0,
        "object": "embedding",
        "embedding": [0.010050328448414803, 0.0050097182393074036]
      }
    ],
    "usage": {
      "prompt_tokens": None,
      "total_tokens": None
    }
  }

  signature = infer_signature(
    model_input=input_example, 
    model_output=output_response)
  
  logged_chain_info = mlflow.pyfunc.log_model(
    python_model=MultilingualE5LargeModel(),
    artifact_path="model",
    signature=signature, 
    input_example=input_example,
    example_no_conversion=True,
    extra_pip_requirements=[
      "torch==2.5.1", 
      "sentence_transformers==4.1.0",
      "torchvision==0.20.1"],
    artifacts={'model-path': MODEL_PATH},
    registered_model_name=registered_model_name,
  )

6. GPU Serving デプロイ

GPU_SMALL (T4×1) または CPU。Scale-to-zero で開発コストを最適化します。

from mlflow.deployments import get_deploy_client
from datetime import timedelta
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedModelInput, ServedModelInputWorkloadSize, ServedModelInputWorkloadType

client = get_deploy_client("databricks")
endpoint = client.create_endpoint(
    name=serving_endpoint_name,
    config={
        "served_entities": [
            {
                "entity_name": registered_model_name,
                "entity_version": model_version,
                "workload_type": "GPU_SMALL",
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    }
)

w = WorkspaceClient()
w.serving_endpoints.wait_get_serving_endpoint_not_updating(name=serving_endpoint_name, timeout=timedelta(minutes=120))

7. エンドポイントのテスト

デプロイが完了すれば、以下のコードでテストできます。

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")

embeddings_response = client.predict(
    endpoint=ENDPOINT_NAME,
    inputs={
        "input": "おはようございます"
    }
)

print(embeddings_response)

おわりに

全3回に渡り、Databricks Mosaic AI Model Serving を用いて自己ホスト型の LLM と埋め込みモデルをサービングする手順を体系的に整理しました。まず判定フローで 5 種類の LLM サービングパターン または 4 種類の埋め込みモデル サービングパターン を選び、続いて パターンごとの具体的なデプロイ工程をコードで解説しました。とくに Production ワークロードでは、Databricks 独自エンジンによる Provisioned Throughput を優先いただき、通常のCPU/GPU サービング を選ぶ場合は GPU_SMALL / GPU_LARGE など適切な SKU を設定することを意識してください。

AI モデルも Databricks の機能も急速に進化しています。本ガイドも必要に応じてアップデートし、常に最新の状況に対応できるよう努めます。

最後に、Databricks Mosaic AI Model Serving は REST API を公開するだけでなく、AI Gateway と連携して認可・レート制限・監査ログを統合管理できるため、エンタープライズ利用でも安心して展開できます。ぜひ本書のサンプルを参考に、“あなたのモデル” を Databricks でサービングし、生成 AI を様々なビジネスでご活用ください。

Discussion