AWS SDK for Pandas(AWS Data Wrangler)でAthenaにクエリを投げる
はじめに
動画配信サービスmillvi (ミルビィ)で主にバックエンドエンジニアとして業務をしている片山です。
前回私が投稿した記事【Pyathena + SQLAlchemy】PythonでAmazon Athenaをオブジェクト指向的に扱いたいでは、SQLAlchemyでAmazon Athenaにクエリを発行する方法を説明しました。
今回の記事では別のパッケージであるAWS SDK for Pandas(AWS Data Wrangler)を使い、クエリを発行する方法を説明しつつ、管理が楽になる設定についても説明します。
AWS SDK for pandasとは
読んで字の如く、AWSのリソースをpandasで扱うためのPythonパッケージです。以前はAWS Data Wranglerと呼ばれていましたが、2022年に名称変更しました。
便利な点
AWS SDK for pandasはAthenaのクエリ結果を手軽にプログラムで利用できる点が便利です。
Athenaでクエリを実行する際は、まず出力先のS3バケットを指定する必要があります。クエリを実行すると、そのバケットにcsvファイルが生成されますが、ファイル名はユーザー側で指定できません。また、boto3でのクエリ実行処理は開始処理であり、csvファイルの生成を待ちません。
つまり、boto3を使ってAthenaを操作する場合は非同期的に処理できる実装を行うか、クエリの完了を待つ処理を挟む必要があります。具体的には、出力先のバケットにイベント通知を作成し、csvファイルの生成をトリガーにして後続の処理を行う必要があります。このように、boto3からAthenaを操作すると、単純にデータを取得するだけでも若干の手間が伴います。
一方で、AWS SDK for pandasは出力先バケットの指定は必須ではありません。また、クエリを実行すると同期的に関数の返り値としてクエリ結果を取得できます。そのため、データ取得を1つのプログラムの中で完結させることが可能となり、実装上の手間が省けます。
使い方
実装例
クエリの投げ方を説明します。テーブルは前回の記事のものを使用します。このデータはNOAA(アメリカ海洋大気庁)が提供する世界各地の気象データであり、無料で利用できます。(参考)実行環境はAWS Lambdaとします。
count | id | date | element | value | m-flag | q-flag | s-flag | obs-time |
---|---|---|---|---|---|---|---|---|
6106305 | US1LAAV0001 | 20210305 | PRCP | 0 | N | 0700 | ||
6106306 | US1LAAV0001 | 20210305 | SNOW | 0 | N | 0700 | ||
6106307 | US1LABG0002 | 20210305 | PRCP | 0 | N | 0800 | ||
6106308 | US1LABG0002 | 20210305 | SNOW | 0 | N | 0800 | ||
6106309 | US1LABG0003 | 20210305 | PRCP | 0 | N | 0530 | ||
6106310 | US1LABG0003 | 20210305 | SNOW | 0 | N | 0530 | ||
6106311 | US1LABS0001 | 20210305 | PRCP | 0 | N | 0730 | ||
6106312 | US1LABS0001 | 20210305 | SNOW | 0 | N | 0730 | ||
6106313 | US1LABS0008 | 20210305 | PRCP | 0 | N | 0700 | ||
6106314 | US1LABS0008 | 20210305 | SNOW | 0 | N | 0700 |
AWS SDK for pandasのパッケージとしての名称はawswrangler
です。boto3などとは違いデフォルト状態ではimportできないのでレイヤーが必要です。AWS SDK for pandasのレイヤーはAWS側で提供されるため、こちらで用意することなく簡単に試すことができます。
ただし、プロダクトで使う場合など、バージョンを固定したいという要件がある場合は自前でアップロードしたレイヤーを使うことになると思います。一般的な手法でmkdir python;pip install awswrangler -t python
と実行すると、python
フォルダのサイズがレイヤーの上限である250MBを超えてしまいます。そのため、aws-sdk-pandas
のリリースページにて公開されている、Lambdaレイヤー用に軽量化されたzipファイルを利用します。参考
クエリ発行はawswrangler.athena.read_sql_query
関数で行います。前回の記事と同様に、観測地点が日本で、2021年1月に測定された平均気温
という条件にマッチするレコードを抽出するコードは以下のようになります。
import awswrangler as wr
def lambda_handler(event, context):
query = """
SELECT
"count",
"id",
"date",
"element",
"value",
"m-flag",
"q-flag",
"s-flag",
"obs-time"
FROM {table-name}
WHERE
id LIKE 'JA%'
AND element = 'TAVG'
AND "date" >= '20210101'
AND "date" < '20210201'
ORDER BY date ASC, id
;
"""
df = wr.athena.read_sql_query(query, database={database-name})
クエリ文字列とデータベース名を指定するだけで、簡単にクエリ結果がDataFrame形式で取得できます。
Lambda関数にはAmazonAthenaFullAccess
とクエリ対象のファイルが入ったバケットへのアクセスを許可するポリシーを含んだロールをアタッチします。
詳細説明
wr.athena.read_sql_query
の引数にはAthenaの結果の出力先は明記しませんが、デフォルトではaws-athena-query-results-{aws-account-id}-{region}
という名称のバケットに結果が出力されます。マネージドポリシーのAmazonAthenaFullAccess
にはaws-athena-query-results-*
という名称のバケットへの操作が許容されているため、明示的に許可を与えずとも作成されます。
バケットの中にはtemp_table_xxxxx
(xxxxxはランダム文字列)というパスが存在し、その中にはクエリ結果のバイナリファイルが存在します。このファイルをpandasで読み込み、プログラム内で取得しています。
一方で、Athenaのクエリログを確認すると、以下のクエリが実行されたことがわかります。
CREATE TABLE {database-name}."temp_table_xxxxx"
WITH(
external_location = 's3://aws-athena-query-results-{aws-account-id}-{region}/temp_table_xxxxx',
format = 'PARQUET')
AS
SELECT
"count",
"id",
"date",
"element",
"value",
"m-flag",
"q-flag",
"s-flag",
"obs-time"
FROM {table-name}
WHERE
id LIKE 'JA%'
AND element = 'TAVG'
AND "date" >= '20210101'
AND "date" < '20210201'
ORDER BY date ASC, id
このクエリではテーブル定義をGlue上に作成し、その内容を's3://aws-athena-query-results-{aws-account-id}-{region}/
にParquet形式で保存しています。Parquetは列指向のファイル形式であるため、同じく列指向でデータを保持しているpandasと相性が良い形式です。なお、外部テーブルを作成し、その内容を利用するこの方法は、Create Table AS
の各文字をとってCTASアプローチと呼ばれます。
その他のオプション
CTASアプローチではCREATE TABLE
の後Glue上のテーブル定義を削除しますが、処理が途中で中断された場合、テーブル定義にtemp_table_xxxxx
がゴミとして残存してしまいます。また、csv形式のクエリ結果を同時に欲しい場合もあります。そのような場合にはread_sql_query
の引数にctas_approach=False
を設定することでCREATE TABLE
を実行せず、csv形式で保存できます。
また、temp_table_xxxxx
ファイルは生成された後削除されないため、実行の度に雪だるま式にファイルサイズが膨れ上がっていきます。read_sql_query
の引数にkeep_files=False
を指定することで、クエリ実行後にParquetファイルやcsvファイルを自動で削除できます。
ただし、これを実行するにはAmazonAthenaFullAccess
で与えられた操作権限の他にaws-athena-query-results-{aws-account-id}-{region}
内のオブジェクト削除権限をアタッチする必要があります。
{
"Sid": "NotKeepFilesAWSWrangler",
"Effect": "Allow",
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::aws-athena-query-results-{aws-account-id}-{region}/*"
}
まとめ
AWS SDK for pandasの説明と便利なオプションについて解説しました。同一プログラムで同期的にAthenaの結果を取得できるのは、簡単な集計機能を作成する際には非常に楽だと思います。また、集計結果を保存するバケットを明示的に指定する必要がない点も便利です。しかし、そのバケットに生成したファイルが自動的には削除されないという点は注意する必要があります。
最後に
エビリーのプロダクトや組織について詳しく知りたい方は、気軽にご相談ください。
Discussion