❄️

Snowflake モデルレジストリの運用を真剣に考えてみた

に公開

MLOpsではモデルのバージョン管理をして、本番環境のモデルを安全に交換・維持していくということを考えなくてはなりません。
この記事ではSnowflakeでMLモデルを管理する上で、モデルのライフサイクルと各環境をどのように使い分けていくかを整理しました。
企業やチームの思想によって許容できない部分もあると思いますが、あくまで一例として読んで頂ければ幸いです。

実現したいこと

大きく分けて以下三点を実現するための手法を考えます。

  • 開発環境/検証環境/本番環境を分離したい
    各環境には後述の役割があります。CI/CDと合わせて良い感じに分離します。
  • 検証環境で検証したモデルを本番環境で運用したい
    検証環境で運用可能性が見えたモデルを、本番環境で学習することなくデプロイします。
  • 本番環境のモデルは複数バージョンを並列で動かしたい
    本番環境で安全にモデルの取り換えができるように、運用バージョンとステージングバージョンを同時に稼働できるようにします。

環境分離の前提

MLモデルの環境分離の要件はデータ側の要件に合わせる必要があります。
下図がそのイメージです。

env

具体的には以下のような使い分けがあります。

本番環境

運用に供する環境。
デプロイ・リリースはCI/CDパイプラインによって自動で行う。
モデルの学習や精度検証には、この環境のデータがソースとして使われる。
他環境とはアカウントが分離されている。

検証環境

本番環境へリリースする前の、モデルの最終的な精度検証や動作確認を行う。
モデルの学習もCI/CDやCTのパイプラインによってこの環境で行う。
データパイプラインやモデルの挙動を検証するデータベースと、本番環境から共有されたデータベースの2つが存在している。

開発環境

モデルや特徴量の開発を行う。
開発時に検証環境からデータをクローン・コピーして使う。

開発から本番運用までの流れ

次に各環境でどのように人が関わるかを整理します。
対象にするMLモデルは、主にテーブルデータを扱う回帰や分類のタスクを想定しています。
また、1モデルにつきバッチ推論を一日一回実行できれば良いとします。

下図がモデル開発~本番リリースまでの工程イメージです。
flow

モデル開発

開発環境でデータサイエンティストがモデルの開発をします。
今回は特徴量の開発については省略します。
各々Notebookやローカルでコードを書いています。

検証環境へデプロイ

コードがgitブランチにプッシュされると、CI/CDパイプラインが稼働して検証環境でモデルが作成されます。
このときデータは本番環境から共有されているデータを使用します。
作成されたモデルを動かしてみて、数値に異常が無いか、パフォーマンスに問題が無いか確認します。

本番環境へデプロイ

検証環境での確認が済んだら、本番環境へデプロイします。
検証環境で動作を確認したモデルをそのまま本番環境へデプロイするため、学習は行いません。
検証環境では本番環境から共有しているデータで学習しているため、本番環境でも学習データの再現が可能ですので、本番環境での学習はしないこととします。
せっかく検証環境で精度を確認したのに、本番環境で学習すると別モデルになってしまいます。検証環境で検証したモデルをそのまま使えるに越したことはありませんし、それが実現できるような環境を作りたいです。

運用モデルの決定(リリース)

本番環境では、実際に推論リクエストを処理しているprdバージョンと、リリース候補であるstgバージョンの両方がデプロイされており、いつでもエイリアスの付け替えでstgprdに昇格(リリース)できるようにします。
本番環境でもリリース候補のモデルで推論パイプラインが稼働するかをチェックできますし、必要であれば吐き出される予測値がイカれてないか等の最終確認も行えます。
また、定期的にモデルを更新するCT(Continuous Training)のプロセスも自動で実行しますが、最終的にどのバージョンをprdとするかの判断は人間が行うこととします。

MLモデルのライフサイクル管理

Snowflakeの公式ドキュメントには、MLモデルのライフサイクル管理の手法について3つ紹介されています。
https://docs.snowflake.com/ja/developer-guide/snowflake-ml/model-registry/model-management#model-lifecycle-management

スキーマで分離する方法

モデルを保管しておくスキーマを検証用、本番用で分けておく手法です。
モデルの格納場所が複数スキーマに跨ることが、やや煩雑に見えたため採用を見送りました。

タグ

他のオブジェクト同様、MLモデルにもタグを貼ることができます。
モデルごとのprd_versionのタグの値を読み取ることで、本番稼働中のバージョンを取得できます。

-- prd_versionタグに稼働中のバージョン名をセットする
ALTER MODEL prd_database.model_schema.my_model
SET TAG prd_version = 'V1';

-- prd_versionのタグ値を読み取る
-- モデルのSQLドメインは'MODULE'です
SET prd_version_name = (
    SELECT
    SYSTEM$GET_TAG('prd_database.model_schema.prd_version', 'my_model', 'MODULE')
);

-- SQLでprd_versionのモデルで推論を実行
WITH prd_version_model AS MODEL my_model VERSION IDENTIFIER($prd_version_name)
    SELECT prd_version_model!predict(...) ... ;

同様の方法でstg_versionbeta_version等を設定し、呼び出すことができます。
この方法はモデルのOWNERSHIPを持つロールとは別のロールでモデルのライフサイクルを制御したい場合に有用です。

エイリアス

モデルの各バージョンはエイリアスを持つことができます。
エイリアスはモデル内で排他的に扱われ、同一のエイリアスを複数バージョンに付与することはできません。

-- SQLでエイリアスを設定
ALTER MODEL my_model VERSION v1 SET ALIAS = prd;

-- prdを呼び出して推論を実行
WITH prd_version AS MODEL my_model VERSION prd
    SELECT prd_version!predict(...) ... ;

こちらの方がタグよりシンプルに扱うことができます。
しかし、エイリアスはモデルのOWNERSHIPを持つロールしか設定できません。
データサイエンティストやMLエンジニアがモデルの作成から運用バージョンの管理までを行う場合に向いていると思います。

今回はエイリアスを使った手法で実装しますが、タグを使う場合も基本的な挙動は同じです。

エイリアスを使った実装

モデルレジストリの利用については以下チュートリアルが参考になります。

https://www.snowflake.com/en/developers/guides/develop-and-manage-ml-models-with-feature-store-and-model-registry/#0

モデルを作成して、レジストリに保存する部分を簡略化すると以下のようになります。

from snowflake.ml.registry import Registry
from snowflake.ml.modeling.ensemble import RandomForestRegressor

def train_model_and_save(training_data_df):
    # 中略
    # モデルを初期化
    rf = RandomForestRegressor(
        input_cols=feature_columns, label_cols=label_cols, 
        max_depth=3, n_estimators=20, random_state=42
    )
    # 学習
    rf.fit(train)

    # モデルレジストリを初期化
    registry = Registry(
        session=session, 
        database_name=session.get_current_database(), 
        schema_name=MODEL_TEST_SCHEMA,
    )

    # モデルを保存
    model_name = "RANDOM_FOREST_REGRESSOR_MODEL"
    mv = registry.log_model(
        model_name=model_name,
        # version_name="v1",
        model=rf,
        comment="demo model"
    )

検証環境へのデプロイ

先程の開発したコードはgitブランチにコミットされ、プルリクエストをもってCI/CDプロセスに載ります。
ここではMLJobsを使ったCDプロセスを紹介します。

ディレクトリは以下のような構成にします。

./
├─ models/
│  ├─ training_random_forest.py
│  ├─ training_lightgbm.py
│  ├─ training_xgboost.py
│  └─ ・・・
└─ .azuredevops/
    ├─ requirements.txt
    ├─ job_helper.py
    └─ pipeline.yaml

使用しているプラットフォームはAzure Pipelinesです。
pipeline.yamlからjob_helper.pyが実行され、MLJobとしてmodels配下のtraining_xxx.pyがSnowflake上で実行されます。

# models/training_random_forest.py
def train_model_and_save(training_data_df):
    # 中略
    # レジストリへの保存までは先ほどと同じです

    # 既存のstgエイリアスがついたモデルからエイリアスを外す
    mv_old = registry.get_model(model_name).version('stg')
    mv_old.unset_alias('stg')

    # デプロイしたモデルにエイリアスを設定
    mv.set_alias('stg')


if __name__ == "__main__":
    train_model_and_save(train_df)

先程のコードに追加で、作成したモデルにstgというエイリアスを付けています。
これは本番環境へリリースする予定のモデルに付けるエイリアスとします。

# job_helper.py
from snowflake.ml.jobs import submit_file
from snowflake.snowpark import Session
import toml

def training_job (file_name, compute_pool, stage_name):

    # セッションを作成
    config = toml.load("xxxxx/connections.toml")
    connection = 'STG_ENV_CONNECTION'
    session = Session.builder.configs(config[connection]).create()

    # MLJobを送信
    job = submit_file(
        file_name,
        compute_pool,
        stage_name = stage_name,
        session=session
    )

    job.wait()

if __name__ == "__main__":
    # file_nameはmodels配下の学習コードが書かれたファイルです。
    # pipeline.yamlから動的に受け取ります(argparseとか省略してます)。
    training_job(file_name, compute_pool='test_compute_pool', stage_name='model_stage')

MLJobsについてはこちらの記事をご参考下さい。

ここまでで、開発したコードを使用して検証環境にモデルをデプロイすることができました。
MLJobはSnowflake上のコンピュートプールを使用するため、Azure PipelinesやGithub Actionsのコンピューティングは低スペックで問題ないです。
実際にはpipeline.yamlから様々な引数を渡す必要があり、もう少し複雑な構成になりますが、ここでは割愛します。

本番環境へのデプロイ

同じアカウント内であれば、モデルをコピーすることができます。
バージョン名にエイリアスを指定できます。

CREATE MODEL prd_env.model_schema.random_forest_regressor_model WITH VERSION stg
    FROM MODEL uat_env.model_schema.random_forest_regressor_model VERSION stg;

しかし、自作したMLモデルをアカウント間で共有することはまだできません。
公式ドキュメントでは「将来のリリースで利用可能になる予定です。」と記載されています。

いくつか代替案はありそうですが、まずは検証環境のモデルを読み込んで、それを本番環境に登録するという仕組みを考えました。

全部を書くと長くなってしまうので、端折ります。
mainブランチにコミットされたときにトリガーされ、検証環境から本番環境へモデルがデプロイされます。

# 検証環境のモデルを取得する
source_session = ...   # 検証環境のセッション
source_registry = Registry(session=source_session)  
   
model = source_registry.get_model("model_name")  
mv = model.version('stg') 
  
loaded_model = mv.load(force=True)   # モデルの元のPythonオブジェクトをロード  

# 本番環境へモデルを登録する
dest_session = ...   # 本番環境のセッション
dest_registry = Registry(session=dest_session)  
  
# ロードしたモデルを新しいアカウントに登録  
dest_registry.log_model(  
    model=loaded_model,  
    model_name="model_name",  
)

この方法でもできたのですが、検証環境でモデルを学習したときの環境をAzure Pipelinesのコンピューティングで再現しなければならず、少々面倒でした。
モデルファイル一式をエクスポートしてSQLで登録する方法もあります。

# 検証環境のモデルを取得する
source_session = ...  
source_registry = Registry(session=source_session)  
   
model = source_registry.get_model("model_name")  
mv = model.version('stg') 
mv.export('/tmp/model/',export_mode=ExportMode.FULL)    # モデルファイルをダウンロードする


stage_prefix = "@MY_DB.PUBLIC.MODEL_STAGE/model_name/"
for root, _, files in os.walk('/tmp/model/'):

    rel = os.path.relpath(root, local_dir).replace("\\", "/")
    stage_prefix = f"{stage_root}/{rel}" if rel != "." else stage_root

    # エクスポートしたモデルファイルをまとめてアップロード
    pattern = os.path.join(root, "*")
    dest_session.file.put(
            pattern,
            stage_prefix,
            auto_compress=False
        )

# ステージに置いたファイルからモデルを作成
dest_session.sql(f"""
    CREATE MODEL {model_name} FROM {stage_prefix}
""")

こちらの方法はステージにファイルをアップロードする手間があります。
どちらの方法もあまり使い勝手は良くないので、早くアカウント間共有ができるようになってほしいですね。

また、MANIFEST.ymlの冒頭にはリネージュに関するセクションがあります。

# MANIFEST.yml
lineage_sources:
- entity: MY_DB.PUBLIC.my_cool_training_dataset
  type: DATASET
  version: '1.0'
manifest_version: '1.0'
methods:
- handler: functions.predict.infer
# 後略

lineage_sourcesのデータベース名やテーブル名・データセット名が本番環境に存在していないとトラブルのもとになります。
アカウントを跨いでモデルをコピーする時はこのセクションは削除してしまったほうが安全です。
モデルのリネージュが表示できるのはとても良い機能ですが、背に腹は代えられません。

ところで、本番環境で運用中のモデルにはprdというエイリアスを付けます。
エイリアスを取り替えれば、デプロイしたモデルを運用パイプラインで推論させることができます。
本番環境にデプロイしたばかりのモデルはstgというエイリアスがついています。このモデルはprdとは別のコンピューティングでバッチ推論を実行することで、本番稼働に影響を及ぼすことなく、動作確認できます。
あるいは一定期間並行稼働して、stgprdのA/Bテストのようなことを実施することもできます。

この手法は運用パイプラインに手を加えることなく、モデルの交換が可能です。

CTパイプライン

CT(Continuous Training)のプロセスも、基本的な流れは前述のCI/CDプロセスと同じです。
つまり、本番環境の最新データを使って検証環境で再学習を行い、検証が済んだモデルを本番環境にデプロイしてstgエイリアスを付与します。

本番環境で直接モデルを再学習する構成もとれますが、極力デプロイの経路を統一したいため、検証環境で学習を行うこととしています。
開発時のCI/CDパイプラインとCTパイプラインの構成のうち、本番環境へデプロイする部分の構成を統一できるため管理がシンプルになることや、
本番環境のリソースを学習で消費しないこと、本番リリース前に必ず検証環境でのチェックを挟むことで安全性を高められる点がメリットとして挙げられます。

おわり

ここまで読んで頂きありがとうございました。
2025年はモデルレジストリを中心にSnowflake MLに大きなアップデートが沢山ありました。
とはいえまだまだ難しいところもありますね。
本番環境をアカウントごと分けるというユースケースは沢山あると思うので、そこを考慮した機能があると良いな~と思います。
早くアカウント間共有ができるようになってほしい!

疑問点やツッコミがあればコメントや、ワタシのXにDMをくださいm(__)m
いつでもウェルカムです!
https://x.com/takimiko_gohan?s=09

Discussion