【Pyathena + SQLAlchemy】PythonでAmazon Athenaをオブジェクト指向的に扱いたい
はじめに
動画配信サービスmillviで主にバックエンドエンジニアとして業務をしている片山です。
Amazon AthenaはS3に保存されたCSVファイルなどをSQLクエリで分析するサービスです。現在開発中のmillvi新バージョンではサービスの利用量集計にAthenaを利用しています。
集計用コード内でAthenaを利用したい場合、boto3等のSDKを使うことになると思います。具体的には、関数(boto3の場合はstart_query_execution)の引数にクエリ文字列を直接指定し、クエリを投げます。ただし、これには
- 構文ミスがクエリを実行する時まで検知されない
- SQLクエリは単なる文字列として扱われるので、IDEの補完機能を活用できない
- クエリ結果は指定された場所にランダムなファイル名のcsvファイルとして保存されるので、結果を後処理し整形するといった作業が煩雑になる
(複雑な後処理であっても1つのクエリで完了させる必要がある)
などの問題があります。そこで、この記事ではboto3をラップしたPyAthenaを利用します。
PyAthenaはPythonのDB API2.0に準拠したAmazon Athena用のクライアントモジュールです。
pandasやdaskのDataFrameと連携させて利用する使用例も多く見受けられますが、[1]
今回はPythonのORMライブラリSQLAlchemyと連携して利用します。
今回の検証で使用するデータ
こちらのNOAA(アメリカ海洋大気庁)が提供する世界各地の気象データを用いました。
今回は年ごとに提供された気象データのうち、2021年のデータで検証します。s3://noaa-ghcn-pds/csv/by_year/2021.csvを適当なバケットにコピーして準備完了です。
データの説明
先頭10行は以下のようになっています。
% aws s3api get-object --bucket {test_data_bucket} --key 2021.csv --range bytes=0-10000 /dev/stdout | head
,ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AE000041196,20210101,TMAX,278,,,S,
1,AE000041196,20210101,PRCP,0,D,,S,
2,AE000041196,20210101,TAVG,214,H,,S,
3,AEM00041194,20210101,TMAX,266,,,S,
4,AEM00041194,20210101,TMIN,178,,,S,
5,AEM00041194,20210101,PRCP,0,,,S,
6,AEM00041194,20210101,TAVG,217,H,,S,
7,AEM00041217,20210101,TMAX,262,,,S,
8,AEM00041217,20210101,TMIN,155,,,S,
各列の説明はこちらを参照してください。大まかに説明すると、ID列が観測地点IDで、DATEが日付、ELEMENTが測定内容、DATA_VALUEがその値となっています。
注意点としては、気温(TAVG, TMIN, TMAX)や降水量(PRCP)の値はそれぞれ℃、mmの数値を10倍した値が記録されています。
観測地点IDは先頭2文字が国名を表す11桁の文字列です。
テーブル作成
Athenaのコンソールに入り、以下のクエリを実行してテーブルを定義します。
CREATE EXTERNAL TABLE IF NOT EXISTS `{db-name}`.`{table-name}` (
`count` int,
`id` string,
`date` int,
`element` string,
`value` int,
`m_flag` string,
`q_flag` string,
`s_flag` string,
`obs_time` string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\\'
LINES TERMINATED BY '\n'LOCATION '{test_data_bucket}'
TBLPROPERTIES ('has_encrypted_data' = 'false');
Athena上で{db-name}データベースの{table-name}テーブルを利用できるようになりました。
SQL Alchemyの準備
まずはAthena用のSQLAlchemyエンジンを作成します。この部分は他のDBに接続するときとほぼ同じです。
from sqlalchemy.engine import create_engine
connect_str = "awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}"
engine = create_engine(connect_str.format(
region_name='ap-northeast-1',
schema_name='{db-name}',
s3_staging_dir='{s3://output-location-name}'
), echo=True)
region_name
でAthenaを実行するリージョンを指定し、schema_name
にはDBの名称を、s3_staging_dir
にはクエリ結果を書き出す場所をS3URIの形式で指定します。また、echo=True
を指定することでSQLAlchemyにより生成されたクエリ文字列を標準出力に書き出します。
次にセッション管理用クラスを作成し、ORMの基底クラスとなるBaseクラスに紐づけます。こちらも他のDBでの作業とほぼ同じです。
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Session = scoped_session(
sessionmaker(autocommit=False, autoflush=True, bind=engine)
)
Base = declarative_base()
Base.query = Session.query_property()
モデルの作成
先述のテーブルを記述するモデルを作成します。
ここでは、それに先んじてdate
カラム用の型DateType
とvalue
カラム用の型ValueType
を作成します。
from sqlalchemy import Column, Integer, String
from sqlalchemy import types
from pyathena.sqlalchemy_athena import AthenaRestDialect
class DateType(types.TypeDecorator):
impl = Integer
cache_ok = True
def __init__(self, fmt: str = "%Y%m%d", *args, **kwargs):
super().__init__(*args, **kwargs)
self.fmt = fmt
def process_bind_param(self, value: date, dialect: AthenaRestDialect) -> int:
return int(value.strftime(self.fmt))
def process_result_value(self, value: int, dialect: AthenaRestDialect) -> date:
return datetime.strptime(str(value), self.fmt).date()
class ValueType(types.TypeDecorator):
impl = Integer
cache_ok = True
def process_bind_param(self, value: float, dialect: AthenaRestDialect) -> int:
return int(value * 10)
def process_result_value(self, value: int, dialect: AthenaRestDialect) -> float:
return value / 10
class ClimateDataModel(Base):
__tablename__ = '{table-name}'
count = Column("count", Integer, primary_key=True)
id = Column("id", String)
date = Column("date", DateType)
element = Column("element", String)
value = Column("value", ValueType)
m_flag = Column("m_flag", String)
q_flag = Column("q_flag", String)
s_flag = Column("s_flag", String)
obs_time = Column("obs_time", String)
types.TypeDecorator
を継承したクラスはカラムの型定義に使える型となります。クラス変数impl
にAthena上の型を指定し、process_bind_param
メソッドにはPython → テーブルの際に行う処理を指定します。反対にprocess_result_value
メソッドにはテーブル → Pythonの際の処理を記述します。
DateType
では、Athena上ではYYYYMMDD
形式の整数型のデータをPython上ではdatetime.date
型で扱えるよう定義しています。ValueType
ではAthena上では10倍された整数型のデータをPython上では元の値で扱えるよう定義しています。
クエリの実行
いよいよクエリの実行です。観測地点が日本で、2021年1月に測定された平均気温のデータを取得する場合を考えます。観測地点IDは先頭2文字が国名を表し、日本の場合はJA
です。また、平均気温のデータはelement
がTAVG
のレコードのvalue
です。これらを踏まえると以下のように書けます。
from datetime import date
from testmodel import ClimateDataModel
query = ClimateDataModel.query\
.filter(
ClimateDataModel.id.like("JA%"),
ClimateDataModel.element=="TAVG",
date(2021, 1, 1) <= ClimateDataModel.date,
ClimateDataModel.date < date(2021, 2, 1))\
.order_by(ClimateDataModel.date.asc(), ClimateDataModel.id)\
.limit(100)
for m in query.all():
print(m.id, m.date, m.value)
各フィールドはClimateDataModel
クラスのメンバであるため、エディタの補完機能を使って楽に書けます。また、date
に関してはDateType
を定義したため、datetime.date
型と比較することが可能になります。さらには、all
メソッドによりクエリ結果をプログラム内で取得できます。
実行すると以下のようなクエリ文字列を生成します。
SELECT table.count AS {table-name}_count, {table-name}.id AS {table-name}_id, {table-name}.date AS {table-name}_date, {table-name}.element AS {table-name}_element, {table-name}.value AS {table-name}_value, {table-name}.m_flag AS {table-name}_m_flag, {table-name}.q_flag AS {table-name}_q_flag, {table-name}.s_flag AS {table-name}_s_flag, {table-name}.obs_time AS {table-name}_obs_time
FROM {table-name}
WHERE {table-name}.id LIKE %(id_1)s AND {table-name}.element = %(element_1)s AND {table-name}.date >= %(date_1)s AND {table-name}.date < %(date_2)s ORDER BY {table-name}.date ASC, {table-name}.id LIMIT 100
INFO sqlalchemy.engine.Engine [generated in 0.00069s] {'id_1': 'JA%', 'element_1': 'TAVG', 'date_1': 20210101, 'date_2': 20210201}
WHERE節でdatetime.date(...)
が適切に整数型に変換されていることがわかります。
標準出力は以下のようになります。
JA000047401 2021-01-01 -9.1
JA000047407 2021-01-01 -12.8
JA000047409 2021-01-01 -11.7
JA000047412 2021-01-01 -10.7
JA000047418 2021-01-01 -8.8
JA000047420 2021-01-01 -9.3
JA000047421 2021-01-01 -9.9
JA000047426 2021-01-01 -8.9
JA000047430 2021-01-01 -9.6
JA000047570 2021-01-01 -1.2
[略]
dateがdatetime.date
型に変換されたため、年月日の間にハイフンが挟まる形になりました。さらにはvalueもValueType
の定義により0.1倍されています。
ただし、先述の通りDateType
、ValueType
はプログラム内での型変換を定義しているに過ぎないということに留意する必要があります。具体的には、Athenaにより吐き出されるcsv(エンジン作成で指定した{s3://output-location-name}
に生成されるcsvファイルです)の内容は
"{table-name}_count","{table-name}_id","{table-name}_date","{table-name}_element","{table-name}_value","{table-name}_m_flag","{table-name}_q_flag","{table-name}_s_flag","{table-name}_obs_time"
"12912","JA000047401","20210101","TAVG","-91","H","","S",""
"12914","JA000047407","20210101","TAVG","-128","H","","S",""
"12917","JA000047409","20210101","TAVG","-117","H","","S",""
[略]
のように、dateとvalue共に変換されていません。クエリ結果を別のプログラムで使う際にはお気をつけください。
まとめ
PyAthenaを使い、Amazon AthenaをORMのインターフェースで操作する方法について説明しました。使用するメリットとしては
- クエリがpythonコードの一部になるので入力補完の恩恵を受けられる
- クエリ結果をコード内でそのまま利用できる
- テーブル定義をコード内に書けるので実装時のミスを減らせる
などがあります。
この記事がご参考になれば幸いです。
Discussion