📈

AWS SDK for Pandas(AWS Data Wrangler)でAthenaにクエリを投げる

2023/03/01に公開

はじめに

動画配信サービスmillvi (ミルビィ)で主にバックエンドエンジニアとして業務をしている片山です。

前回私が投稿した記事【Pyathena + SQLAlchemy】PythonでAmazon Athenaをオブジェクト指向的に扱いたいでは、SQLAlchemyでAmazon Athenaにクエリを発行する方法を説明しました。

今回の記事では別のパッケージであるAWS SDK for Pandas(AWS Data Wrangler)を使い、クエリを発行する方法を説明しつつ、管理が楽になる設定についても説明します。

AWS SDK for pandasとは

読んで字の如く、AWSのリソースをpandasで扱うためのPythonパッケージです。以前はAWS Data Wranglerと呼ばれていましたが、2022年に名称変更しました。

https://twitter.com/AWSOpen/status/1564613913416704012

便利な点

AWS SDK for pandasはAthenaのクエリ結果を手軽にプログラムで利用できる点が便利です。

Athenaでクエリを実行する際は、まず出力先のS3バケットを指定する必要があります。クエリを実行すると、そのバケットにcsvファイルが生成されますが、ファイル名はユーザー側で指定できません。また、boto3でのクエリ実行処理は開始処理であり、csvファイルの生成を待ちません。

つまり、boto3を使ってAthenaを操作する場合は非同期的に処理できる実装を行うか、クエリの完了を待つ処理を挟む必要があります。具体的には、出力先のバケットにイベント通知を作成し、csvファイルの生成をトリガーにして後続の処理を行う必要があります。このように、boto3からAthenaを操作すると、単純にデータを取得するだけでも若干の手間が伴います。

一方で、AWS SDK for pandasは出力先バケットの指定は必須ではありません。また、クエリを実行すると同期的に関数の返り値としてクエリ結果を取得できます。そのため、データ取得を1つのプログラムの中で完結させることが可能となり、実装上の手間が省けます。

使い方

実装例

クエリの投げ方を説明します。テーブルは前回の記事のものを使用します。このデータはNOAA(アメリカ海洋大気庁)が提供する世界各地の気象データであり、無料で利用できます。(参考)実行環境はAWS Lambdaとします。

count id date element value m-flag q-flag s-flag obs-time
6106305 US1LAAV0001 20210305 PRCP 0 N 0700
6106306 US1LAAV0001 20210305 SNOW 0 N 0700
6106307 US1LABG0002 20210305 PRCP 0 N 0800
6106308 US1LABG0002 20210305 SNOW 0 N 0800
6106309 US1LABG0003 20210305 PRCP 0 N 0530
6106310 US1LABG0003 20210305 SNOW 0 N 0530
6106311 US1LABS0001 20210305 PRCP 0 N 0730
6106312 US1LABS0001 20210305 SNOW 0 N 0730
6106313 US1LABS0008 20210305 PRCP 0 N 0700
6106314 US1LABS0008 20210305 SNOW 0 N 0700

AWS SDK for pandasのパッケージとしての名称はawswranglerです。boto3などとは違いデフォルト状態ではimportできないのでレイヤーが必要です。AWS SDK for pandasのレイヤーはAWS側で提供されるため、こちらで用意することなく簡単に試すことができます。

ただし、プロダクトで使う場合など、バージョンを固定したいという要件がある場合は自前でアップロードしたレイヤーを使うことになると思います。一般的な手法でmkdir python;pip install awswrangler -t pythonと実行すると、pythonフォルダのサイズがレイヤーの上限である250MBを超えてしまいます。そのため、aws-sdk-pandasのリリースページにて公開されている、Lambdaレイヤー用に軽量化されたzipファイルを利用します。参考

クエリ発行はawswrangler.athena.read_sql_query関数で行います。前回の記事と同様に、観測地点が日本で、2021年1月に測定された平均気温という条件にマッチするレコードを抽出するコードは以下のようになります。

lambda_function.py
import awswrangler as wr
def lambda_handler(event, context):
    query = """
        SELECT 
          "count",
          "id",
          "date",
          "element",
          "value",
          "m-flag",
          "q-flag",
          "s-flag",
          "obs-time" 
        FROM {table-name}
        WHERE
          id LIKE 'JA%'
          AND element = 'TAVG'
          AND "date" >= '20210101'
          AND "date" < '20210201'
        ORDER BY date ASC, id
        ;
    """
    df = wr.athena.read_sql_query(query, database={database-name})

クエリ文字列とデータベース名を指定するだけで、簡単にクエリ結果がDataFrame形式で取得できます。
Lambda関数にはAmazonAthenaFullAccessとクエリ対象のファイルが入ったバケットへのアクセスを許可するポリシーを含んだロールをアタッチします。

詳細説明

wr.athena.read_sql_queryの引数にはAthenaの結果の出力先は明記しませんが、デフォルトではaws-athena-query-results-{aws-account-id}-{region}という名称のバケットに結果が出力されます。マネージドポリシーのAmazonAthenaFullAccessにはaws-athena-query-results-*という名称のバケットへの操作が許容されているため、明示的に許可を与えずとも作成されます。
バケットの中にはtemp_table_xxxxx(xxxxxはランダム文字列)というパスが存在し、その中にはクエリ結果のバイナリファイルが存在します。このファイルをpandasで読み込み、プログラム内で取得しています。

一方で、Athenaのクエリログを確認すると、以下のクエリが実行されたことがわかります。

CREATE TABLE {database-name}."temp_table_xxxxx"
WITH(
    external_location = 's3://aws-athena-query-results-{aws-account-id}-{region}/temp_table_xxxxx',
    format = 'PARQUET')
AS 
SELECT 
  "count",
  "id",
  "date",
  "element",
  "value",
  "m-flag",
  "q-flag",
  "s-flag",
  "obs-time" 
FROM {table-name}
WHERE
  id LIKE 'JA%'
  AND element = 'TAVG'
  AND "date" >= '20210101'
  AND "date" < '20210201'
ORDER BY date ASC, id

このクエリではテーブル定義をGlue上に作成し、その内容を's3://aws-athena-query-results-{aws-account-id}-{region}/にParquet形式で保存しています。Parquetは列指向のファイル形式であるため、同じく列指向でデータを保持しているpandasと相性が良い形式です。なお、外部テーブルを作成し、その内容を利用するこの方法は、Create Table ASの各文字をとってCTASアプローチと呼ばれます。

その他のオプション

CTASアプローチではCREATE TABLEの後Glue上のテーブル定義を削除しますが、処理が途中で中断された場合、テーブル定義にtemp_table_xxxxxがゴミとして残存してしまいます。また、csv形式のクエリ結果を同時に欲しい場合もあります。そのような場合にはread_sql_queryの引数にctas_approach=Falseを設定することでCREATE TABLEを実行せず、csv形式で保存できます。

また、temp_table_xxxxxファイルは生成された後削除されないため、実行の度に雪だるま式にファイルサイズが膨れ上がっていきます。read_sql_queryの引数にkeep_files=Falseを指定することで、クエリ実行後にParquetファイルやcsvファイルを自動で削除できます。

ただし、これを実行するにはAmazonAthenaFullAccessで与えられた操作権限の他にaws-athena-query-results-{aws-account-id}-{region}内のオブジェクト削除権限をアタッチする必要があります。

{
    "Sid": "NotKeepFilesAWSWrangler",
    "Effect": "Allow",
    "Action": "s3:DeleteObject",
    "Resource": "arn:aws:s3:::aws-athena-query-results-{aws-account-id}-{region}/*"
}

まとめ

AWS SDK for pandasの説明と便利なオプションについて解説しました。同一プログラムで同期的にAthenaの結果を取得できるのは、簡単な集計機能を作成する際には非常に楽だと思います。また、集計結果を保存するバケットを明示的に指定する必要がない点も便利です。しかし、そのバケットに生成したファイルが自動的には削除されないという点は注意する必要があります。

最後に

エビリーのプロダクトや組織について詳しく知りたい方は、気軽にご相談ください。
https://www.wantedly.com/companies/eviry
https://recruit.eviry.com/

Discussion