Zenn
🛟

特徴量を言語を越えて一貫して管理する, 『特徴量ドリブン』な MLOps の実現への試み

2025/03/28に公開
10

MIXI minimo の システム開発グループ AI 推進チーム で機械学習関連の施策をしている Taniii です.

モデルの学習から推論, 実サービスへの実装までの一連の流れで, 品質を保証し, 高速にモデル改善のサイクルを回すためには, 特徴量の一貫した管理と, その管理の自動化が重要だと考えています.

これらを実現するために, MIXI の運営するサービス minimo では, 特徴量の管理を中心に据えた自動化を導入しました.

本記事では, 特徴量ドリブンな MLOps を実現するための試みを紹介します.

要約すると...

最初に結論を書くと, 以下のような自動化フローを構築しています.

自動化フローのシステム構成図

詳細について, 次節以降で説明します.

サービスのバックグラウンドと機械学習の活用

minimo は, 月間 200 万人以上 (WEB, アプリの合計. 2021 年 11 月時点) のお客様に利用いただいている予約サービスで, 美容師やネイリスト・アイデザイナーなどのアーティストを, サロン単位ではなく, 個人単位で予約することができます.

ユーザが自らの『なりたい』を叶えるのに最適なアーティストと出会えるように, ユーザやアーティストの特徴量を活用することで, ユーザそれぞれに最適な検索結果を予測するモデルを構築しています.

10 年以上の歴史を持つサービスのため, 様々なシステムが連携してサービスを形成しており, システム間での特徴量の管理が複雑になっていました.

そのため, 各システムそれぞれで, 特徴量リストを管理する必要があり, 特徴量の追加や変更があった際には, それぞれのシステムで特徴量の追加や変更を行うコストが発生するほか, 特徴量の整合性が取れないことによる品質低下のリスクがありました.

このような課題を解決するために, 特徴量の一貫した管理と, その管理の自動化する施策を行うことになりました.

特徴量ドリブンな MLOps とは...

『特徴量ドリブン』という言葉は, 「特徴量を中心にして」という意味を表すために, 今回の取り組みにおける軸となるキーワードとして採用したものです.

特徴量ドリブンな MLOps は, 特徴量を第一級の資産として扱い, 学習・推論・実サービスへの実装の一連のサイクルや, Python, Go, BigQuery, ElasticSearch など言語やシステムを越えて, 特徴量を一貫して管理することで, モデルの品質を保証し, 高速にモデル改善のサイクルを回すことを目指す取り組み と定義しています.

具体的な詳細を以下にまとめてみます.

方針

特徴量ドリブンな MLOps の実現を目指して, 以下の方針を定義しました.

  • 特徴量の定義・バージョニングを体系化

  • 特徴量の定義の更新を起点として, 学習, オフライン評価, 推論用サーバの実装, サービスのメインサーバの更新, AB テスト評価の, 一連のサイクルを自動化

利点

特徴量ドリブンな MLOps を実現することで, 以下の利点を期待します.

  • 再現性の向上

学習時と推論時で, 特徴量の定義が異なることによる品質低下を防ぎます.

学習時のコンテキストを正確に記録し, 推論時に再現することで, モデルの品質を保証します.

  • 保守性の向上

モデルの改良の中で, 特徴量の追加・削除が繰り返されても, モデルのバージョンを正確に管理し, 変更を追跡することで, 特徴量の保守性を向上させます.

AB テストなどの結果, 特定のモデルバージョンに戻す際にも, 簡単にリバートや切り替えができるようになります.

  • 可視性の向上

各モデルバージョンの特徴量の定義を一元管理することで, 各モデルのパフォーマンスと特徴量の関係を可視化し, モデルの品質向上につなげます.

モデルの性能劣化の原因を特徴量単位でトラッキングでき, 早期に異常を検知することができます.

  • 効率性の向上

新しいモデルや特徴量を追加する際に, 特徴量の定義の更新を起点として, 自動化されたフローに従って, 学習, オフライン評価, 推論用サーバの実装, サービスのメインサーバの更新, AB テスト評価を行うことで, モデルの改善サイクルを高速化します.

全体のシステム構成と自動化の流れ

まずは, minimo における全体のシステム構成と, その中でどのように機械学習が取り入れられているかを紹介します.

機械学習導入前の既存のシステム

minimo では, メインのバックエンドサーバが, Go で実装されています. (以下, Go サーバと記載します)

また, 永続化のために Aurora MySQL を使用しており, データウェアハウスとして BigQuery, 検索エンジンとして OpenSearch (ElasticSearch のフォーク) を使用しています.

機械学習導入前の既存のシステム構成図

機械学習導入

ここに, 機械学習を導入するために, Python の推論サーバ (以下, Python 推論サーバ) を新たに用意して, Go サーバから必要に応じてリクエストを送るようにしています.

機械学習導入のシステム構成図

  • 特徴量テーブルの作成には LookML

  • モデル学習には Google Cloud の Vertex AI Custom Training

  • Python 推論サーバには AWS の SageMaker

を使用しています.

黄色の矢印は, ユーザがサービスを利用した時にリアルタイムに実行される内容,

青色の矢印は, モデルの改善時やバッチ処理など, オフラインで実行される内容を表しています.

サービスへの機械学習導入の各部分の詳細については, 以下の記事にまとめられているのでぜひご覧ください.

https://zenn.dev/mixi/articles/e236ef47f4aaf0

https://zenn.dev/mixi/articles/1bf68e571f428d

ここでは, 特徴量を追加したり削除したりして, 新しいモデルを作る際の大まかな流れのみを紹介します.

新しいモデルの作成の流れ

  1. 特徴量を生成する

    • BigQuery からデータを取得し, LookML で特徴量テーブルを生成し, BigQuery に保存する
  2. モデルの学習

    • Python で, BigQuery から特徴量テーブルを取得・前処理・モデルを学習・オフライン評価の一連の流れのスクリプトを準備する
    • これを, Vertex AI Custom Training で実行する
  3. モデルの評価

    • Vertex AI Custom Training の結果を確認し, モデルの品質を評価する
    • この後に進むかどうかを判断する
  4. Python 推論サーバの用意

    • 特徴量を入力として受け取り, モデルを使って推論し, その結果を返す Python 推論サーバを準備する
    • これを, SageMaker にデプロイする
  5. Go サーバの更新

    • Go サーバに, Python 推論サーバへリクエストを送るための, リクエスト定義や実装を更新する
    • 具体的には, オニオンアーキテクチャにおける, インフラ層の datamodel や repository の実装を更新する
    • これを, デプロイする
  6. 公開

    • Go サーバ・Python 推論サーバを介した推論結果と, オフラインでの評価結果を比較し, モデルの品質を確認する
    • これらを, 公開する

自動化フロー

ここまで, サービスへの機械学習導入と新しいモデルの作成の流れについて紹介しました.

ここから, 特徴量ドリブンな MLOps を実現するために, 構築した自動化フローについて紹介します.

今回の施策に限った話ではないかもしれないですが, 自動化の設計の際に最も重要視したポイントは, 依存関係 です.

特徴量の定義を変更した際に, それに依存する各システムの変更, そしてさらにそれに依存するシステムを変更... と, 自動化することで, 特徴量の一貫性を保証し, モデル作成のサイクルの負担を軽減し, モデル改善を高速化できるようにしています.

なお, Go サーバと Python 推論サーバの両方で, 任意の PR を Staging 環境にサンドボックス的に立ち上げて検証を経てから, release ブランチにマージすることで, Production 環境にデプロイする仕組みがすでに構築されており, これに従って自動化フローも構築しています.

  1. 学習時の DataFrame から特徴量リストを出力する
  • 学習時に, DataFrame から特徴量リストを出力するスクリプトを実行し, 特徴量リストを生成します
  • モデルバージョンごとの, 学習済みモデル・カテゴリカルエンコーダー (前処理に使用)・特徴量リストの組み合わせ を, GCS に保存し厳密に管理します

DataFrame から特徴量リストを出力するスクリプトは, 以下のようになっています.

また, 各特徴量の期待される型も, DataFrame から合わせて出力しています.

def dump_features_list(self, df_train):

    # データフレームの特徴量とその型の一覧をバケット出力
    features_list = pd.DataFrame({'feature': df_train.columns, 'type': df_train.dtypes.astype(str)})
    self.bucket.upload_csv_from_string(
        features_list.to_csv(index=False),
        os.path.join(
            'model',
            self.model_version,
            'features_list.csv'
        )
    )
  1. 新しいモデルの, 学習済みモデル・カテゴリカルエンコーダー・特徴量リストの組み合わせを GCS から Python 推論サーバに取得する

  2. 特徴量リストを基に, Python 推論サーバの openapi.yaml のリクエスト型 と 特徴量構造体 (辞書) を自動生成して, PR を立てる

  • モデル学習時のスクリプトが, main ブランチにマージされたのをトリガーにします
  • Actions 上で, 特徴量リストを基に, Python 推論サーバの openapi.yaml のリクエスト型 と 特徴量構造体 (辞書) を自動生成し, PR を立てます

openapi.yaml の自動生成では, 以下のようなスクリプトを, Actions のワークフローから呼び出しています.

リクエストで受け取った特徴量を, Python の辞書に詰めるにあたっての, 特徴量構造体 (辞書) の生成も同様に行っています.

import csv
import yaml
import argparse
import os

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--csv', required=True, help='Path to the CSV file (features_list.csv)')
    parser.add_argument('--out', required=True, help='Path to the output openapi.yaml')
    args = parser.parse_args()

    # DF 型 -> OpenAPI 型 へのマッピング
    type_mapping = {
        'Int64': 'integer',
        'float64': 'number',
        'object': 'string'
    }

    properties = {}
    required_properties = []

    with open(args.csv, mode='r') as f:
        reader = csv.reader(f)
        header = next(reader)  # ヘッダー行: ['feature', 'type']
        for row in reader:
            if len(row) < 2:
                continue
            feature_name, feature_type = row[0], row[1]
            openapi_type = type_mapping.get(feature_type, 'string')
            property_schema = {
                'type': openapi_type
            }
            if openapi_type == 'integer':
                property_schema['x-go-type'] = 'uint64'
            elif openapi_type == 'number':
                property_schema['x-go-type'] = 'float64'
            elif openapi_type == 'string':
                property_schema['x-go-type'] = 'string'

            if is_required(feature_name):
                required_properties.append(feature_name)
            else:
                property_schema['nullable'] = True

            properties[feature_name] = property_schema

    model_name = os.path.splitext(os.path.basename(args.csv))[0]

    # OpenAPI ボイラプレート
    openapi_spec = {
        "openapi": "3.0.0",
        "info": {
            "title": f'ML PersonalOptimization Server : {model_name}',
            "version": "1.0.0"
        },
        "paths": {
            "/ping": {
                "get": {
                    "summary": "Health check endpoint",
                    "description": "Returns a simple status response indicating the service is running.",
                    "responses": {
                        "200": {
                            "description": "Service is available",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "object",
                                        "properties": {
                                            "status": {
                                                "type": "string",
                                                "example": "ok"
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "/invocations": {
                "post": {
                    "summary": "Model inference endpoint",
                    "requestBody": {
                        "required": "true",
                        "content": {
                            "application/json": {
                                "$ref": "#/components/schemas/InvocationRequest"
                            }
                        }
                    },
                    "responses": {
                        "200": {
                            "description": "Successfully returned predictions",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "array",
                                        "items": {
                                            "type": "object",
                                            "properties": {
                                                "recruitment_id": { "type": "integer", "example": 12345 },
                                                "predicted_score": { "type": "number", "example": 0.85 }
                                            }
                                        }
                                    }
                                }
                            }
                        },
                        "500": {
                            "description": "Internal server error",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "object",
                                        "properties": {
                                            "status": { "type": "string", "example": "error" },
                                            "message": { "type": "string", "example": "Failed to process request." }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        },
        'components': {
            'schemas': {
                'InvocationRequest': {
                    'type': 'object',
                    'properties': properties,
                    'required': required_properties
                }
            }
        }
    }

    # YAMLファイルに書き出し
    with open(args.out, 'w') as out_file:
        yaml.dump(openapi_spec, out_file, default_flow_style=False, sort_keys=False)

    print(f"OpenAPI spec generated: {args.out}")

if __name__ == '__main__':
    main()
  1. 推論に使用するモデルバージョンを指定し, Python 推論サーバを Staging 環境にデプロイする
  • すでに, モデル推論に必要な,
    • 学習済みモデル
    • カテゴリカルエンコーダ
    • 特徴量構造体
    • OpenAPI
      の組み合わせが用意されているので, 今回使用したいモデルバージョンを環境変数で 1 行指定する PR を立てることで, 自動的に Python 推論サーバの Staging 環境を SageMaker にデプロイすることができます
  1. Go サーバのインフラ層に, OpenAPI の定義から, 新しいモデルバージョンの datamodel を自動生成する
  • Python 推論サーバの Staging デプロイが完了したのをトリガーにします
  • Actions で, Python 推論サーバの OpenAPI の定義を基に, Go サーバのインフラ層に, 新しいモデルバージョンの datamodel を自動生成し, PR を立てます

OpenAPI -> Go の datamodel への自動生成では, oapi-codegen を Actions のワークフローから呼び出しています.

  1. AB テストそれぞれで, 使用するモデルバージョンの自動生成された構造体 (⑤) をエイリアスで指定して, Staging 環境にデプロイする
  • すでに, Go から Python 推論サーバにリクエストするのに必要な, datamodel が用意されているので, 今回使用したいモデルバージョンを AB テストのグループごとに, エイリアスで指定して, PR を立てた上で, Go サーバを Staging 環境にデプロイします

AB テストのグループごとに, 使用するモデルバージョンをエイリアスで指定する部分は, 以下のようになっています.

package sagemakermodel

// どの MODEL_VERSION を使うかここで指定
import (
	schema_a "[...省略]/infrastructure/datamodel/sagemakermodel/versions/[旧モデルバージョン名]/schema"
	schema_c "[...省略]/infrastructure/datamodel/sagemakermodel/versions/[新モデルバージョン名]/schema"
)

// A, B, C の任意のグループを使わないときに使うプレースホルダ
type Placeholder struct{}

type FeatureValueDocJson interface {
	FeatureValueADocJson | FeatureValueBDocJson | FeatureValueCDocJson
}

// 使わないグループについては `Placeholder` を割り当てておく
type FeatureValueADocJson = schema_a.InvocationRequest
type FeatureValueBDocJson = Placeholder
type FeatureValueCDocJson = schema_c.InvocationRequest
  1. 同一のモデルバージョン・特徴量で, オフライン環境での推論結果と Staging 環境の推論結果が一致することをチェックする
  • オフライン環境で手元でモデルを動かした場合の推論結果と, Staging 環境を通した推論結果が一致することを確認して, モデルの品質を保証します
  1. Go サーバ / Python 推論サーバの両方を release ブランチにマージして, デプロイ完了!

まとめ

特徴量を中心に据え, 各定義とその依存関係に従って, 自動化を導入しました.

  • 学習時には, 定義した特徴量リストを元に特徴量を構造体に詰めて, モデルを学習します.
  • 学習時の特徴量を基に, 推論時に使う Python 推論サーバの OpenAPI のリクエスト型 と 特徴量構造体 (辞書) を自動生成します.
  • OpenAPI を元に, 既存の Go サーバのインフラ層に, 新しいモデルバージョンの datamodel を自動生成します.

特徴量を中心に据えることで, モデルの品質を保証し, 高速にモデル改善のサイクルを回すことができるようになりました.

今後のさらに範囲を広げた自動化の取り組みにおいても, 依存関係を意識した設計を心がけ, 特徴量を中心に据えた自動化を進めていきたいと考えています.

さいごに

MIXI と minimo では一緒に働く仲間を募集中です!

"叶えたい" 人と, "叶えられる" 人をつなぐことで, 世の中にたくさんの「ちいさな素敵」を生み出せると思っています!

ぜひ, 一緒に minimo を創りませんか?

https://mixigroup-recruit.mixi.co.jp/jobs/

GitHubで編集を提案
10
MIXI DEVELOPERS Tech Blog

Discussion

ログインするとコメントできます