🙌

Amazon SageMaker Feature StoreにKaggleのTitanicデータを登録してみた

2020/12/19に公開

この記事はニフティグループ Advent Calendar 2020の18日目の記事です。
@y_konoさんでAPI経由でAWXにインベントリーを作成するでした。

はじめに

先日のAWS re:Invent 2020でSageMakerに関するアップデートが多数発表されました。
今回はその一つであるSageMaker Feature Storeを公式のexampleを参考に試してみます。
公式ではクレジットカードの不正利用データを使用していますが、少し改変してKaggleのTitanicデータを使用します。

準備

ノートブックインスタンスを作成

まずはじめにSageMakerのノートブックインスタンスを作成しましょう。
同じく先日から利用可能になったCloudShellからCLIを使って作成してみます。

[cloudshell-user@ip-xxxxx ~]$ aws sagemaker create-notebook-instance --notebook-instance-name test-feature-store --instance-type ml.t2.medium --role-arn arn:aws:iam::xxxxx
{
    "NotebookInstanceArn": "arn:aws:sagemaker:ap-northeast-1:xxxxx
}

InServiceになったら下記のUrlを確認して、ノートブックインスタンスにアクセスします。

[cloudshell-user@ip-xxxxx ~]$ aws sagemaker describe-notebook-instance --notebook-instance-name test-feature-store
{
    "NotebookInstanceArn": "arn:aws:sagemaker:ap-northeast-1:xxxxx",
    "NotebookInstanceName": "test-feature-store",
    "NotebookInstanceStatus": "InService",
    "Url": "test-feature-store.notebook.ap-northeast-1.sagemaker.aws",
    "InstanceType": "ml.t2.medium",
    "RoleArn": "arn:aws:iam::xxxxx",
    "LastModifiedTime": "2020-12-18T10:12:03.771000+00:00",
    "CreationTime": "2020-12-18T10:08:29.150000+00:00",
    "DirectInternetAccess": "Enabled",
    "VolumeSizeInGB": 5,
    "RootAccess": "Enabled"
}

続いて適当なノートブックを作成します。

データの用意

今回はTitanicデータを利用するのでKaggle APIを利用してダウンロードします。
もちろんローカルにダウンロードしてからアップロードしても構いません。

!pip install kaggle --upgrade
!mkdir ~/.kaggle
!echo '{"username":"xxxxx","key":"xxxxx"}' > ~/.kaggle/kaggle.json
!kaggle competitions download -c titanic 
!unzip titanic.zip

pandasでtrain.csvを読み込みます。

import pandas as pd

df = pd.read_csv('train.csv')

Feature Storeを利用するにはレコードを一意に特定するカラムと、それぞれのイベントの発生時刻を表すカラムがそれぞれ必要になります。
それぞれrow_idevent_timeという名前で追加しておきましょう。
なお、event_timeはひとまず現在時刻のUnixtimeを入れておきます。
実際の運用では新規に取得されたデータが随時追加されていくことが想定されますが、その場合にはevent_timeに最新の時刻が入ることでバージョニングが実現されるイメージです。

import time

df = df.reset_index()
df = df.rename(columns={'index': 'row_id'})
current_time_sec = int(round(time.time()))
df['event_time'] = pd.Series([current_time_sec]*len(df), dtype="float64")
df.head()

Feature Storeの利用

Feature Storeのセッションを作成

Feature Storeを利用するためのセッションを作成します。

import boto3
import sagemaker
from sagemaker.session import Session

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

Feature Storeを利用するためのS3とロールの設定を行います

default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-demo'

print(default_s3_bucket_name)

from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

Featureをまとめる単位であるFeature Groupを作成します。
まず、Feature Groupの名前を現在時刻を元に定義します。

from time import gmtime, strftime

feature_group_name = f'feature-group-{strftime("%d-%H-%M-%S", gmtime())}'

続いて、FeatureGroupのインスタンスを作成します。

from sagemaker.feature_store.feature_group import FeatureGroup

feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

続いて、列名と型の定義をfeature_groupに認識させるのですが、Feature StoreではPandasのobject型を使用できないため、string型に変換させる処理を事前に行います。
その後にload_feature_definitionsにデータフレームを渡すことにより、列名と型が認識されFeature Groupを作成する準備が整います。

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(df)

# record identifier and event time feature names
record_identifier_feature_name = 'row_id'
event_time_feature_name = 'event_time'

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
feature_group.load_feature_definitions(data_frame=df); # output is suppressed

Feature Groupの作成

ここまで完了すればいよいよFeature Groupの作成です。
作成には1分未満程度の時間がかかるため、完了するまで待機する関数が紹介されていました。

def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get('FeatureGroupStatus')
    while status == 'Creating':
        print('Waiting for Feature Group Creation')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise RuntimeError(f'Failed to create feature group {feature_group.name}')
    print(f'FeatureGroup {feature_group.name} successfully created.')

feature_group.create(
    s3_uri=f's3://{default_s3_bucket_name}/{prefix}',
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)
Output
wait_for_feature_group_creation_complete(feature_group=feature_group)
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup feature-group-19-03-53-17 successfully created.

FeatureGroup feature-group-xxxxx successfully created.と表示されたら無事Feature Groupの作成完了です。

データの投入

作成したFeature Groupにデータを投入してみましょう。feature_groupのingest関数を使います。

feature_group.ingest(
    data_frame=df, max_workers=3, wait=True
)

以上でFeature Storeへのデータの登録は完了です。

データの取得

最後にFeature Storeからデータを取得してみましょう。Athenaに対してクエリを投げる形で取得できます。

query = feature_group.athena_query()
table = query.table_name

query_string = f'SELECT * FROM "{table}"'
print('Running ' + query_string)

# run Athena query. The output is loaded to a Pandas dataframe.
dataset = pd.DataFrame()
query.run(query_string=query_string, output_location=f's3://{default_s3_bucket_name}/{prefix}/query_results/')
query.wait()
dataset = query.as_dataframe()

dataset
Output
Running SELECT * FROM "feature-group-19-03-53-17-1608350000"

	row_id	passengerid	survived	pclass	name	sex	age	sibsp	parch	ticket	fare	cabin	embarked	event_time	write_time	api_invocation_time	is_deleted
0	0	1	0	3	Braund, Mr. Owen Harris	male	22.0	1	0	A/5 21171	7.25	NaN	S	1.608350e+09	2020-12-19 03:59:17.165	2020-12-19 03:53:41.000	False
1	595	596	0	3	Van Impe, Mr. Jean Baptiste	male	36.0	1	1	345773	24.15	NaN	S	1.608350e+09	2020-12-19 03:59:17.165	2020-12-19 03:53:42.000	False
2	4	5	0	3	Allen, Mr. William Henry	male	35.0	0	0	373450	8.05	NaN	S	1.608350e+09	2020-12-19 03:59:17.165	2020-12-19 03:53:42.000	False
3	3	4	1	1	Futrelle, Mrs. Jacques Heath (Lily May Peel)	female	35.0	1	0	113803	53.10	C123	S	1.608350e+09	2020-12-19 03:59:16.080	2020-12-19 03:53:42.000	False
4	594	595	0	2	Chapman, Mr. John Henry	male	37.0	1	0	SC/AH 29037	26.00	NaN	S	1.608350e+09	2020-12-19 03:58:48.791	2020-12-19 03:53:41.000	False

無事にデータの取得ができました!

結び

チームメンバーの各々の環境で別々に特徴量を作成していると、厳密な定義がずれてきてしまうことがあります。
Feature Storeを特徴量のリポジトリとして利用することにより、メンバー間で同一の特徴量を使用している状況が担保できそうです。

明日は@shotawsさんです!

Discussion