Chapter 08

Lambdaの非同期・並列処理を行う

inunekousapion
inunekousapion
2022.07.19に更新
このチャプターの目次

Chaliceの様々なイベントから非同期・並列処理を行うにはSimple Queue Service(SQS)、または、Simple Notification Service(SNS)、StepFunctionsを使用します。
ここではSQSを利用した非同期、並列処理について記述します。

CDKの設定

  • chaliceapp.pyでの定義
dl_queue = sqs.Queue(self, "dl_queue", queue_name="dl_queue")
dl_queue_cnf = sqs.DeadLetterQueue(max_receive_count=1, queue=dl_queue)
my_queue = sqs.Queue(self, "my_queue", queue_name="my_queue", dead_letter_queue=dl_queue_cnf, visibility_timeout=cdk.Duration.seconds(100))

SQSからイベントを受け取ったLambdaが例外を発生させた場合、max_receive_count回数のリトライを行った後、タスクは可視性タイムアウト経過後にデッドレターキューに渡ります。

Chaliceの設定

下記はlambda_function_sourceで30個のタスクを3回に分けてSQSにタスクを積んで、lambda_function_destinationでSQS経由で非同期・並列実行する例です。

  • runtime/app.py
@app.lambda_function(name="lambda_function_source")
def lambda_function_source(event, context):
    client = boto3.client('sqs')
    for index in range(3):
        client.send_message_batch(
            QueueUrl="https://sqs.ap-northeast-1.amazonaws.com/********/my_queue",
            Entries=[
                {"Id": str(uuid.uuid4()), f"MessageBody": "Request is {i}"} for i in range(10)
            ]
        )


@app.on_sqs_message(queue="my_queue", batch_size=1, name="lambda_function_destination")
def lambda_function_destination(event):
    logger.debug(event)
    if random.randint(0, 10) > 8:
        raise Exception("Error!!")

Chalice CDKではLambdaの名前がデプロイ前にわからないため、直接invokeすることはできません。SQS(SNS)を利用して実行する必要があります。