このチャプターの目次
Chaliceの様々なイベントから非同期・並列処理を行うにはSimple Queue Service(SQS)、または、Simple Notification Service(SNS)、StepFunctionsを使用します。
ここではSQSを利用した非同期、並列処理について記述します。
CDKの設定
- chaliceapp.pyでの定義
dl_queue = sqs.Queue(self, "dl_queue", queue_name="dl_queue")
dl_queue_cnf = sqs.DeadLetterQueue(max_receive_count=1, queue=dl_queue)
my_queue = sqs.Queue(self, "my_queue", queue_name="my_queue", dead_letter_queue=dl_queue_cnf, visibility_timeout=cdk.Duration.seconds(100))
SQSからイベントを受け取ったLambdaが例外を発生させた場合、max_receive_count回数のリトライを行った後、タスクは可視性タイムアウト経過後にデッドレターキューに渡ります。
Chaliceの設定
下記はlambda_function_source
で30個のタスクを3回に分けてSQSにタスクを積んで、lambda_function_destination
でSQS経由で非同期・並列実行する例です。
- runtime/app.py
@app.lambda_function(name="lambda_function_source")
def lambda_function_source(event, context):
client = boto3.client('sqs')
for index in range(3):
client.send_message_batch(
QueueUrl="https://sqs.ap-northeast-1.amazonaws.com/********/my_queue",
Entries=[
{"Id": str(uuid.uuid4()), f"MessageBody": "Request is {i}"} for i in range(10)
]
)
@app.on_sqs_message(queue="my_queue", batch_size=1, name="lambda_function_destination")
def lambda_function_destination(event):
logger.debug(event)
if random.randint(0, 10) > 8:
raise Exception("Error!!")
Chalice CDKではLambdaの名前がデプロイ前にわからないため、直接invokeすることはできません。SQS(SNS)を利用して実行する必要があります。