🦔

AWS Athena Prepared Statementsをboto3から利用する

10 min read

Athenaエンジンバージョン2から prepared statements がサポートされた。
これを Python SDK である boto3 から作成したり実行したりする方法を検証したのでそのまとめ。
検証に利用したコードは一部抜粋する。全コードについては リポジトリ を参照。
検証にはCloudTrailログに関するクエリを利用する。

事前準備

CloudTrailログをS3に保存する。
CloudTrailログについては省略。詳細はCloudTrailに関する公式ドキュメントを参照。

Athenaエンジンをバージョン2にアップグレードする。
prepared statement は Athena エンジンバージョン2からサポートされた機能なので
バージョン2の利用が必須。
明示的にバージョン1を利用することを選択していない限り自動的にバージョン2にアップデート済かも。
詳細はAthenaエンジンバージョン2に関する公式ドキュメントを参照。

Athenaのテーブル作成には公式ドキュメントに従い、
パーティション射影を用いたテーブルを作成する。
ただし、公式ドキュメントの方法では日付でしかパーティションが作成されない。
また、クエリの際に日にち単位で指定する必要がある。
AWSリージョンもパーティションに追加したい場合やパーティションには月単位を指定したい場合はカスタマイズする。
カスタマイズ方法は クラスメソッドさんの記事が参考になる。
以上を踏まえて以下のようなテーブルを作成した。
(BUCKET_NAMEAWS_ACCOUNT_ID は環境に応じた値を設定する)

CREATE EXTERNAL TABLE cloudtrail_logs(
         eventVersion STRING,
         userIdentity STRUCT< type: STRING,
         principalId: STRING,
         arn: STRING,
         accountId: STRING,
         invokedBy: STRING,
         accessKeyId: STRING,
         userName: STRING,
         sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING,
         creationDate: STRING>,
         sessionIssuer: STRUCT< type: STRING,
         principalId: STRING,
         arn: STRING,
         accountId: STRING,
         userName: STRING>>>,
         eventTime STRING,
         eventSource STRING,
         eventName STRING,
         awsRegion STRING,
         sourceIpAddress STRING,
         userAgent STRING,
         errorCode STRING,
         errorMessage STRING,
         requestParameters STRING,
         responseElements STRING,
         additionalEventData STRING,
         requestId STRING,
         eventId STRING,
         readOnly STRING,
         resources ARRAY<STRUCT< arn: STRING,
         accountId: STRING,
         type: STRING>>,
         eventType STRING,
         apiVersion STRING,
         recipientAccountId STRING,
         serviceEventDetails STRING,
         sharedEventID STRING,
         vpcEndpointId STRING
) PARTITIONED BY (
        `region` string,
         `timestamp` string
)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://BUCKET_NAME/AWSLogs/AWS_ACCOUNT_ID/CloudTrail/'
TBLPROPERTIES (
  'projection.enabled'='true',
  'projection.timestamp.format'='yyyy/MM',
  'projection.timestamp.interval'='1',
  'projection.timestamp.interval.unit'='MONTHS',
  'projection.timestamp.range'='2018/01,NOW',
  'projection.timestamp.type'='date',
  'projection.region.type' = 'enum',
  'projection.region.values'='us-east-1,us-east-2,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1',
  'storage.location.template'='s3://BUCKET_NAME/AWSLogs/AWS_ACCOUNT_ID/CloudTrail/${region}/${timestamp}'
)

上記セットアップが完了すると、以下のようなクエリでログが取得できるようになる。

SELECT *
  FROM "default"."cloudtrail_logs"
 WHERE region = 'ap-northeast-1' AND timestamp = '2021/06'
 LIMIT 10;

prepared statementsなしの実行

詳細はリポジトリの query_wo_prepared.py を参照。
正常にクエリを実行できていれば、実行結果をS3からダウンロードして結果を表示できる。
今回やりたいことは、これを prepared statementを用いて実行すること。

prepared statement なしクエリ実行部分抜粋
resp = athena.start_query_execution(
    QueryString = """
    SELECT *
    FROM \"cloudtrail_logs\"
    WHERE region = 'ap-northeast-1' AND timestamp = '2021/06'
    LIMIT 10;
    """,
    ResultConfiguration = {
        'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
    }
)

prepared statementsを用いた実行

boto3を用いた prepared statements の作成・実行方法は大きく2つの方法がある。

start_query_executionで作成する

PREPARE 句を利用することで通常のクエリと同様にprepared statementsを作成・更新することができる。
詳細はリポジトリの create_prepared.py を参照。

start_query_execution によるprepared statements作成部分抜粋
resp = athena.start_query_execution(
    QueryString = """
    PREPARE cloudtrail FROM
    SELECT *
    FROM \"cloudtrail_logs\"
    WHERE region = ? AND timestamp = ?
    LIMIT 10;
    """,
    ResultConfiguration = {
        'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
    }
)

通常のクエリと異なるのは、実行結果がS3に保存されるのではなく、直接レスポンスとして返ること。
クエリ結果が S3 に <execution_id>.txt として保存されるが、試した限り内容は常に空だった。
prepared statementsの作成に失敗する場合はエラーメッセージが保存されるかもしれないが、
エラーになるケースが思い付かなかった。

実際に作成された prepared statement は以下の通り。
デフォルトで Description が設定される。

$ aws athena get-prepared-statement --work-group primary --statement-name cloudtrail
{
    "PreparedStatement": {
        "StatementName": "cloudtrail",
        "QueryStatement": "SELECT *\nFROM\n  cloudtrail_logs\nWHERE ((region = ?) AND (timestamp = ?))\nLIMIT 10\n",
        "WorkGroupName": "primary",
        "Description": "Created through SQL command.",
        "LastModifiedTime": "2021-06-27T22:38:30.314000+09:00"
    }
}

create_prepared_statementで作成する

prepared statementを作成・更新する専用のAPIとして
create_prepared_statement および update_prepared_statement
が提供されているのでこれを利用する。

通常のクエリとは異なり、クエリの実行を待つ必要がなくなるため get_query_execution
実行したクエリの結果を確認するといった作業が不要になる。
prepared statementに不正がある場合は create_prepared_statement の実行時にエラーになってくれる。

一方で、prepared statementの作成と更新でAPIが異なることが面倒という課題がある。
同一名称のprepared statementが既に存在する場合、 create_prepared_statement は失敗する。
また、同一名称のprepared statementが存在しない場合、 update_prepared_statement は失敗する。

(create_prepared_statementでエラー)
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the CreatePreparedStatement operation: Prepared Statement cloudtrail already exists in WorkGroup primary

(update_prepared_statementでエラー)
botocore.errorfactory.ResourceNotFoundException: An error occurred (ResourceNotFoundException) when calling the UpdatePreparedStatement operation: Prepared Statement cloudtrail does not exist in WorkGroup primary 

対策としては対象のprepared statementが存在するか確認した上で create or updateを使い分ける必要がある。
ただし、 get_prepared statement は対象の prepared statement が存在しなければ例外になる。
このため、エラーハンドリングが必要で少し面倒。

上記を踏まえた create_prepared_statement および update_prepared_statement
の実行方法は以下の通り。
詳細はリポジトリの query_create_prepared.py を参照。

create_prepared_statement および update_prepared_statement 実行部分抜粋
try:
    result = athena.get_prepared_statement(
        StatementName = "cloudtrail",
        WorkGroup = "primary"
    )
except athena.exceptions.ResourceNotFoundException:
    result = None

if result is None:
    resp = athena.create_prepared_statement(
        StatementName = "cloudtrail",
        WorkGroup = "primary",
        QueryStatement = """
        SELECT *
        FROM \"cloudtrail_logs\"
        WHERE region = ? AND timestamp = ?
        LIMIT 10;
        """
    )
    print('create prepared statement: ' + str(resp))
else:
    resp = athena.update_prepared_statement(
        StatementName = "cloudtrail",
        WorkGroup = "primary",
        QueryStatement = """
        SELECT *
        FROM \"cloudtrail_logs\"
        WHERE region = ? AND timestamp = ?
        LIMIT 10;
        """
    )
    print('update prepared statement: ' + str(resp))

prepared_statementsを実行する

基本的なクエリの流れは prepared statement なしの場合と同じで、
実際に投げるクエリが異なるだけ。
以下のように、クエリ実行の際はパラメタの指定のみを行うので
SQLインジェクションのリスクを低減することができる。

prepared statement ありクエリ実行部分抜粋
resp = athena.start_query_execution(
    QueryString = """
    EXECUTE cloudtrail
    USING 'ap-northeast-1', '2021/06';
    """,
    ResultConfiguration = {
        'OutputLocation': 's3://' + os.environ['AWS_S3_BUCKET_QUERY_RESULTS']
    }
)

その他

マネジメントコンソール上でのprepared statementの確認

マネジメントコンソール上で作成したprepared statementを確認しようとしたが,
作成済のprepared statementを表示するようなタブ等は存在しなかった.
おそらくマネジメントコンソール上で確認することができない.

作成済のprepared statementを確認するには,aws cliやboto3を利用する必要がある.
aws cliを利用する場合は list-prepared-statementsget-prepared-statement を利用する.

作成済 prepared statements 確認方法
$ aws athena list-prepared-statements --work-group primary
{
    "PreparedStatements": [
        {
            "StatementName": "cloudtrail",
            "LastModifiedTime": "2021-06-27T18:37:53.353000+09:00"
        }
    ]
}

$ aws athena get-prepared-statement --work-group primary --statement-name cloudtrail
{
    "PreparedStatement": {
        "StatementName": "cloudtrail",
        "QueryStatement": "SELECT *\nFROM\n  cloudtrail_log\nWHERE ((region = ?) AND (timestamp = ?))\nLIMIT 10\n",
        "WorkGroupName": "primary",
        "LastModifiedTime": "2021-06-27T18:37:53.353000+09:00"
    }
}

boto3を利用する場合は list_prepared_statementsget_prepared_statement を利用する.

Refs

Discussion

ログインするとコメントできます