🗂

【Pyathena + SQLAlchemy】PythonでAmazon Athenaをオブジェクト指向的に扱いたい

2022/09/29に公開

はじめに

動画配信サービスmillviで主にバックエンドエンジニアとして業務をしている片山です。

Amazon AthenaはS3に保存されたCSVファイルなどをSQLクエリで分析するサービスです。現在開発中のmillvi新バージョンではサービスの利用量集計にAthenaを利用しています。

集計用コード内でAthenaを利用したい場合、boto3等のSDKを使うことになると思います。具体的には、関数(boto3の場合はstart_query_execution)の引数にクエリ文字列を直接指定し、クエリを投げます。ただし、これには

  • 構文ミスがクエリを実行する時まで検知されない
  • SQLクエリは単なる文字列として扱われるので、IDEの補完機能を活用できない
  • クエリ結果は指定された場所にランダムなファイル名のcsvファイルとして保存されるので、結果を後処理し整形するといった作業が煩雑になる
    (複雑な後処理であっても1つのクエリで完了させる必要がある)

などの問題があります。そこで、この記事ではboto3をラップしたPyAthenaを利用します。

PyAthenaはPythonのDB API2.0に準拠したAmazon Athena用のクライアントモジュールです。

https://pypi.org/project/pyathena/

pandasやdaskのDataFrameと連携させて利用する使用例も多く見受けられますが、[1]
今回はPythonのORMライブラリSQLAlchemyと連携して利用します。

今回の検証で使用するデータ

こちらのNOAA(アメリカ海洋大気庁)が提供する世界各地の気象データを用いました。

今回は年ごとに提供された気象データのうち、2021年のデータで検証します。s3://noaa-ghcn-pds/csv/by_year/2021.csvを適当なバケットにコピーして準備完了です。

データの説明

先頭10行は以下のようになっています。

% aws s3api get-object --bucket {test_data_bucket} --key 2021.csv --range bytes=0-10000 /dev/stdout | head
,ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AE000041196,20210101,TMAX,278,,,S,
1,AE000041196,20210101,PRCP,0,D,,S,
2,AE000041196,20210101,TAVG,214,H,,S,
3,AEM00041194,20210101,TMAX,266,,,S,
4,AEM00041194,20210101,TMIN,178,,,S,
5,AEM00041194,20210101,PRCP,0,,,S,
6,AEM00041194,20210101,TAVG,217,H,,S,
7,AEM00041217,20210101,TMAX,262,,,S,
8,AEM00041217,20210101,TMIN,155,,,S,

各列の説明はこちらを参照してください。大まかに説明すると、ID列が観測地点IDで、DATEが日付、ELEMENTが測定内容、DATA_VALUEがその値となっています。
注意点としては、気温(TAVG, TMIN, TMAX)や降水量(PRCP)の値はそれぞれ℃、mmの数値を10倍した値が記録されています。

観測地点IDは先頭2文字が国名を表す11桁の文字列です。

テーブル作成

Athenaのコンソールに入り、以下のクエリを実行してテーブルを定義します。

CREATE EXTERNAL TABLE IF NOT EXISTS `{db-name}`.`{table-name}` (
  `count` int,
  `id` string,
  `date` int,
  `element` string,
  `value` int,
  `m_flag` string,
  `q_flag` string,
  `s_flag` string,
  `obs_time` string
)
 ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      ESCAPED BY '\\'
      LINES TERMINATED BY '\n'LOCATION '{test_data_bucket}'
TBLPROPERTIES ('has_encrypted_data' = 'false');

Athena上で{db-name}データベースの{table-name}テーブルを利用できるようになりました。

SQL Alchemyの準備

まずはAthena用のSQLAlchemyエンジンを作成します。この部分は他のDBに接続するときとほぼ同じです。

example_model.py
from sqlalchemy.engine import create_engine

connect_str = "awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}"

engine = create_engine(connect_str.format(
        region_name='ap-northeast-1',
        schema_name='{db-name}',
        s3_staging_dir='{s3://output-location-name}'
    ), echo=True)

region_nameでAthenaを実行するリージョンを指定し、schema_nameにはDBの名称を、s3_staging_dirにはクエリ結果を書き出す場所をS3URIの形式で指定します。また、echo=Trueを指定することでSQLAlchemyにより生成されたクエリ文字列を標準出力に書き出します。

次にセッション管理用クラスを作成し、ORMの基底クラスとなるBaseクラスに紐づけます。こちらも他のDBでの作業とほぼ同じです。

example_model.py
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Session = scoped_session(
    sessionmaker(autocommit=False, autoflush=True, bind=engine)
)
Base = declarative_base()
Base.query = Session.query_property()

モデルの作成

先述のテーブルを記述するモデルを作成します。

ここでは、それに先んじてdateカラム用の型DateTypevalueカラム用の型ValueTypeを作成します。

example_model.py
from sqlalchemy import Column, Integer, String
from sqlalchemy import types
from pyathena.sqlalchemy_athena import AthenaRestDialect

class DateType(types.TypeDecorator):

    impl = Integer
    cache_ok = True
    
    def __init__(self, fmt: str = "%Y%m%d", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fmt = fmt

    def process_bind_param(self, value: date, dialect: AthenaRestDialect) -> int:
        return int(value.strftime(self.fmt))

    def process_result_value(self, value: int, dialect: AthenaRestDialect) -> date:
        return datetime.strptime(str(value), self.fmt).date()


class ValueType(types.TypeDecorator):

    impl = Integer
    cache_ok = True

    def process_bind_param(self, value: float, dialect: AthenaRestDialect) -> int:
        return int(value * 10)

    def process_result_value(self, value: int, dialect: AthenaRestDialect) -> float:
        return value / 10


class ClimateDataModel(Base):
    __tablename__ = '{table-name}'
    count = Column("count", Integer, primary_key=True)
    id = Column("id", String)
    date = Column("date", DateType)
    element = Column("element", String)
    value = Column("value", ValueType)
    m_flag = Column("m_flag", String)
    q_flag = Column("q_flag", String)
    s_flag = Column("s_flag", String)
    obs_time = Column("obs_time", String)

types.TypeDecoratorを継承したクラスはカラムの型定義に使える型となります。クラス変数implにAthena上の型を指定し、process_bind_paramメソッドにはPython → テーブルの際に行う処理を指定します。反対にprocess_result_valueメソッドにはテーブル → Pythonの際の処理を記述します。

DateTypeでは、Athena上ではYYYYMMDD形式の整数型のデータをPython上ではdatetime.date型で扱えるよう定義しています。ValueTypeではAthena上では10倍された整数型のデータをPython上では元の値で扱えるよう定義しています。

クエリの実行

いよいよクエリの実行です。観測地点が日本で、2021年1月に測定された平均気温のデータを取得する場合を考えます。観測地点IDは先頭2文字が国名を表し、日本の場合はJAです。また、平均気温のデータはelementTAVGのレコードのvalueです。これらを踏まえると以下のように書けます。

example_query.py
from datetime import date
from testmodel import ClimateDataModel

query = ClimateDataModel.query\
  .filter(
    ClimateDataModel.id.like("JA%"),
    ClimateDataModel.element=="TAVG",
    date(2021, 1, 1) <= ClimateDataModel.date,
    ClimateDataModel.date < date(2021, 2, 1))\
  .order_by(ClimateDataModel.date.asc(), ClimateDataModel.id)\
  .limit(100)

for m in query.all():
  print(m.id, m.date, m.value)

各フィールドはClimateDataModelクラスのメンバであるため、エディタの補完機能を使って楽に書けます。また、dateに関してはDateTypeを定義したため、datetime.date型と比較することが可能になります。さらには、allメソッドによりクエリ結果をプログラム内で取得できます。

実行すると以下のようなクエリ文字列を生成します。

SELECT table.count AS {table-name}_count, {table-name}.id AS {table-name}_id, {table-name}.date AS {table-name}_date, {table-name}.element AS {table-name}_element, {table-name}.value AS {table-name}_value, {table-name}.m_flag AS {table-name}_m_flag, {table-name}.q_flag AS {table-name}_q_flag, {table-name}.s_flag AS {table-name}_s_flag, {table-name}.obs_time AS {table-name}_obs_time 
FROM {table-name} 
WHERE {table-name}.id LIKE %(id_1)s AND {table-name}.element = %(element_1)s AND {table-name}.date >= %(date_1)s AND {table-name}.date < %(date_2)s ORDER BY {table-name}.date ASC, {table-name}.id LIMIT 100
INFO sqlalchemy.engine.Engine [generated in 0.00069s] {'id_1': 'JA%', 'element_1': 'TAVG', 'date_1': 20210101, 'date_2': 20210201}

WHERE節でdatetime.date(...)が適切に整数型に変換されていることがわかります。

標準出力は以下のようになります。

JA000047401 2021-01-01 -9.1
JA000047407 2021-01-01 -12.8
JA000047409 2021-01-01 -11.7
JA000047412 2021-01-01 -10.7
JA000047418 2021-01-01 -8.8
JA000047420 2021-01-01 -9.3
JA000047421 2021-01-01 -9.9
JA000047426 2021-01-01 -8.9
JA000047430 2021-01-01 -9.6
JA000047570 2021-01-01 -1.2
[略]

dateがdatetime.date型に変換されたため、年月日の間にハイフンが挟まる形になりました。さらにはvalueもValueTypeの定義により0.1倍されています。

ただし、先述の通りDateTypeValueTypeプログラム内での型変換を定義しているに過ぎないということに留意する必要があります。具体的には、Athenaにより吐き出されるcsv(エンジン作成で指定した{s3://output-location-name}に生成されるcsvファイルです)の内容は

"{table-name}_count","{table-name}_id","{table-name}_date","{table-name}_element","{table-name}_value","{table-name}_m_flag","{table-name}_q_flag","{table-name}_s_flag","{table-name}_obs_time"
"12912","JA000047401","20210101","TAVG","-91","H","","S",""
"12914","JA000047407","20210101","TAVG","-128","H","","S",""
"12917","JA000047409","20210101","TAVG","-117","H","","S",""
[略]

のように、dateとvalue共に変換されていません。クエリ結果を別のプログラムで使う際にはお気をつけください。

まとめ

PyAthenaを使い、Amazon AthenaをORMのインターフェースで操作する方法について説明しました。使用するメリットとしては

  • クエリがpythonコードの一部になるので入力補完の恩恵を受けられる
  • クエリ結果をコード内でそのまま利用できる
  • テーブル定義をコード内に書けるので実装時のミスを減らせる

などがあります。

この記事がご参考になれば幸いです。

脚注
  1. https://qiita.com/nijigen_plot/items/02051f09eee3a2dee2c2 ↩︎

Discussion