🕌

OpenAIのAPIにアクセスするServing EndpointをDatabricks上にデプロイする方法

2023/11/20に公開

OpenAIのAPIにアクセスするServing EndpointをDatabricks上にデプロイする方法

すでにWeb上にいくつか同手順をまとめた記事が存在しますが、2023年11月20日時点の最新情報とAPI使用に基づき、記録として記載しておきます。

TL;DR

  • OpenAIのAPIをサービングエンドポイントとしてDatabricks上にデプロイする場合、以前だと環境変数"MLFLOW_OPENAI_SECRET_SCOPE”にOpenAIのトークンを格納しているシークレットのスコープ名を指定していましたが、この方法が現在は非推奨になりました。
  • 代わりに、エンドポイントのデプロイ時に下記情報を追加します。(詳細はこの下を読んでください。)
      "environment_vars": {
        "OPENAI_API_KEY": f"{{{{secrets/{OPENAI_TOKEN_SECRETS_SCOPE_NAME}/{OPENAI_TOKEN_SECRETS_KEY_NAME}}}}}"
      }

環境

  • Databricks Runtime: 14.1 ML
  • ノードタイプ: i3.xlarge (シングルノード)

前提

  • Unity Catalogが有効化されている。
      - まだの方はこちら を参照して有効化ください
  • OpenAI APIへのアクセストークンがDatabricksシークレット内に格納されていること
      - 今回は以下のように設定します。詳細はこちらを参照ください。
# Databricksのトークン
databricks secrets put-secret my_secret_scope my_secret_key_db

# OpenAIのトークン
databricks secrets put-secret my_secret_scope my_secret_key_openai

大まかな流れ

  1. OpenAIフレーバーを使用してMLFlow Trackingにモデルを記録
  2. Trackingに記録したモデルをモデルレジストリー(今回はUnity Catalog)へ登録
  3. Unity Cactalogに登録したモデルをサービングエンドポイントとしてデプロイ

1. OpenAIフレーバーを使用してMLFlow Trackingにモデルを記録

gpt-3.5-turboのChat Completion APIにアクセスする場合を例に取ります。
まずは、MLFlowが提供するOpenAIフレーバーを使用して、OpenAIのAPIアクセスを疑似モデル化して、MLFlow Trackingへ記録します。

import os
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import ColSpec, ParamSchema, ParamSpec, Schema
import openai
import pandas as pd

##########################################################
# MLFLOW_OPENAI_SECRET_SCOPEを使ったモデルサービングのためのシークレットの指定は非推奨になりました。代わりにシークレットベースの環境変数を使用してください。
# https://mlflow.org/docs/latest/python_api/openai/index.html
##########################################################
# os.environ["MLFLOW_OPENAI_SECRET_SCOPE"] = "my_secret_scope"

with mlflow.start_run() as run:
    model_info = mlflow.openai.log_model(
        model="gpt-3.5-turbo",
        task=openai.ChatCompletion,
        messages=[{"role": "system", "content": "あなたは世界的に有名な作家です。"},
                  {"role": "user", "content": "{user_prompt}"}],
        signature=ModelSignature(
            inputs=Schema([ColSpec(type="string", name=None)]),
            outputs=Schema([ColSpec(type="string", name=None)]),
            params=ParamSchema(
                [
                    ParamSpec(name="temperature", default=0, dtype="float"),
                ]
            ),
        ),
        artifact_path="model",
    )

(オプション)記録したモデルをロードして実行してみる

MLFlow Trackingからモデルを取得し、推論(OpenAIのAPIを呼ぶ)します。

OPENAI_API_TOKEN=dbutils.secrets.get(scope="my_secret_scope", key="my_secret_key_openai")
os.environ["OPENAI_API_KEY"]=OPENAI_API_TOKEN
openai.api_key=OPENAI_API_TOKEN

model = mlflow.pyfunc.load_model(model_info.model_uri)
df = pd.DataFrame({"user_prompt": ["このトンネルを抜けると"]})
response = model.predict(df, params={"temperature":0.5})
print(response)

以下が出力。

['そこは広大な草原が広がっています。草原は美しい緑色で、風がそよそよと吹き抜けています。遠くには山々や川が見え、鳥のさえずりが聞こえてきます。空は広く開けており、透明な青色が広がっています。この場所は自然の美しさが溢れ、心を癒してくれるパラダイスのような場所です。それぞれの人にとって特別な意味を持つかもしれませんが、一つは確かです。このトンネルを抜けると新たな冒険や発見が待っているのかもしれません。']

2. Trackingに記録したモデルをUnity Catalogへ登録

続いて、モデルをモデルレジストリーへ移していきます。
現在Databricks上ではMLFlow Model RegistryとUnity Catalogのどちらかをモデルのレジストリーとして選択いただけます。Unity Catalogのモデルを使用すると、ワークスペース間での一元的なアクセス制御、監査、リネージ、モデルディスカバリーなど、Unity Catalogの利点をMLモデルに拡張できるため、基本的にはUnity Catalogの活用をお勧めしています。したがって、今回はUnity Catalogへモデルを登録します。

import mlflow

# モデルレジストリーとしてUnity Catalogをセット
mlflow.set_registry_uri("databricks-uc")

# UCモデル名は<カタログ名>.<スキーマ名>.<モデル名>のパターンに従っており、カタログ名、スキーマ名、登録モデル名に対応していることに注意してください
registered_name = <カタログ名>.<スキーマ名>.<モデル名>

result = mlflow.register_model(
    "runs:/"+run.info.run_id+"/model",
    registered_name,
)

(オプション)Unity Catalogに登録したモデルをロードして実行してみる

Unity Catalogに登録したモデルを取得し、実行します。

import mlflow
import pandas as pd
import torch

registered_name = "hiroshi.model.openai_gpt-35-turbo_chat"

loaded_model = mlflow.pyfunc.load_model(f"models:/{registered_name}/{result.version}")

df = pd.DataFrame({"user_prompt": ["このトンネルを抜けると"]})
response = model.predict(df, params={"temperature":0.5})
print(response)

以下が出力。

['広々とした景色が広がっています。']

3. Unity Cactalogに登録したモデルをサービングエンドポイントにデプロイ

ここがポイントです。
モデルデプロイのリクエストに下記のアイテムを加えてください。

      "environment_vars": {
        "OPENAI_API_KEY": "{{secrets/my_secret_scope/my_secret_key_openai}}"
      }

フルのコードはこちら。

import requests
import json

endpoint_name = $YOUR_ENDPOINT_NAME

databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
databricks_token = dbutils.secrets.get(scope="my_secret_scope", key="my_secret_key_db")

deploy_headers = {'Authorization': f'Bearer {databricks_token}', 'Content-Type': 'application/json'}
deploy_url = f'{databricks_url}/api/2.0/serving-endpoints'

model_version = result 
endpoint_config = {
  "name": endpoint_name,
  "config": {
    "served_models": [{
      "name": f'{model_version.name.replace(".", "_")}_{model_version.version}',
      "model_name": model_version.name,
      "model_version": model_version.version,
      "workload_type": "CPU",
      "workload_size": "Small",
      "scale_to_zero_enabled": "True",
      "environment_vars": {
        "OPENAI_API_KEY": "{{secrets/my_secret_scope/my_secret_key_openai}}"
      }
    }]
  }
}
endpoint_json = json.dumps(endpoint_config, indent='  ')

# APIにPOSTリクエストを送る
deploy_response = requests.request(method='POST', headers=deploy_headers, url=deploy_url, data=endpoint_json)

if deploy_response.status_code != 200:
  raise Exception(f'Request failed with status {deploy_response.status_code}, {deploy_response.text}')

# POSTリクエストのレスポンスを表示する
# 最初にサービングエンドポイントを作成する際、'ready'の状態が 'NOT_READY'であることが表示される
# Databricksモデルのサービングエンドポイントページでステータスを確認可能
print(deploy_response.json())

(オプション)Endpointを直接Callしてみる

import os
import requests
import numpy as np
import pandas as pd
import json

def score_model(workspace_name: str, endpoint_name: str, databricks_token: str):
  url = f'https://{workspace_name}.cloud.databricks.com/serving-endpoints/{endpoint_name}/invocations'
  
  headers = {'Authorization': f'Bearer {databricks_token}', 'Content-Type': 'application/json'}
  ds_dict = {
    "dataframe_split": {
      "columns": [
        "user_prompt"
      ],
      "data": [
        [
          "このトンネルを抜けると"
        ]
      ]
    }
  }
  data_json = json.dumps(ds_dict, allow_nan=True)
  response = requests.request(method='POST', headers=headers, url=url, data=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')

  return response.json()

print(score_model(YOUR_WORKSPACE_NAME, YOUR_ENDPOINT_NAME, YOUR_DATABRICKS_TOKEN))

以下が出力。

{'predictions': ['新しい景色が広がっているだろう。\n\nこのトンネルを抜けると、私たちは新たな場所や風景に出会うことができるだろう。トンネルは暗く、制限された空間であり、私たちはそれを通り抜けることで制約から解放され、広がりのある世界に目を向けることができる。\n\nトンネルの出口にたどり着くと、そこには自然の美しさや都市の喧騒、新しい冒険が待っているかもしれない。私たちは新たな出会いや経験を通して成長し、自己を発見することができるかもしれない。\n\nしかし、トンネルから出ることにはリスクや挑戦が伴うことも忘れてはならない。新しい景色を求めるためには、忍耐や勇気が必要とされるかもしれない。しかし、その先に広がる可能性や充実感は、それを乗り越える価値があると信じたい。\n\nこのトンネルを抜けると、私たちは未知の世界へと足を踏み出すことになる。どんな景色や出来事が待っているのか、私たち自身では予測できないかもしれない。しかし、トンネルの先には新たな発見や達成感が待っていることは間違いないだろう。']}

BFN!

参考

Discussion