🦝
ActiveStatementsExceededExceptionのハンドリング
何をするのか
PythonでAWSを利用する際にはboto3というモジュールを使用することになりますが、botocoreで例外が投げられる場合に対応します。
筆者が出会した例外を例に挙げて説明しますが、他の例外の場合も同様に対応できると思います。
状況
やりたいこと
S3のオブジェクトを適当にパースしてredshiftに書き込む。
書き込みはlambdaに載せたPythonスクリプトで実行する。S3へのオブジェクト追加時にイベント通知をlambdaに送信することでトリガーする。
スクリプトは大体以下です
def lambda_handler(event: dict, context) -> None:
"""
:param event: S3バケットへのオブジェクト追加イベント
:param context:
:return:
"""
bucket_name = event['Records'][0]['s3']['bucket']['name']
object_name = event['Records'][0]['s3']['object']['key']
execute_copy(
REDSHIFT_TABLE,
f"s3://{bucket_name}/{object_name}"
)
def execute_copy(table: str, s3_object_uri: str) -> None:
# 実行するSQLを設定
sql = f"""
COPY {table} FROM '{s3_object_uri}'
REGION 'ap-northeast-1' IAM_ROLE 'redshift書き込み用IAM role'
GZIP
FORMAT JSON 'noshred';
"""
data_client = boto3.client('redshift-data')
# redshiftの環境情報は、環境変数として事前に設定済み
data_client.execute_statement(
Database=REDSHIFT_DB,
DbUser=REDSHIFT_DB_USER,
ClusterIdentifier=REDSHIFT_CLUSTER,
Sql=sql,
)
問題
S3のオブジェクト追加の頻度が多過ぎたため、lambdaの実行頻度が多くなり過ぎPythonスクリプトがエラーを吐いた
lambdaのログが出力されるcloudwatchを確認すると、エラー文とエラー原因は以下のようでした。
ValidationException
)
API Callが多すぎる (エラー文は大体こんな感じです。
[ERROR] ValidationException: An error occurred (ValidationException) when calling the ExecuteStatement operation: Rate exceeded (Service: AmazonRedshift; Status Code: 400; Error Code: Throttling; Request ID: ***; Proxy: null)
Traceback (most recent call last):
...
ActiveStatementsExceededException
)
redshiftのクエリの同時実行数が多すぎる (エラー文は大体こんな感じです。
[ERROR] ActiveStatementsExceededException: An error occurred (ActiveStatementsExceededException) when calling the ExecuteStatement operation: Active statements exceeded the allowed quota (200).
Traceback (most recent call last):
...
とった対策
例外をキャッチして、頻度が原因で生じた例外出会った場合は、クエリを再実行するようにした。
参考: エラーハンドリングについてのboto3の説明
スクリプト例
def lambda_handler(event: dict, context) -> None:
"""
:param event: S3バケットへのオブジェクト追加イベント
:param context:
:return:
"""
bucket_name = event['Records'][0]['s3']['bucket']['name']
object_name = event['Records'][0]['s3']['object']['key']
# 書き込みの成否を管理するフラグ
done = False
# 必要であればリトライしながら書き込みを実行
while not done:
done = True
try:
execute_copy(
REDSHIFT_TABLE,
f"s3://{bucket_name}/{object_name}"
)
except ClientError as error:
error_code = error.response['Error']['Code']
print("error code of a raised exception: " + error_code)
#
if is_retry_target(error_code):
print('retry query execution after sleeping...')
# 再実行させるため、フラグを更新してsleepする
done = False
time.sleep(1)
def execute_copy(table: str, s3_object_uri: str) -> None:
# 実行するSQLを設定
sql = f"""
COPY {table} FROM '{s3_object_uri}'
REGION 'ap-northeast-1' IAM_ROLE 'redshift書き込み用IAM role'
GZIP
FORMAT JSON 'noshred';
"""
data_client = boto3.client('redshift-data')
data_client.execute_statement(
Database=REDSHIFT_DB,
DbUser=REDSHIFT_DB_USER,
ClusterIdentifier=REDSHIFT_CLUSTER,
Sql=sql,
)
# エラーが、リトライすべきものかどうか判定
def is_retry_target(error_code: str) -> bool:
retry_codes = {
'ValidationException',
'ActiveStatementsExceededException'
}
return error_code in retry_codes
補足
説明ではValidationException
, ActiveStatementsExceededException
を対象にしましたが、その他の例外の場合にも同様の方法で対応できます。
Discussion