📑

Vertex AI Feature Store: オフラインストアを試してみた

に公開

今回はVertex AI上で提供されているFeature Storeについて、オフラインサービング機能を試してみたので共有します。

Vertex AI Feature Storeとは?

まずはFeature Storeとは何かというところですが、簡単にいうとMLモデルを学習するための特徴量を中央集権的に管理してくれるレジストリです。MLモデルを開発する時はそれぞれの開発者がモデルに必要な特徴量を作るために前処理を実装しますが、生成された特徴量を他のエンジニアに共有するためにわざわざファイルに落として共有する必要があります。Feature Storeを導入すると、作成された特徴量はFeature Storeで管理されるため、他のエンジニアに対してデータを共有することができます。また、モデルをデプロイして推論する時もFeature Storeから取得したデータで推論させることにより、データの出所を一つにすることで処理の流れを明確にできます。

Vertex AIではFeature Storeをマネージドな機能として提供しており、オフラインサービング、オンラインサービング共に提供しています。オフラインサービングはいわゆるバッチ処理であり、複数データをまとめて処理させる時に参照されるストアになります。バックエンドとしてはBigQueryが利用されます。オンラインサービングはリアルタイムでレコード単位で推論する時に利用されるものであり、Bigtableなどがバックエンドとして利用されます。詳しくは以下のドキュメントを参照ください。

https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview?hl=ja

オフラインストアを使ってみる

今回は公式で提供されているサンプルを利用して、オフラインストアにデータを追加し、そこから情報を参照するまでを試してみます。以下のレポジトリはFeature StoreだけでなくVertex AIに関わる様々なサンプルコードが提供されているので、ぜひ参考にしてください。

https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/feature_store/offline_feature_serving_from_bigquery_with_feature_registry.ipynb

環境構築

サンプルはノートブックで実装されていますが、今回はスクリプトで実行してみます。uvを使って以下のように環境構築をしました。bigframe2はBigQueryのデータをデータフレームとして扱うためのSDKになります。jinja2は実行している時に必要になるエラーが出たため追加しています。

uv init feature_store_offline -p 3.12
cd feature_store_offline
uv add google-cloud-aiplatform bigframes jinja2 python-dotenv

https://cloud.google.com/bigquery/docs/reference/bigquery-dataframes?hl=ja

環境変数の定義

まず、PROJECT_IDLOCATION.envに定義します。

.env
PROJECT_ID=...
LOCATION=us-central1

次にコード上で以下を実行して環境変数を読み込みます。

import os
from dotenv import load_dotenv

load_dotenv()

PROJECT_ID = os.environ["PROJECT_ID"]
LOCATION = os.environ["LOCATION"]

BigQuery

それではBigQueryでテーブルを作成してみます。今回はGoogle Cloudが提供しているパブリックなサンプルデータセットを利用してテーブルを作成します。

import bigframes
import bigframes.pandas
import pandas as pd

BQ_DATASET_ID="fhfv_dataset_unique"
BQ_TABLE_ID="fhfv_table_unique"
FEATURE_GROUP_ID="fhfv_fg_unique"
DATA_SOURCE="gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"
BQ_TABLE_URI=f"{PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}"


def create_bigquery_table():
    session = bigframes.connect(
        bigframes.BigQueryOptions(
            project=PROJECT_ID,
            location=LOCATION,
        )
    )
    df = session.read_csv(DATA_SOURCE)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    df = df.rename(columns={"timestamp": "feature_timestamp"})
    df.to_gbq(BQ_TABLE_URI, if_exists="replace")

create_bigquery_table関数を実行すると、Google Cloud Storageにあるパブリックデータを参照し、データフレームとして読み込んだ上でBigQueryのテーブルを作成しています。なお、後々出てきますがFeature Storeを構築するにあたり一意に識別するためのカラム(プライマリーキーみたいなもの)とイベントタイムスタンプが必要で、元々データにあったtimestampカラムをfeature_timestampという名前に変更しています。この関数を実行するとBigQueryにテーブルが作成されます。


BigQueryのスキーマ画面

特徴グループの作成

特徴グループはGoogle Cloud上の定義は以下になります。

特徴グループは、BigQueryのソーステーブルまたは特徴データを含むビューに対応する特徴レジストリリソースです。特徴ビューには特徴が含まれることがあり、データソース内の特徴列の論理グループと考えることができます。

要約すると、Feature StoreがBigQuery上のデータを特徴量として扱っていることを示すにあたり、どういうカラムをグループとみなすかをまとめているようなものです。

以下の実装をすることで、特徴グループを作成できます。

from vertexai.resources.preview.feature_store import Feature, FeatureGroup
from vertexai.resources.preview.feature_store import utils as fs_utils

def create_feature_group():
    fg: FeatureGroup = FeatureGroup.create(
        f"{FEATURE_GROUP_ID}",
        fs_utils.FeatureGroupBigQuerySource(
            uri=f"bq://{BQ_TABLE_URI}", entity_id_columns=["users"]
        ),
    )
    movies_feature: Feature = fg.create_feature("movies")
    return fg

create_feature_group関数を呼び出すと、特徴グループが作成されます。entity_id_columnsの指定ですが、こちらは先ほど話に出したいわゆるプライマリーキーのようなものになります。この特徴グループでレコードを特定するためのキーとしてusersを参照するというように指定します。タイムスタンプについては特別指定しない場合feature_timestampが自動的に参照されます。また、特徴量としてmoviesカラムを利用したいので、fg.create_feature("movies")を実行して、特徴量としてmoviesを利用すると定義します。これを実行するとVertex AI上のFeature Storeに以下が追加されます。


Feature Store登録結果

特徴量の取得

先ほどまでの手続きで特徴量をBigQueryにて管理し、Feature Storeから参照できるようにしたので、最後にFeature Store経由でデータを取得してみましょう。

import pandas as pd
from vertexai.resources.preview.feature_store import Feature

def fetch_data(fg):
    entity_df = pd.DataFrame(
        data={
            "users": ["alice", "bob"],
            "timestamp": [
                pd.Timestamp("2021-09-14T09:36"),
                pd.Timestamp("2023-12-12T13:13"),
            ],
        },
    )

    movies_feature = fg.get_feature("movies")
    features = offline_store.fetch_historical_feature_values(
        entity_df=entity_df,
        features=[movies_feature],
    )
    print(features)

movies_feaetureですが、取得したい特徴量は選択的に選ばれるため、今回はmoviesを取得するために定義しています。指定しない場合userstimestampだけが取得されます。

実行すると以下のような結果を取得することができます。一つ目のタイムスタンプとalice、二つ目のタイムスタンプとbobの組み合わせで取得されていることが確認できます。

users   movies           timestamp
alice movie_03 2021-09-14 09:36:00
  bob movie_02 2023-12-12 13:13:00

[2 rows x 3 columns]

コード全体像

以下がコード全体像になります。

feature_store_offline.py
import os
import vertexai
from dotenv import load_dotenv
import bigframes
import bigframes.pandas
import pandas as pd
from google.cloud import bigquery
from vertexai.resources.preview.feature_store import (Feature, FeatureGroup,
                                                      offline_store)
from vertexai.resources.preview.feature_store import utils as fs_utils

load_dotenv()

PROJECT_ID = os.environ["PROJECT_ID"]
LOCATION = os.environ["LOCATION"]

BQ_DATASET_ID = "fhfv_dataset_unique"
BQ_TABLE_ID = "fhfv_table_unique"
FEATURE_GROUP_ID = "fhfv_fg_unique"
DATA_SOURCE = f"gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"
BQ_TABLE_URI=f"{PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}"


def create_bigquery_table():
    session = bigframes.connect(
        bigframes.BigQueryOptions(
            project=PROJECT_ID,
            location=LOCATION,
        )
    )
    df = session.read_csv(DATA_SOURCE)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    df = df.rename(columns={"timestamp": "feature_timestamp"})
    df.to_gbq(BQ_TABLE_URI, if_exists="replace")
    

def create_feature_group():
    fg: FeatureGroup = FeatureGroup.create(
        f"{FEATURE_GROUP_ID}",
        fs_utils.FeatureGroupBigQuerySource(
            uri=f"bq://{BQ_TABLE_URI}", entity_id_columns=["users"]
        ),
    )
    movies_feature: Feature = fg.create_feature("movies")
    return fg


def fetch_data(fg):
    entity_df = pd.DataFrame(
        data={
            "users": ["alice", "bob"],
            "timestamp": [
                pd.Timestamp("2021-09-14T09:36"),
                pd.Timestamp("2023-12-12T13:13"),
            ],
        },
    )

    movies_feature = fg.get_feature("movies")
    features = offline_store.fetch_historical_feature_values(
        entity_df=entity_df,
        features=[movies_feature],
    )
    print(features)

if __name__ == "__main__":
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    create_bigquery_table()
    fg = create_feature_group()
    fetch_data(fg)

まとめ

今回はVertex AIのFeature Storeを利用して、オフラインデータサービングを試してみました。オフラインサービングを利用する場合はリアルタイム推論ではなくデータが貯まった後に実行される機能となります。バッチ処理を実現したい場合はぜひ利用してみてください。

Discussion