OpenSearch パイプラインの Lambda Processor で別テーブルを結合して Nested 構造を作る
はじめに
AWS CDK で OpenSearch のインデックスに Nested 型フィールドを定義してフィルタやソートをできるようにするの記事では、Todo の中に assignees を Nested 型として持つ構造を扱いました。
しかし、実際の開発では「Todo テーブル」と「TodoAssignee テーブル」のように、データが複数のテーブルに正規化されているケースも多く存在します。
この場合、単純な OSIS パイプラインだけでは、DynamoDB の JOIN 操作ができないため、OpenSearch に入れる前にデータを結合する必要があります。
そのような場合に強力な武器となるのが「Lambda Processor」です。
本記事では、OSIS パイプラインから Lambda 関数を呼び出し、別テーブルのデータを結合して Nested 構造を作成してから OpenSearch にインデックスする方法と、その際のトラブルシューティングについて解説します。
ユースケース:別テーブルとの JOIN によるデータ変換
例えば、以下のような正規化されたテーブル構成を想定します。
-
Todoテーブル: タスクそのもの(id,content,createdAtなど) -
TodoAssigneeテーブル: 誰にいつ割り当てられたか(todoId,userId,userName,assignedAtなど)
しかし、DynamoDB 上では別テーブルになっているため、OSIS 単体では JOIN できません。
そこで、Lambda 関数を経由して TodoAssignee テーブルから該当するレコードを取得し、Todo に結合(Nested 構造化) してから OpenSearch に送ります。
アーキテクチャ
実装手順
1. Lambda 関数の実装
OSIS からは、複数のレコードがバッチとして Lambda に渡されます。Lambda は受け取ったレコードを加工し、同じ順序で結果を返す必要があります。
import { DynamoDBClient, GetItemCommand } from "@aws-sdk/client-dynamodb";
const ddbClient = new DynamoDBClient({});
export const handler = async (event: any) => {
const results = [];
// batch: { key_name: 'documents' } を指定しているため、
// 入力は event.documents に格納されています。
const records = event.documents;
for (const record of records) {
// REMOVEイベントは NewImage がないためスキップ、または delete アクションを設定する
if (record.eventName === 'REMOVE') {
results.push({
...record,
opensearch_action: 'delete',
primary_key: record.dynamodb.Keys.id.S,
document_version: Date.now()
});
continue;
}
const image = record.dynamodb.NewImage;
const todoId = image.id.S;
try {
// 3. TodoAssignee テーブルから、この Todo に紐づく割り当て情報を取得
const assignees = await getAssigneesForTodo(todoId);
// 4. Nested 構造に変換
const transformedRecord = {
...record,
// OpenSearchに入れたい形に整形
assignees: assignees, // Nested型フィールドとして追加
updatedAt: new Date().toISOString(),
// OSISのsink設定で使用するメタデータ
primary_key: todoId,
opensearch_action: 'index',
document_version: Date.now()
};
// 5. 結果の格納
results.push(transformedRecord);
} catch (error) {
// エラー時は status: 'Failed' を返すと、OSIS側でDLQに送るなどの制御が可能
console.error("Processing failed", error);
results.push({ ...record, status: 'Failed', errorMessage: error.message });
}
}
// batch設定に合わせて、戻り値もキーでラップして返す必要があります
return { documents: results };
};
// ヘルパー関数:TodoAssignee テーブルから割り当て情報を取得
const getAssigneesForTodo = async (todoId: string) => {
// 実装のヒント:
// Query または BatchGetItem で TodoAssignee テーブルから取得
// 複数の Todo を処理する場合、ループ内で1件ずつ Query すると遅くなるため、
// 事前に全 todoId をリストアップし、BatchGetItem で一括取得することを推奨します。
// 例:DynamoDB Query で todoId でフィルタ
// const result = await dynamoClient.query({
// TableName: 'TodoAssignee',
// KeyConditionExpression: 'todoId = :todoId',
// ExpressionAttributeValues: { ':todoId': todoId }
// });
// return result.Items.map(item => ({
// userId: item.userId,
// userName: item.userName,
// assignedAt: item.assignedAt,
// status: item.status
// }));
return [
{ userId: "user-A", userName: "田中太郎", assignedAt: "2024-01-01T10:00:00Z", status: "active" },
{ userId: "user-B", userName: "佐藤花子", assignedAt: "2024-02-01T15:00:00Z", status: "completed" }
];
};
2. パイプライン定義
提供された CDK コードに基づき、OSIS のパイプライン設定(JSON)は以下のようになります。
aws_lambda プロセッサの設定構造(aws ブロックのネストや batch 設定)に注意してください。
{
"version": "2",
"dynamodb-pipeline": {
"source": {
"dynamodb": {
"tables": [
{
"table_arn": "arn:aws:dynamodb:ap-northeast-1:123456789012:table/Todo",
"stream": {
"start_position": "LATEST"
}
}
],
"aws": {
"region": "ap-northeast-1",
"sts_role_arn": "arn:aws:iam::123456789012:role/os-pipeline-role"
}
}
},
"processor": [
{
"aws_lambda": {
"function_name": "arn:aws:lambda:ap-northeast-1:123456789012:function:TransformLambda",
"aws": {
"region": "ap-northeast-1",
"sts_role_arn": "arn:aws:iam::123456789012:role/os-pipeline-role"
},
"batch": {
"key_name": "documents"
}
}
}
],
"sink": [
{
"opensearch": {
"hosts": ["https://search-my-domain.ap-northeast-1.es.amazonaws.com"],
"index": "todo",
"index_type": "custom",
"document_id": "${/primary_key}",
"action": "${/opensearch_action}",
"document_version": "${/document_version}",
"document_version_type": "external",
"aws": {
"region": "ap-northeast-1",
"sts_role_arn": "arn:aws:iam::123456789012:role/os-pipeline-role"
}
}
}
]
}
}
特に batch: { key_name: 'documents' } を指定している点が重要です。これにより、Lambda への入出力が documents キーでラップされます。
また、sink ブロックの document_id や action に指定している ${/primary_key} などの記述は、Lambda が返した JSON 内のフィールドを参照しています。
これにより、OpenSearch 側でのドキュメント ID や操作タイプ(作成・更新・削除)を Lambda 側で動的に制御することが可能になります。
document_version_type: "external" は、Lambda 側で指定した document_version(タイムスタンプ等)を使ってバージョン管理を行うために必須の設定です。
[!WARNING]
注意:getMetadataは使えません
通常の DynamoDB パイプライン(Lambda なし)ではdocument_id: '${getMetadata("primary_key")}'と記述しますが、Lambda Processor を経由すると元の DynamoDB イベントのメタデータ構造が Lambda の戻り値(JSON)に置き換わります。
そのため、getMetadata関数は使用できなくなり、代わりに${/primary_key}のように JSON パスで直接フィールドを参照する必要があります。
3. IAM 権限の設定
OSIS のパイプラインロールに、Lambda 関数の実行権限 (lambda:InvokeFunction) を付与することを忘れないでください。
また、Lambda 側のリソースベースポリシーでも、OSIS サービスプリンシパル (osis-pipelines.amazonaws.com) からの呼び出しを許可する必要があります。これを見落とすと AccessDeniedException が発生します。
エラーが発生した場合は、OSIS のパイプラインログ(CloudWatch Logs)を確認し、User: ... is not authorized to perform: lambda:InvokeFunction といったメッセージが出ていないかチェックしてください。
トラブルシューティング
実装中に遭遇した主な問題とその解決策を共有します。
1. 削除イベント (REMOVE) の扱い
問題: DynamoDB でアイテムを削除すると、OSIS に REMOVE イベントが流れますが、Lambda で NewImage を参照しようとしてエラーになることがあります(削除時は OldImage しかない、またはキー情報のみ)。
解決策: Lambda 内で eventName === 'REMOVE' の場合は、データ加工をスキップしてそのまま通すか、削除に必要な ID 情報だけを残して返すように実装します。OpenSearch シンク側は document_id が一致すれば削除処理を行ってくれます。
2. タイムアウトとバッチサイズ
問題: Lambda の処理に時間がかかり、OSIS 側でタイムアウトが発生する。
解決策:
- Lambda のタイムアウト時間を延ばす。
- パイプライン定義の
batch_sizeを小さくする(デフォルトが大きい場合がある)。 -
concurrent_requestsを調整して並列度を上げる。
3. レスポンス形式の厳密さ
問題: Lambda が返す JSON の形式が不正で、パイプラインが停止する。
解決策: OSIS は Lambda からの戻り値を厳密にチェックします。
今回の構成(key_name: 'documents')の場合、Lambda は処理結果を { documents: [...] } という形式で返す必要があります。
単なる配列 [...] を返すと、OSIS はレスポンスを認識できずエラーとなります。
おわりに
Lambda Processor を利用することで、OSIS の可能性は無限に広がります。
ただし、Lambda の呼び出しコストやレイテンシが追加されるため、本当に必要な処理だけに絞って利用するのがベストプラクティスです。
使い倒せ、テクノロジー。(MAX OUT TECHNOLOGY)をミッションに掲げる、株式会社リバネスナレッジのチャレンジを共有するブログです。Buld in Publichの精神でオープンに綴ります。 Qiita:qiita.com/organizations/leaveanest
Discussion