Amazon AthenaでPartitionしたParquetファイルを読み込む
AWSのETLパイプラインのお勉強ということで、S3->Athena->QuickSight構成を試そうとして、ガッツリAthenaにハマったので覚え書きです。業務時間の合間をぬって1.5日くらいかかってしまいました。
またハマるのも嫌なので、備忘録としてまとめておきます。
やりたいこと
- あるサービスの販売商品のランキングをダッシュボードに表示したい
- データ量がそれなりになっても対応できるように、parquetでデータは格納する。
- purquetデータはSageMaker notebookからS3に格納する。
ハマったこと
- データをpurquetで保存する際に、パーティションを切っていなかった
- パーティション切ろうと思ったら、AthenaのCREATE TABLE文作りに手間取り、なかなかSELECTが通らなかった
- 「テーブルに対してクエリを実行するときにゼロ個のレコードが返される」として公式も取り上げてるエラーにはまりました
動作確認済みの手順
以下の手順では、適切なIAMの権限がついている前提で話を進めます。
1/データ準備
ざっくりこんなデータを用意します。中身は適当です。
今回は検証用ということで2万行程度のデータです。これをPythonのDataFrame形式で作成しておきます。今回は便宜上df
として扱います。
id | created_at | price | product_id | product_name | ・・・ | yyyy | mm |
---|---|---|---|---|---|---|---|
0001 | 2021-07-24 21:54:35 | 1500 | AA001 | 商品1 | ・・・ | 2021 | 07 |
0002 | 2021-07-24 22:01:21 | 2200 | AA002 | 商品2 | ・・・ | 2021 | 07 |
yyyy
とmm
はcreated_at
から作ったパーティション用のカラムです。保存するときとAthenaでCREATE TABLEする時に使います(*1)。
作成したデータをS3に保存します。
この時、amazon wranglerを使って、parquetかつパーティションを指定して保存します(*2)。
実行すると、Hive形式で保存されます(*3)。
詳しくは公式ドキュメントを確認してください。
import awswrangler as wr
wr.s3.to_parquet(
df=df, #保存したいDataFrame
path='s3://{bucket_name}/{prefix}', #ご自身の環境に合わせてパス指定してください
dataset=True,
partition_cols=['year', 'month'] #パーティションするカラムを指定。複数指定できます。
)
2/AthenaでCREATE TABLE
- (初めての場合)ログの出力先の設定する
- まずCREATE DATABASEする
- CREATE TABLEする
- データをパーティションにロードする
- クエリで確認
一番のハマりポイントは「CREATE TABLEする」ことでした。
- なぜか世の中にparquetのデータをロードしている例が少ない。CSVが多い。
- Athenaの料金体系的にもparquetでやるのがよいのでは?と思っていたので、結構苦戦した
- 公式ドキュメントも包括的な情報に行き当たれず。。。
- PARTITIONED BYやTBLPROPATIESあたりの情報がまとまってない(やりたいことに合致していないとも言う)
DDL文を作る
結果から言うと、ネットの記事をつぎはぎしてわけわからなくなったので、1度AWS Glueでクローラーを作り、テーブルを作成した上で、そのDDL文を利用するのがよいです。
このツイートがとてもよく状況を表していました(拝借させていただきました)
DDL文作成の手順
- AWS Glue -> クローラ -> クローラの追加(設定をぽちぽち) -> クローラを実行
- AWS Glue -> データカタログ|データベース|テーブル -> 1.のクローラでテーブルが作成できていることを確認
- Athenaでデータベースを指定。テーブル一覧に2.と同じテーブルがあることを確認したら、テーブル名右の「…」 -> 「Generate Create Table DDL」
- DDL文にパーティションを指定するPARTITIONED BY句を指定
最終的にはこんな感じになります。上記の手順で進めるとTABLEPROPATIESにGlueのプロパティなどが大量ついてくるのですが、ほぼなくても動きます。
CREATE EXTERNAL TABLE `test_table`(
`id` string,
`created_at` timestamp,
`price` int,
`product_id` string,
`product_name` string,
...
)
PARTITIONED BY (
`year` string,
`month` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://{bucket_name}/{prefix}'
TBLPROPERTIES (
'classification'='parquet',
'compressionType'='none',
'objectCount'='1',
'typeOfData'='file')
注意点
- PARTITIONED BY(*4)
- パーティションに指定したカラムは、CREATEしないことに注意してください
- PARTTIONED BY()で指定したカラムは、データ型も指定しないとエラーになります
- LOCATIONはディレクトリを指定してください
- ディレクトリ直下にはHive形式になったファイル以外は置かないようにする。
- 別のファイル(例えばcsvとか)置いてあると、読み込めないエラーが出ます
-
ROW FORMAT SERDE
、STORED AS INPUTFORMAT
、STORED AS INPUTFORMAT
のあたりがネットで探してもふんわりしていました。- 一応あるんですけど、これって感じにはならなかったです。なのでGlueで通ったDDLを利用するリバース方式を取りました。このリバース方式をとっている方がtwitterでたくさん観測されました。
3/パーティションを有効にする
また、テーブルを作っただけではパーティションが読み込まれるわけではなく、パーティションにロードする(パーティションを認識させてあげる)必要があります。
CREATE TABLEした後に、別のクエリで
MSCK REPAIR TABLE <テーブル名>;
してください。
うまくいけばSELECT文で確認できます。もしくは、Glue crawlerを設定していれば、データカタログのデータベース->テーブルから「パーティションの表示」で確認できるようになります。
これでSELECTしてテーブルがスキャンできれば成功です!
SELECTに失敗したら
AWS公式のこちらをみるとヒントがあるかもしれません。健闘を祈ります。
また、Glueで通っていたDDLを参考にしているはずなのに、、、と言う場合は、Glue上で通っているテーブルと失敗しているテーブルを見比べると差分に気づくかもしれません。自分はこれで、失敗しているテーブルはパーティションが有効になっていないことに気づき、解決できました。
最後に
今回はデータが数MBと小さいので問題ないのですが、GB〜になってくるとパフォーマンスチューニングした方が良さそうだと思っています。
とりあえず今回はAthenaにテーブルを登録することがゴールだったので終わりにします!
注釈と付録
*1 以下のコードで簡単に作れます
# created_atが yyyy/mm/dd HH:MM:SSとする
df['year'] = df['created_at'].apply(lambda x : x.strftime('%Y-%m-%d')[0:4])
df['month'] = df['created_at'].apply(lambda x : x.strftime('%Y-%m-%d')[5:7])
*2 parquetなので相当早く実行できます。圧縮方式は特に指定していないので、デフォルトのsnappy
になっていると思います。
*3 Hive形式 このあたりを読んでイメージを掴んでもらえればと
*4 Athenaのパーティションについて役に立った記事
Discussion