AWS AthenaのPartitionとBucketingを解説
背景
AWSを用いたデータサイエンス環境として、S3にデータを置きAthenaでクエリというのは鉄板パターンの1つになりつつあると感じます。やはり、数あるAWSのデータ関連サービスの中でストレージ単価が最も安いのはS3ですから。
また、parquet形式のデータにも対応しているので速度の面でも魅力的で、インデックスを張り巡らせたRDSよりも高速なクエリを達成するポテンシャルがあります。ただそのためには、Athenaの性能を最大限に引き出すように、データを正しく分割してS3に置くことが大切です。
Athenaにおけるデータの分割
データの分割がクエリの性能に直結することは明確です。クエリのwhere条件が特定の分割先を指しているようなら、その範囲のデータだけをスキャンすればよいからです。よって、分割に使うキーはwhere条件でよく使うことが想定されるキーとすべきです。
Athenaでは分割の仕方としてPartitionとBucketingが用意されています。Amazon Athena のパフォーマンスチューニング Tips トップ 10でも、PartitionとBucketingが1番目と2番目で紹介されているように、パフォーマンスチューニングにおいて真っ先に考慮すべき事項であることがわかります。
Partitionは比較的わかりやすいです。一方でBucketingはとっつきにくいところがあります。Paritionはその名の通り分割手段の主役である一方、Bucketingは補助的な分割手段という位置づけのようです。そのためかGlueのテーブル設定画面を見ても、Partitionに関する情報はすぐわかるところに表示がある一方で、Bucketingに関する情報は明示的に表示されません。
aws-data-wrangler
S3+Athenaの設計は、AWSのGUIコンソールからシコシコ書いてもできるとは思いますが、Pythonコードとしてテーブルを定義できる(つまりIaS; infrastructure as codeができる)aws-data-wranglerという素敵なPythonモジュールがあるので活用します。
インストール方法:
pip install awswrangler
aws-data-wranglerでは、Pandas DataFrameでテーブルを表しAWSに転送するのが基本パターンです。ここでは使い方を解説しませんが、上記リンクから辿れる公式チュートリアルはわかりやすいと思います。また、本記事の範囲ではチュートリアルを見なくても雰囲気は伝わるかと思います。
執筆時点でawsranglerのバージョンは2.13.0でしたが、Bucketingにバグがあるようです。一時的にモジュールの中を書き換えるなどして、https://github.com/awslabs/aws-data-wrangler/issues/1061 に記載の修正をしてください。
例題
上述のように、データ分割戦略は想定されるクエリ次第ですから、題材とする例を決めます。
(題材) ある小売チェーンのエリアマネージャーは、エリア内店舗の日々の売上を記録することにしました。
import pandas as pd
from datetime import date
sale = pd.DataFrame(
columns=["t", "shop", "volume"],
data=[
[date(2021, 12, 1), "新宿店", 24],
[date(2021, 12, 1), "渋谷店", 31],
[date(2021, 12, 1), "池袋店", 18],
[date(2021, 12, 2), "新宿店", 42],
[date(2021, 12, 2), "渋谷店", 45],
[date(2021, 12, 2), "池袋店", 26]
])
エリアマネージャーは、日々のルーチンとして特定の日の売上一覧を見ます。
select * from sale where t = date('2021-12-02')
また、以下のように特定の店舗の売上推移を観察することもあります。
select * from sale where shop = '渋谷店'
日付と店舗の組み合わせで絞ることも考えられます。
select * from sale where t = date('2021-12-02') and shop = '渋谷店'
分割戦略
このように想定クエリを洗い出してみると、このデータはカラムt
とshop
で分割すべきということがわかります。ではどちらのキーでPartitionを、どちらのキーでBucketingをするのがよいでしょうか?
答えは、t
でPartition、shop
でBucketingです。以降はその理由を説明していきます。
まず、S3へのデータの置かれ方ですが、以下のようにPartitionに従ってフォルダが切られ、その中でBucketingによって振り分けられるように個別ファイルが作られます。Paritionがメインの分割手段であり、Bucketingは補助的な分割手段という位置づけであることがわかります。
S3://top-bucket-name/path/to/sale
└───partition=p1
│ bucket0.parquet
│ bucket1.parquet
│ bucket2.parquet
│ ...
└───partition=p2
│ bucket0.parquet
│ bucket1.parquet
│ bucket2.parquet
│ ...
└───...
│ ...
注意: S3のバケットと本記事で述べているBucketingは全く無関係な概念です。
Partition
このように、BucketingよりPartitionのほうが上位の分割ですから、最も頻繁にクエリ条件にするキーが妥当で、今回の例では日付のt
が該当します。また、日付毎にフォルダが増えていくのが自然でメンテナンスもしやすいです。逆にそうしないと、一日溜まったデータを書き込む際に、あちこちのサブフォルダにデータを書き込みに行くことになるので非効率です。日付のように、後から発生したり無くなったりしない値がParitionキーとして好ましいです。
この例に限らず、多くの場合Partitionは日付が妥当だと思います。日単位か月単位はデータの生成速度によって様々だとは思いますが。このような典型的なPartitionは、"Partition Projection"が適用できます。
Partition Projection
パーティションの隠れたデメリットとして、どのデータがどのパーティションにあるのかをAthenaが逐一導かなければならず、それがパフォーマンスのオーバーヘッドになります。しかし、日付パーティションのような典型的なパーティションは、サブフォルダの命名を所定のルールに従えば、Athenaは効率的にデータの場所を特定できるという仕組みがあり、これをPartition Projectionと言います。つまり上述のデメリットはなくなります。aws-data-wranglerを使用すれば、命名ルールは別に覚える必要なく、アップロード呼び出し時の引数projection_enabled=True
としてPartition Projectionを使うことを明示するだけです。
Partitionしたデータを置くコード
import awswrangler as wr
import boto3
wr.s3.to_parquet(
sale,
path="s3://path-to-s3-folder/sale/",
database="the-database-defined-in-gule",
dataset=True,
table="sale",
partition_cols=["t"], # Paritionキーを指定
projection_enabled=True, # Partion projectionを有効化
projection_types={"t": "date"}, # tは日付型であることを伝える
projection_ranges={"t": "2021-12-01,NOW"}, # 日付の取りうる範囲
)
S3://top-bucket-name/path/to/sale
└───2021-12-01
│ bucket0.parquet
│ bucket1.parquet
│ bucket2.parquet
│ ...
└───2021-12-02
│ bucket0.parquet
│ bucket1.parquet
│ bucket2.parquet
│ ...
└───...
│ ...
Bucketing
日付t
に次いでクエリのwhere条件にすることが多いshop
をBukcetingのキーとします。
ここで注意すべきなのは、bucket0.parquetに新宿店、bucket1.parquetに渋谷店、bucket2.parquetに池袋店のデータが入るとは限らないということです。
Bucketingにおいて我々が制御できるのは、どのくらいデータを分散させたいか、つまり何個の個別ファイルに分けるかだけです。なぜそうなっているのか理解するために、Bucketingの実装を理解したほうが早いので説明します。
Bukcetingのロジック
"新宿店", "渋谷店", "池袋店"という文字列のハッシュ値を求めます。
例えば、"新宿店"=41269, "渋谷店"=43154, "池袋店"=61073 だったとします (ハッシュ値は架空です)。
ハッシュ値を指定のバケット数で割った余りを求めます。例えばバケット数=3として計算してみると
"新宿店": 41269 % 3 = 1
"渋谷店": 43154 % 3 = 2
"池袋店": 61073 % 3 = 2
となります。この剰余の値が割り当てられるバケット番号です。つまり、新宿店のデータはバケット1に入れられます。渋谷店と池袋店のデータは両方とも2に割り当てられる一方で、バケット0には何も入りません。
今回、shop
の取りうる値が3だけだったので不均等なバケット割当になってしまいましたが、もっとたくさん店舗があれば確率的に均等にバケットに割り当てられることでしょう。
ただ、データの性質によっては、量が増えても偏りがある場合もあります。例えば、お店エリアのもっと大きい括りとして関東地方、東北地方、東海地方、関西地方・・・というカラムを追加するとしましょう。大抵の場合、関東地方の店舗数が他を圧倒することになるので、このカラムの値は著しく偏りがあり、Bukectingのキーには向いていないと言えます。
PartitionとBucketingしたデータを置くコード
さて、PartitionとともにBucketingも適用したコードを示します。そしてこれが本解説の完成形のコードです。
import awswrangler as wr
import boto3
wr.s3.to_parquet(
sale,
path="s3://path-to-s3-folder/sale/",
database="the-database-defined-in-gule",
dataset=True,
table="sale",
partition_cols=["t"], # Paritionキーを指定
projection_enabled=True, # Partion projectionを有効化
projection_types={"t": "date"}, # tは日付型であることを伝える
projection_ranges={"t": "2021-12-01,NOW"}, # 日付の取りうる範囲
bucketing_info=("shop", n_buckets) # Bucketingを有効するための引数。n_bucketsはバケット数
)
適切なバケット数
未調査です。残念ながらわかりません。
まとめ
- PartitionとBuketingに用いるキーは、クエリにおいて条件としてよく現れるものにすべきである。
- ただし、データの分布や性質によってPartitionキーやBuketingのキーに向かないこともある。
- その向き不向きは、本解説を読めばわかると思います。
Discussion