🐧

複数のAWS Lambdaを連携する方法について

2024/07/07に公開

はじめに

AWS Lambdaはイベント駆動型のサーバーレスコンピューティングサービスです。
API GatewayやEventBridge等様々なサービスから起動されます。

今回は複数のAWS Lambdaを連携する方法とベストプラクティスについて考えてみます。

どういった方法があるか

複数のAWS Lambdaを連携する方法としてはいくつかのパターンがありますが、代表的なものは以下となります。

  1. LambdaからLamdbaを直接起動する
  2. LambdaからSQSを介してLambdaと連携する
  3. Step Functionsを使ってLambdaを連携する

他にもLambda→SNS→LambdaやDynamoDBやS3のトリガーを使用するパターン等様々なものがあります。

LambdaからLamdbaを直接起動する

概要

起動元のLambdaから別のLambdaを直接起動するパターンです。

このパターンで重要なポイントは「同期」と「非同期」の2つの実行形式です。

  • 同期実行

    • 呼び出し元が呼び出したLambdaのレスポンスの返送を待つ方式
      処理結果が返却されるので例えばAPI Gateway等で処理結果をレスポンスに乗せる必要がある場合等のリアルタイム応答が必要な場合に使用されます。
  • 非同期実行

    • 呼び出し元が呼び出したLambdaのレスポンスの返送を待たずに後続の処理を継続する方式
      この場合、Lambdaへの呼び出しは一度キューに入って実行されます
      実行タイミングはキューから呼び出されたタイミングとなるので、例えば、バッチ処理やS3と連携した画像処理やデータ処理等のすぐに処理結果が必要でない場合に使用されます。

サンプルコード

今回は呼び出し元でDynamoDBにデータを登録、呼び出し先にkey情報を連携し、DynamoDBよりデータを取得するようなサンプルを準備しました。

呼び出し先は共通で以下となります。

index.mjs
import {DynamoDBClient} from '@aws-sdk/client-dynamodb';
import {DynamoDBDocumentClient, GetCommand} from '@aws-sdk/lib-dynamodb';

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

export const handler = async (event) => {
  
    console.log('呼び出し先Lambda関数:', event);
	try {

		// DynamoDBから値を読み込み
		const result = await docClient.send(
			new GetCommand({
				TableName: "testApp",
				Key: {
					"testId": event.key,
				},
			})
		);
		
		console.log('データ取得結果:' , result.Item);
		
        return {
            statusCode: 200,
            body: JSON.stringify({
                message: '呼び出し成功',
                data: result.Item
            })
        };
        
	} catch (error) {
		console.error('失敗', error);
		return {
			statusCode: 500,
			body: JSON.stringify({
				message: '呼び出し失敗',
				error: error.message
			})
		};
	}

};

同期パターン

呼び出し元のソースは以下となります。
ポイントはInvocationTypeをRequestResponse(同期)で設定し、レスポンスを待機する部分ですね。
処理結果が返却されるので取得した値を表示することが可能です。

index.mjs
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";
import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda";

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const lambdaClient = new LambdaClient({ region: "ap-northeast-1" });

export const handler = async (event) => {

  console.log(event);
  try {
    let param = {};
    let id = crypto.randomUUID();
    let data = "保存データ";

    // dynamoDBに書き込み
    await docClient.send(
      new PutCommand({
        TableName: "testApp",
        Item: {
          testId: id,
          data: data,
        },
      })
    );

    let payload = {
        key: id
    };

    const params = {
        FunctionName: "lambdaToLambda",
        InvocationType: "RequestResponse", // 同期
        Payload: Buffer.from(JSON.stringify(payload)),
    };

    console.log(payload);

    const result = await lambdaClient.send(new InvokeCommand(params));
    console.log('同期呼び出し成功:', JSON.parse(Buffer.from(result.Payload).toString()));

    return {
      statusCode: 200,
      body: JSON.stringify({
        message: "正常",
        data: payload,
      }),
    };
  } catch (error) {
    console.error("失敗", error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: "失敗",
        error: error.message,
      }),
    };
  }
};
  • 実行結果
    下記ログより正しく取得できていることが確認できました。

    • 呼び出し元ログ

    • 呼び出し先ログ

非同期パターン

呼び出し元のソースは以下となります。
ポイントはInvocationTypeをEvent(非同期)で設定している箇所になります。
処理結果の返却を待たずにリターンしています。

index.mjs
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";
import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda";

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const lambdaClient = new LambdaClient({ region: "ap-northeast-1" });

export const handler = async (event) => {

  console.log(event);
  try {
    let param = {};
    let id = crypto.randomUUID();
    let data = "保存データ";

    // dynamoDBに書き込み
    await docClient.send(
      new PutCommand({
        TableName: "testApp",
        Item: {
          testId: id,
          data: data,
        },
      })
    );

    let payload = {
        key: id
    };

    const params = {
        FunctionName: "lambdaToLambda",
        InvocationType: "Event", // 非同期
        Payload: Buffer.from(JSON.stringify(payload)),
    };

    console.log(payload);

    const result = await lambdaClient.send(new InvokeCommand(params));
    console.log('非同期呼び出し成功');

    return {
      statusCode: 200,
      body: JSON.stringify({
        message: "正常",
        data: payload,
      }),
    };
  } catch (error) {
    console.error("失敗", error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: "失敗",
        error: error.message,
      }),
    };
  }
};

  • 実行結果
    下記ログより正しく取得できていることが確認できました。

    • 呼び出し元ログ

    • 呼び出し先ログ

直接起動のメリットとデメリット

メリット

  • 実装を比較的簡単に行うことができる
  • 直接Lambdaを連携させることができるので処理が高速

デメリット

  • エラー処理が複雑となり、エラー発生時のリトライや再送の考慮が必要
  • 呼び出し先の処理中に呼び出し元が待機することになるので待ちの間分無駄にLambdaの課金が発生する
  • 呼び出し元と呼び出し先の同時実行数を一致させないと処理が正しく起動されない

LambdaからSQSを介してLambdaと連携する

概要

起動元のLambdaから一度SQSを経由して別のLambdaを起動するパターンです。

下記のように呼び出し先にSQSトリガーを設定しておきます。

サンプルコード

呼び出し元

index.mjs
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const sqsClient = new SQSClient({ region: "ap-northeast-1" });

export const handler = async (event) => {

  console.log(event);
  try {
    let param = {};
    let id = crypto.randomUUID();
    let data = "保存データ";

    // dynamoDBに書き込み
    await docClient.send(
      new PutCommand({
        TableName: "testApp",
        Item: {
          testId: id,
          data: data,
        },
      })
    );

    let payload = {
        key: id
    };

    const params = {
        QueueUrl: 'https://sqs.ap-northeast-1.amazonaws.com/905418288093/test',
        MessageBody: JSON.stringify(payload)
    };

    console.log(payload);

    const result = await sqsClient.send(new SendMessageCommand(params));
    console.log('メッセージ送信成功:', result);

    return {
      statusCode: 200,
      body: JSON.stringify({
        message: "正常",
        data: payload,
      }),
    };
  } catch (error) {
    console.error('メッセージ送信失敗:', error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: "失敗",
        error: error.message,
      }),
    };
  }
};

呼び出し先

index.mjs
import {DynamoDBClient} from '@aws-sdk/client-dynamodb';
import {DynamoDBDocumentClient, GetCommand} from '@aws-sdk/lib-dynamodb';

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

export const handler = async (event) => {
    for (const record of event.Records) {
        const messageBody = JSON.parse(record.body);
        console.log('メッセージ受信:', messageBody);
        try {
    
            // DynamoDBから値を読み込み
            const result = await docClient.send(
                new GetCommand({
                    TableName: "testApp",
                    Key: {
                        "testId": messageBody.key,
                    },
                })
            );
            
            console.log('データ取得結果:' , result.Item);
            
            return {
                statusCode: 200,
                body: JSON.stringify({
                    message: '呼び出し成功',
                    data: result.Item
                })
            };
            
        } catch (error) {
            console.error('失敗', error);
            return {
                statusCode: 500,
                body: JSON.stringify({
                    message: '呼び出し失敗',
                    error: error.message
                })
            };
        }
    }
};
  • 実行結果
    下記ログより正しく取得できていることが確認できました。

    • 呼び出し元ログ

    • 呼び出し先ログ

SQS経由のメリットとデメリット

メリット

  • SQSでのキューイングにより耐障害性が向上
  • リトライやエラー対応をSQSに任せることができる

デメリット

  • SQSを経由するのでレイテンシが増加する
  • SQS分のサービスコストが発生する

Step Functionsを使ってLambdaを連携する

概要

Step Functionsを使ってLambdaを連携するパターンです。

下記のようにStep Functionsを作成します。

サンプルコード

呼び出し元でDynamoDBに保存したキー情報を返却し、StepFunctionsで後続のLambdaに連携しています。

呼び出し元

index.mjs
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

export const handler = async (event) => {

  console.log(event);
  try {
    let param = {};
    let id = crypto.randomUUID();
    let data = "保存データ";

    // dynamoDBに書き込み
    await docClient.send(
      new PutCommand({
        TableName: "testApp",
        Item: {
          testId: id,
          data: data,
        },
      })
    );

    let payload = {
        key: id
    };

    console.log(payload);

    return payload;
    
  } catch (error) {
    console.error("失敗", error);
    return;
  }
};

呼び出し先

index.mjs
import {DynamoDBClient} from '@aws-sdk/client-dynamodb';
import {DynamoDBDocumentClient, GetCommand} from '@aws-sdk/lib-dynamodb';

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

export const handler = async (event) => {
  
    console.log('呼び出し先Lambda関数:', event);
	try {

		// DynamoDBから値を読み込み
		const result = await docClient.send(
			new GetCommand({
				TableName: "testApp",
				Key: {
					"testId": event.key,
				},
			})
		);
		
		console.log('データ取得結果:' , result.Item);
		
        return {
            statusCode: 200,
            body: JSON.stringify({
                message: '呼び出し成功',
                data: result.Item
            })
        };
        
	} catch (error) {
		console.error('失敗', error);
		return {
			statusCode: 500,
			body: JSON.stringify({
				message: '呼び出し失敗',
				error: error.message
			})
		};
	}

};
  • 実行結果
    下記ログより正しく取得できていることが確認できました。

    • 呼び出し元ログ

    • 呼び出し先ログ

Step Functionsを使ってLambdaを連携する場合のメリットとデメリット

メリット

  • 複雑なワークフローを設定し、Lambda含めたオーケストレーションを組むことができる
  • エラー処理やリトライを設定することで自動化できる

デメリット

  • 設定と管理が複雑になる
  • 追加のサービスコストが発生する

どれを選べばいいの?

これまで3パターンを整理してきました。
それぞれメリット・デメリットがありますが選定の際には以下を参考に選定いただければと思います。

  • Step Functions:複雑なワークフローやエラー制御が必要な場合
  • SQS:シンプルなケースで耐障害性やリトライが必要な場合
  • 直接起動:シンプルなケースで早いレスポンスや処理結果の応答が必要な場合

過去、経験した案件ではAPI Gateway経由で処理結果を返送する必要があったため直接起動を選択しました。
エラーハンドリングやリトライの考慮が必要となりましたが、性能要件は満たすことができたので選択肢としては有効だったと思います。

また、それぞれのパターンにおいてLambdaのコールドスタートを考慮しておく必要があります。
コールドスタートと改善方法に関しては以下にまとめているので参考にどうぞ。

https://zenn.dev/penginpenguin/articles/fe42ba481ab5a1

まとめ

複数のLambdaを連携させるには、直接呼出し、SQS、Step Functionsをはじめ複数の選択肢があります。
今回例に挙げたものの他にもSNSを使用したりDynamoDBやS3のトリガーを使用するものなど様々な構成が考えられます。
それぞれの方法にはメリットとデメリットがあり、要件に応じて最適な方法を選択することが重要です。
この記事とは別にAWSの推奨パターンやそれぞれの要件に応じて効率的で信頼性の高いLambdaの連携を実現していく必要があります。

Discussion