🦝

ActiveStatementsExceededExceptionのハンドリング

2023/01/03に公開

何をするのか

PythonでAWSを利用する際にはboto3というモジュールを使用することになりますが、botocoreで例外が投げられる場合に対応します。
筆者が出会した例外を例に挙げて説明しますが、他の例外の場合も同様に対応できると思います。

状況

やりたいこと

S3のオブジェクトを適当にパースしてredshiftに書き込む。
書き込みはlambdaに載せたPythonスクリプトで実行する。S3へのオブジェクト追加時にイベント通知をlambdaに送信することでトリガーする。

スクリプトは大体以下です

def lambda_handler(event: dict, context) -> None:
    """
    :param event: S3バケットへのオブジェクト追加イベント
    :param context:
    :return:
    """
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_name = event['Records'][0]['s3']['object']['key']

    execute_copy(
	REDSHIFT_TABLE,
	f"s3://{bucket_name}/{object_name}"
    )

def execute_copy(table: str, s3_object_uri: str) -> None:
    # 実行するSQLを設定
    sql = f"""
    COPY {table} FROM '{s3_object_uri}'
    REGION 'ap-northeast-1' IAM_ROLE 'redshift書き込み用IAM role'
    GZIP
    FORMAT JSON 'noshred';
    """

    data_client = boto3.client('redshift-data')
    # redshiftの環境情報は、環境変数として事前に設定済み
    data_client.execute_statement(
        Database=REDSHIFT_DB,
        DbUser=REDSHIFT_DB_USER,
        ClusterIdentifier=REDSHIFT_CLUSTER,
        Sql=sql,
    )

問題

S3のオブジェクト追加の頻度が多過ぎたため、lambdaの実行頻度が多くなり過ぎPythonスクリプトがエラーを吐いた
lambdaのログが出力されるcloudwatchを確認すると、エラー文とエラー原因は以下のようでした。

API Callが多すぎる (ValidationException)

エラー文は大体こんな感じです。

[ERROR] ValidationException: An error occurred (ValidationException) when calling the ExecuteStatement operation: Rate exceeded (Service: AmazonRedshift; Status Code: 400; Error Code: Throttling; Request ID: ***; Proxy: null)
Traceback (most recent call last):
...

redshiftのクエリの同時実行数が多すぎる (ActiveStatementsExceededException)

エラー文は大体こんな感じです。

[ERROR] ActiveStatementsExceededException: An error occurred (ActiveStatementsExceededException) when calling the ExecuteStatement operation: Active statements exceeded the allowed quota (200).
Traceback (most recent call last):
...

とった対策

例外をキャッチして、頻度が原因で生じた例外出会った場合は、クエリを再実行するようにした。
参考: エラーハンドリングについてのboto3の説明
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#botocore-exceptions

スクリプト例

def lambda_handler(event: dict, context) -> None:
    """
    :param event: S3バケットへのオブジェクト追加イベント
    :param context:
    :return:
    """
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_name = event['Records'][0]['s3']['object']['key']

    # 書き込みの成否を管理するフラグ
    done = False

    # 必要であればリトライしながら書き込みを実行
    while not done:
        done = True
        try:
            execute_copy(
                REDSHIFT_TABLE,
                f"s3://{bucket_name}/{object_name}"
            )

        except ClientError as error:
            error_code = error.response['Error']['Code']
            print("error code of a raised exception: " + error_code)
	    # 
            if is_retry_target(error_code):
                print('retry query execution after sleeping...')
                # 再実行させるため、フラグを更新してsleepする
                done = False
                time.sleep(1)

def execute_copy(table: str, s3_object_uri: str) -> None:
    # 実行するSQLを設定
    sql = f"""
    COPY {table} FROM '{s3_object_uri}'
    REGION 'ap-northeast-1' IAM_ROLE 'redshift書き込み用IAM role'
    GZIP
    FORMAT JSON 'noshred';
    """

    data_client = boto3.client('redshift-data')
    data_client.execute_statement(
        Database=REDSHIFT_DB,
        DbUser=REDSHIFT_DB_USER,
        ClusterIdentifier=REDSHIFT_CLUSTER,
        Sql=sql,
    )

# エラーが、リトライすべきものかどうか判定
def is_retry_target(error_code: str) -> bool:
    retry_codes = {
        'ValidationException',
        'ActiveStatementsExceededException'
    }
    return error_code in retry_codes

補足

説明ではValidationException, ActiveStatementsExceededExceptionを対象にしましたが、その他の例外の場合にも同様の方法で対応できます。

Discussion