AWS Athena Prepared Statementsをboto3から利用する
Athenaエンジンバージョン2から prepared statements がサポートされた。
これを Python SDK である boto3 から作成したり実行したりする方法を検証したのでそのまとめ。
検証に利用したコードは一部抜粋する。全コードについては リポジトリ を参照。
検証にはCloudTrailログに関するクエリを利用する。
事前準備
CloudTrailログをS3に保存する。
CloudTrailログについては省略。詳細はCloudTrailに関する公式ドキュメントを参照。
Athenaエンジンをバージョン2にアップグレードする。
prepared statement は Athena エンジンバージョン2からサポートされた機能なので
バージョン2の利用が必須。
明示的にバージョン1を利用することを選択していない限り自動的にバージョン2にアップデート済かも。
詳細はAthenaエンジンバージョン2に関する公式ドキュメントを参照。
Athenaのテーブル作成には公式ドキュメントに従い、
パーティション射影を用いたテーブルを作成する。
ただし、公式ドキュメントの方法では日付でしかパーティションが作成されない。
また、クエリの際に日にち単位で指定する必要がある。
AWSリージョンもパーティションに追加したい場合やパーティションには月単位を指定したい場合はカスタマイズする。
カスタマイズ方法は クラスメソッドさんの記事が参考になる。
以上を踏まえて以下のようなテーブルを作成した。
(BUCKET_NAME
や AWS_ACCOUNT_ID
は環境に応じた値を設定する)
CREATE EXTERNAL TABLE cloudtrail_logs(
eventVersion STRING,
userIdentity STRUCT< type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
invokedBy: STRING,
accessKeyId: STRING,
userName: STRING,
sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING,
creationDate: STRING>,
sessionIssuer: STRUCT< type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
userName: STRING>>>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIpAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
additionalEventData STRING,
requestId STRING,
eventId STRING,
readOnly STRING,
resources ARRAY<STRUCT< arn: STRING,
accountId: STRING,
type: STRING>>,
eventType STRING,
apiVersion STRING,
recipientAccountId STRING,
serviceEventDetails STRING,
sharedEventID STRING,
vpcEndpointId STRING
) PARTITIONED BY (
`region` string,
`timestamp` string
)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://BUCKET_NAME/AWSLogs/AWS_ACCOUNT_ID/CloudTrail/'
TBLPROPERTIES (
'projection.enabled'='true',
'projection.timestamp.format'='yyyy/MM',
'projection.timestamp.interval'='1',
'projection.timestamp.interval.unit'='MONTHS',
'projection.timestamp.range'='2018/01,NOW',
'projection.timestamp.type'='date',
'projection.region.type' = 'enum',
'projection.region.values'='us-east-1,us-east-2,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1',
'storage.location.template'='s3://BUCKET_NAME/AWSLogs/AWS_ACCOUNT_ID/CloudTrail/${region}/${timestamp}'
)
上記セットアップが完了すると、以下のようなクエリでログが取得できるようになる。
SELECT *
FROM "default"."cloudtrail_logs"
WHERE region = 'ap-northeast-1' AND timestamp = '2021/06'
LIMIT 10;
prepared statementsなしの実行
詳細はリポジトリの query_wo_prepared.py
を参照。
正常にクエリを実行できていれば、実行結果をS3からダウンロードして結果を表示できる。
今回やりたいことは、これを prepared statementを用いて実行すること。
resp = athena.start_query_execution(
QueryString = """
SELECT *
FROM \"cloudtrail_logs\"
WHERE region = 'ap-northeast-1' AND timestamp = '2021/06'
LIMIT 10;
""",
ResultConfiguration = {
'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
}
)
prepared statementsを用いた実行
boto3を用いた prepared statements の作成・実行方法は大きく2つの方法がある。
start_query_executionで作成する
PREPARE
句を利用することで通常のクエリと同様にprepared statementsを作成・更新することができる。
詳細はリポジトリの create_prepared.py
を参照。
resp = athena.start_query_execution(
QueryString = """
PREPARE cloudtrail FROM
SELECT *
FROM \"cloudtrail_logs\"
WHERE region = ? AND timestamp = ?
LIMIT 10;
""",
ResultConfiguration = {
'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
}
)
通常のクエリと異なるのは、実行結果がS3に保存されるのではなく、直接レスポンスとして返ること。
クエリ結果が S3 に <execution_id>.txt
として保存されるが、試した限り内容は常に空だった。
prepared statementsの作成に失敗する場合はエラーメッセージが保存されるかもしれないが、
エラーになるケースが思い付かなかった。
実際に作成された prepared statement は以下の通り。
デフォルトで Description が設定される。
$ aws athena get-prepared-statement --work-group primary --statement-name cloudtrail
{
"PreparedStatement": {
"StatementName": "cloudtrail",
"QueryStatement": "SELECT *\nFROM\n cloudtrail_logs\nWHERE ((region = ?) AND (timestamp = ?))\nLIMIT 10\n",
"WorkGroupName": "primary",
"Description": "Created through SQL command.",
"LastModifiedTime": "2021-06-27T22:38:30.314000+09:00"
}
}
create_prepared_statementで作成する
prepared statementを作成・更新する専用のAPIとして
create_prepared_statement
および update_prepared_statement
が提供されているのでこれを利用する。
通常のクエリとは異なり、クエリの実行を待つ必要がなくなるため get_query_execution
で
実行したクエリの結果を確認するといった作業が不要になる。
prepared statementに不正がある場合は create_prepared_statement
の実行時にエラーになってくれる。
一方で、prepared statementの作成と更新でAPIが異なることが面倒という課題がある。
同一名称のprepared statementが既に存在する場合、 create_prepared_statement
は失敗する。
また、同一名称のprepared statementが存在しない場合、 update_prepared_statement
は失敗する。
(create_prepared_statementでエラー)
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the CreatePreparedStatement operation: Prepared Statement cloudtrail already exists in WorkGroup primary
(update_prepared_statementでエラー)
botocore.errorfactory.ResourceNotFoundException: An error occurred (ResourceNotFoundException) when calling the UpdatePreparedStatement operation: Prepared Statement cloudtrail does not exist in WorkGroup primary
対策としては対象のprepared statementが存在するか確認した上で create or updateを使い分ける必要がある。
ただし、 get_prepared statement
は対象の prepared statement が存在しなければ例外になる。
このため、エラーハンドリングが必要で少し面倒。
上記を踏まえた create_prepared_statement
および update_prepared_statement
の実行方法は以下の通り。
詳細はリポジトリの query_create_prepared.py
を参照。
try:
result = athena.get_prepared_statement(
StatementName = "cloudtrail",
WorkGroup = "primary"
)
except athena.exceptions.ResourceNotFoundException:
result = None
if result is None:
resp = athena.create_prepared_statement(
StatementName = "cloudtrail",
WorkGroup = "primary",
QueryStatement = """
SELECT *
FROM \"cloudtrail_logs\"
WHERE region = ? AND timestamp = ?
LIMIT 10;
"""
)
print('create prepared statement: ' + str(resp))
else:
resp = athena.update_prepared_statement(
StatementName = "cloudtrail",
WorkGroup = "primary",
QueryStatement = """
SELECT *
FROM \"cloudtrail_logs\"
WHERE region = ? AND timestamp = ?
LIMIT 10;
"""
)
print('update prepared statement: ' + str(resp))
prepared_statementsを実行する
基本的なクエリの流れは prepared statement なしの場合と同じで、
実際に投げるクエリが異なるだけ。
以下のように、クエリ実行の際はパラメタの指定のみを行うので
SQLインジェクションのリスクを低減することができる。
resp = athena.start_query_execution(
QueryString = """
EXECUTE cloudtrail
USING 'ap-northeast-1', '2021/06';
""",
ResultConfiguration = {
'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
}
)
その他
マネジメントコンソール上でのprepared statementの確認
マネジメントコンソール上で作成したprepared statementを確認しようとしたが,
作成済のprepared statementを表示するようなタブ等は存在しなかった.
おそらくマネジメントコンソール上で確認することができない.
作成済のprepared statementを確認するには,aws cliやboto3を利用する必要がある.
aws cliを利用する場合は list-prepared-statements
や get-prepared-statement
を利用する.
$ aws athena list-prepared-statements --work-group primary
{
"PreparedStatements": [
{
"StatementName": "cloudtrail",
"LastModifiedTime": "2021-06-27T18:37:53.353000+09:00"
}
]
}
$ aws athena get-prepared-statement --work-group primary --statement-name cloudtrail
{
"PreparedStatement": {
"StatementName": "cloudtrail",
"QueryStatement": "SELECT *\nFROM\n cloudtrail_log\nWHERE ((region = ?) AND (timestamp = ?))\nLIMIT 10\n",
"WorkGroupName": "primary",
"LastModifiedTime": "2021-06-27T18:37:53.353000+09:00"
}
}
boto3を利用する場合は list_prepared_statements
や get_prepared_statement
を利用する.
Discussion