Amazon SageMaker Feature StoreにKaggleのTitanicデータを登録してみた
この記事はニフティグループ Advent Calendar 2020の18日目の記事です。
@y_konoさんでAPI経由でAWXにインベントリーを作成するでした。
はじめに
先日のAWS re:Invent 2020でSageMakerに関するアップデートが多数発表されました。
今回はその一つであるSageMaker Feature Storeを公式のexampleを参考に試してみます。
公式ではクレジットカードの不正利用データを使用していますが、少し改変してKaggleのTitanicデータを使用します。
準備
ノートブックインスタンスを作成
まずはじめにSageMakerのノートブックインスタンスを作成しましょう。
同じく先日から利用可能になったCloudShellからCLIを使って作成してみます。
[cloudshell-user@ip-xxxxx ~]$ aws sagemaker create-notebook-instance --notebook-instance-name test-feature-store --instance-type ml.t2.medium --role-arn arn:aws:iam::xxxxx
{
"NotebookInstanceArn": "arn:aws:sagemaker:ap-northeast-1:xxxxx
}
InServiceになったら下記のUrl
を確認して、ノートブックインスタンスにアクセスします。
[cloudshell-user@ip-xxxxx ~]$ aws sagemaker describe-notebook-instance --notebook-instance-name test-feature-store
{
"NotebookInstanceArn": "arn:aws:sagemaker:ap-northeast-1:xxxxx",
"NotebookInstanceName": "test-feature-store",
"NotebookInstanceStatus": "InService",
"Url": "test-feature-store.notebook.ap-northeast-1.sagemaker.aws",
"InstanceType": "ml.t2.medium",
"RoleArn": "arn:aws:iam::xxxxx",
"LastModifiedTime": "2020-12-18T10:12:03.771000+00:00",
"CreationTime": "2020-12-18T10:08:29.150000+00:00",
"DirectInternetAccess": "Enabled",
"VolumeSizeInGB": 5,
"RootAccess": "Enabled"
}
続いて適当なノートブックを作成します。
データの用意
今回はTitanicデータを利用するのでKaggle APIを利用してダウンロードします。
もちろんローカルにダウンロードしてからアップロードしても構いません。
!pip install kaggle --upgrade
!mkdir ~/.kaggle
!echo '{"username":"xxxxx","key":"xxxxx"}' > ~/.kaggle/kaggle.json
!kaggle competitions download -c titanic
!unzip titanic.zip
pandasでtrain.csv
を読み込みます。
import pandas as pd
df = pd.read_csv('train.csv')
Feature Storeを利用するにはレコードを一意に特定するカラムと、それぞれのイベントの発生時刻を表すカラムがそれぞれ必要になります。
それぞれrow_id
、event_time
という名前で追加しておきましょう。
なお、event_time
はひとまず現在時刻のUnixtimeを入れておきます。
実際の運用では新規に取得されたデータが随時追加されていくことが想定されますが、その場合にはevent_time
に最新の時刻が入ることでバージョニングが実現されるイメージです。
import time
df = df.reset_index()
df = df.rename(columns={'index': 'row_id'})
current_time_sec = int(round(time.time()))
df['event_time'] = pd.Series([current_time_sec]*len(df), dtype="float64")
df.head()
Feature Storeの利用
Feature Storeのセッションを作成
Feature Storeを利用するためのセッションを作成します。
import boto3
import sagemaker
from sagemaker.session import Session
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
feature_store_session = Session(
boto_session=boto_session,
sagemaker_client=sagemaker_client,
sagemaker_featurestore_runtime_client=featurestore_runtime
)
Feature Storeを利用するためのS3とロールの設定を行います
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-demo'
print(default_s3_bucket_name)
from sagemaker import get_execution_role
# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)
Featureをまとめる単位であるFeature Groupを作成します。
まず、Feature Groupの名前を現在時刻を元に定義します。
from time import gmtime, strftime
feature_group_name = f'feature-group-{strftime("%d-%H-%M-%S", gmtime())}'
続いて、FeatureGroupのインスタンスを作成します。
from sagemaker.feature_store.feature_group import FeatureGroup
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
続いて、列名と型の定義をfeature_groupに認識させるのですが、Feature StoreではPandasのobject型を使用できないため、string型に変換させる処理を事前に行います。
その後にload_feature_definitions
にデータフレームを渡すことにより、列名と型が認識されFeature Groupを作成する準備が整います。
def cast_object_to_string(data_frame):
for label in data_frame.columns:
if data_frame.dtypes[label] == 'object':
data_frame[label] = data_frame[label].astype("str").astype("string")
# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(df)
# record identifier and event time feature names
record_identifier_feature_name = 'row_id'
event_time_feature_name = 'event_time'
# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
feature_group.load_feature_definitions(data_frame=df); # output is suppressed
Feature Groupの作成
ここまで完了すればいよいよFeature Groupの作成です。
作成には1分未満程度の時間がかかるため、完了するまで待機する関数が紹介されていました。
def wait_for_feature_group_creation_complete(feature_group):
status = feature_group.describe().get('FeatureGroupStatus')
while status == 'Creating':
print('Waiting for Feature Group Creation')
time.sleep(5)
status = feature_group.describe().get('FeatureGroupStatus')
if status != 'Created':
raise RuntimeError(f'Failed to create feature group {feature_group.name}')
print(f'FeatureGroup {feature_group.name} successfully created.')
feature_group.create(
s3_uri=f's3://{default_s3_bucket_name}/{prefix}',
record_identifier_name=record_identifier_feature_name,
event_time_feature_name=event_time_feature_name,
role_arn=role,
enable_online_store=True
)
wait_for_feature_group_creation_complete(feature_group=feature_group)
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup feature-group-19-03-53-17 successfully created.
FeatureGroup feature-group-xxxxx successfully created.
と表示されたら無事Feature Groupの作成完了です。
データの投入
作成したFeature Groupにデータを投入してみましょう。feature_group
のingest関数を使います。
feature_group.ingest(
data_frame=df, max_workers=3, wait=True
)
以上でFeature Storeへのデータの登録は完了です。
データの取得
最後にFeature Storeからデータを取得してみましょう。Athenaに対してクエリを投げる形で取得できます。
query = feature_group.athena_query()
table = query.table_name
query_string = f'SELECT * FROM "{table}"'
print('Running ' + query_string)
# run Athena query. The output is loaded to a Pandas dataframe.
dataset = pd.DataFrame()
query.run(query_string=query_string, output_location=f's3://{default_s3_bucket_name}/{prefix}/query_results/')
query.wait()
dataset = query.as_dataframe()
dataset
Running SELECT * FROM "feature-group-19-03-53-17-1608350000"
row_id passengerid survived pclass name sex age sibsp parch ticket fare cabin embarked event_time write_time api_invocation_time is_deleted
0 0 1 0 3 Braund, Mr. Owen Harris male 22.0 1 0 A/5 21171 7.25 NaN S 1.608350e+09 2020-12-19 03:59:17.165 2020-12-19 03:53:41.000 False
1 595 596 0 3 Van Impe, Mr. Jean Baptiste male 36.0 1 1 345773 24.15 NaN S 1.608350e+09 2020-12-19 03:59:17.165 2020-12-19 03:53:42.000 False
2 4 5 0 3 Allen, Mr. William Henry male 35.0 0 0 373450 8.05 NaN S 1.608350e+09 2020-12-19 03:59:17.165 2020-12-19 03:53:42.000 False
3 3 4 1 1 Futrelle, Mrs. Jacques Heath (Lily May Peel) female 35.0 1 0 113803 53.10 C123 S 1.608350e+09 2020-12-19 03:59:16.080 2020-12-19 03:53:42.000 False
4 594 595 0 2 Chapman, Mr. John Henry male 37.0 1 0 SC/AH 29037 26.00 NaN S 1.608350e+09 2020-12-19 03:58:48.791 2020-12-19 03:53:41.000 False
無事にデータの取得ができました!
結び
チームメンバーの各々の環境で別々に特徴量を作成していると、厳密な定義がずれてきてしまうことがあります。
Feature Storeを特徴量のリポジトリとして利用することにより、メンバー間で同一の特徴量を使用している状況が担保できそうです。
明日は@shotawsさんです!
Discussion