Amplify Gen2 × Amazon OpenSearch Service 実践解説 — OSISでDynamoDB連携をCDK構築
はじめに
Amplify Gen2 で構築した Web アプリで OpenSearch を利用する機会があり、CDK と OpenSearch の設定について理解を深めました。本記事では、その知見を整理し、公式ガイドのコードをもとに実装の意図や実運用での注意点を解説します。
出典: Amplify Docs「Connect to Amazon OpenSearch for search and aggregate queries」
https://docs.amplify.aws/react/build-a-backend/data/custom-business-logic/search-and-aggregate-queries/
この記事は、公式ガイドに掲載されたコードを「引用」し、各ブロックの役割と実運用の注意点を具体的に解説します。
AWS CDK
AWS Cloud Development Kit (AWS CDK) は、インフラをコードで定義し、CloudFormation を通じてデプロイするためのフレームワークです。Amplify Gen2 は内部的に CDK を利用するため、必要に応じて CDK の拡張でマネージドサービス(今回の OpenSearch など)を組み合わせます。つまり、Amplify Gen2 では CDK を使って OpenSearch を自前で構築する形になります。
AWS CloudFormation
CDK で定義した構成は最終的に CloudFormation のテンプレートとして展開され、スタックとしてプロビジョニングされます。依存関係の解決やロールバックは CloudFormation の責務です。
OpenSearch 構築のCDKコード(公式抜粋)
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import * as iam from "aws-cdk-lib/aws-iam";
import * as osis from "aws-cdk-lib/aws-osis";
import * as logs from "aws-cdk-lib/aws-logs";
import { RemovalPolicy } from "aws-cdk-lib";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// Define backend resources
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// Update table settings
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// Get the DynamoDB table ARN
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// Get the DynamoDB table name
const tableName = backend.data.resources.tables["Todo"].tableName;
// Create the OpenSearch domain
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// upgrade instance types for production use
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// set removalPolicy to DESTROY to make sure the OpenSearch domain is deleted on stack deletion.
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// Get the S3Bucket ARN
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// Get the S3Bucket Name
const s3BucketName = backend.storage.resources.bucket.bucketName;
// Create an IAM role for OpenSearch integration Service
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
// Define OpenSearch index mappings
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
isDone: {
type: "boolean",
},
content: {
type: "text",
},
priority: {
type: "text",
},
},
},
};
// OpenSearch template definition
const openSearchTemplate = `
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "${tableArn}"
stream:
start_position: "LATEST"
export:
s3_bucket: "${s3BucketName}"
s3_region: "${backend.storage.stack.region}"
s3_prefix: "${tableName}/"
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
sink:
- opensearch:
hosts:
- "https://${openSearchDomain.domainEndpoint}"
index: "${indexName}"
index_type: "custom"
template_content: |
${JSON.stringify(indexMapping)}
document_id: '\${getMetadata("primary_key")}'
action: '\${getMetadata("opensearch_action")}'
document_version: '\${getMetadata("document_version")}'
document_version_type: "external"
bulk_size: 4
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
`;
// Create a CloudWatch log group
const logGroup = new logs.LogGroup(backend.data.stack, "LogGroup", {
logGroupName: "/aws/vendedlogs/OpenSearchService/pipelines/1",
removalPolicy: RemovalPolicy.DESTROY,
});
// Create an OpenSearch Integration Service pipeline
const cfnPipeline = new osis.CfnPipeline(
backend.data.stack,
"OpenSearchIntegrationPipeline",
{
maxUnits: 4,
minUnits: 1,
pipelineConfigurationBody: openSearchTemplate,
pipelineName: "dynamodb-integration-2",
logPublishingOptions: {
isLoggingEnabled: true,
cloudWatchLogDestination: {
logGroup: logGroup.logGroupName,
},
},
}
);
コード解説
1) DynamoDB 設定(Streams/PITR)
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
-
目的: OpenSearch Ingestion(OSIS)が DynamoDB の変更イベントを取り込むために Streams を有効化します。
NEW_IMAGEは更新後の最新レコードを流すモード、PITR(Point In Time Recovery)はエクスポート/再取り込みの保険です。 -
補足: Streams は一般にはデフォルト無効ですが、Amplify Gen2 が生成するテーブルは既定で
NEW_AND_OLD_IMAGESの場合があります。OSIS はNEW_IMAGE/NEW_AND_OLD_IMAGESのどちらでも動作します。変更前の値が不要ならNEW_IMAGEを選ぶとイベントが軽量になります。 -
補足: PITR は任意ですが、ドメインを作り直す際の全量 Export に便利です。初期全件同期が必要な場合は
start_position: TRIM_HORIZONや Export を併用します。
2) OpenSearch ドメイン作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
- 目的: OpenSearch のドメイン本体を作成します。バージョンやインスタンス種別など、性能に関わる設定をここで行います。
-
補足: CDK の各コンストラクトは
(scope, id, props)の形で生成します。OpenSearch は DynamoDB のデータと連携するため、基本はdataスタックに配置すると分かりやすいです。removalPolicyは CloudFormation のスタック削除時にリソースを削除するか(DESTROY)保持するか(RETAIN)を制御する設定です。学習用途ならDESTROY、本番ではRETAINを検討します。ドメイン作成は 20〜30 分程度かかる点に注意してください。
3) OSIS 用 IAM ロール作成
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
-
目的: OSIS(OpenSearch Ingestion Service)が OpenSearch(
ESHttp*など)、DynamoDB Streams、S3 Export にアクセスするための実行ロールを作成します。 -
補足: マネージドポリシー
AmazonOpenSearchIngestionFullAccessに加え、対象ドメイン/テーブル/バケットに対するリソースレベルの許可を inline policy で付与しています。OpenSearch への書き込み、Streams の購読、Export の実行に必要な最小権限をまとめています。
4) OSIS 用インデックスマッピング定義
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
isDone: {
type: "boolean",
},
content: {
type: "text",
},
priority: {
type: "text",
},
},
},
};
- 目的: OpenSearch ドメインに作成するインデックス構造を定義します。
-
補足: 日本語最適化は Kuromoji や N-gram などのアナライザ追加で対応します。
settings.analysisにアナライザを定義して拡張します。
5) OSIS 用テンプレート定義
const openSearchTemplate = `
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "${tableArn}"
stream:
start_position: "LATEST"
export:
s3_bucket: "${s3BucketName}"
s3_region: "${backend.storage.stack.region}"
s3_prefix: "${tableName}/"
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
sink:
- opensearch:
hosts:
- "https://${openSearchDomain.domainEndpoint}"
index: "${indexName}"
index_type: "custom"
template_content: |
${JSON.stringify(indexMapping)}
document_id: '\${getMetadata("primary_key")}'
action: '\${getMetadata("opensearch_action")}'
document_version: '\${getMetadata("document_version")}'
document_version_type: "external"
bulk_size: 4
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
`;
- 目的: DynamoDB の変更イベントを取得し、マッピング定義に従って OpenSearch のインデックスへ反映するための OSIS パイプライン定義です。
-
補足: ドキュメント ID でインデックスの一意性を担保しています。初期同期が必要な場合は
start_position: TRIM_HORIZONまたは Export(全量)を併用します。 -
補足: パイプライン本体の定義は YAML だけでなく JSON でも指定可能です(
pipelineConfigurationBodyに JSON 文字列を渡す実装も動作確認済み)。本記事では公式ガイドの抜粋に合わせて YAML 例を掲載し、YAML を JSON オブジェクトに書き換える具体的な方法は補足記事「AWS CDK で Amazon OpenSearch Service のパイプラインは YAML の代わりに JSON で定義できる!」で詳しく解説しています。
6) CloudWatch ロググループ作成
const logGroup = new logs.LogGroup(backend.data.stack, "LogGroup", {
logGroupName: "/aws/vendedlogs/OpenSearchService/pipelines/1",
removalPolicy: RemovalPolicy.DESTROY,
});
- 目的: OSIS の実行/失敗ログを観測するためのロググループを作成します。
-
補足:
removalPolicyは CloudFormation 削除時にロググループを削除するかを制御します。固定名にすると再作成時に衝突する可能性があるため、環境に応じて命名規則を検討してください。
7) OSIS パイプライン作成
const cfnPipeline = new osis.CfnPipeline(
backend.data.stack,
"OpenSearchIntegrationPipeline",
{
maxUnits: 4,
minUnits: 1,
pipelineConfigurationBody: openSearchTemplate,
pipelineName: "dynamodb-integration-2",
logPublishingOptions: {
isLoggingEnabled: true,
cloudWatchLogDestination: {
logGroup: logGroup.logGroupName,
},
},
}
);
- 目的: 上記テンプレートに従い、DynamoDB → OpenSearch の取り込みを行う OSIS パイプラインを作成します。
-
補足:
minUnitsとmaxUnitsは負荷に応じて調整できるようパラメータ化しておくと運用が楽です。
まとめ
- 前提: DynamoDB テーブルで Streams(必要に応じて PITR)を有効化
- OpenSearch ドメインを作成 → 性能・暗号化・削除ポリシーを設定
- OSIS 用 IAM ロールを作成 → OpenSearch/DynamoDB/S3 への最小権限付与
- インデックスのマッピングを定義 → 検索要件に合わせて型・アナライザを設計
- OSIS のパイプライン(テンプレート)を定義 → DDB 変更を OpenSearch に反映
- CloudWatch ロググループを用意 → 実行/失敗ログを観測
- OSIS パイプラインを作成 → Export(初期全量)+ Streams(増分)で同期
この手順で、ドメイン作成からパイプライン作成までを一気通貫で構築できます。
トラブルシュート
- AccessDenied: OSIS ロールの権限(OpenSearch/DynamoDB/S3)が対象 ARN・リージョンに合っているか。
- インデックス不整合: 既存マッピングに非互換な変更は不可。必要に応じてインデックス再作成を検討。
-
データ未流入: Streams の有効化漏れ、
start_position: LATESTによる履歴未取り込み、パイプラインエラーを CloudWatch で確認。 -
検索結果が空: OpenSearch API へのリクエストが
GETのままでクエリ条件が渡っていない可能性。POSTで body を付与して実行。
運用・コスト・スケール
- コスト: Managed OpenSearch のノードと OSIS の OCU が主なコスト要因です。検証は最小構成で開始し、実負荷で調整します。
- スケール: OSIS の OCU、OpenSearch のノード数/インスタンスタイプ/シャード数で対応します。
-
削除保護: 学習用の
DESTROYは、本番ではRETAINを推奨します。 - アクセス制御/VPC: 本番は VPC 配置やドメインアクセスポリシーの明確化を検討します。OSIS から到達可能なネットワーク(VPC/エンドポイント)を確保してください。
-
本番構成の注意: 専用マスタ/マルチ AZ/自動スナップショットの有効化を検討します。
masterNodes: 0は学習向けの最小構成です。 - 観測とアラート: OSIS の失敗件数・スループット、OpenSearch の CPU/JVMMem/ディスク/検索遅延などに CloudWatch アラームを設定します。
おわりに
Amplify Gen2 と OpenSearch を組み合わせることで、アプリの検索機能を柔軟に拡張できます。一方で、CDK/CloudFormation を跨いだ設定や待ち時間、権限設計など、実運用での勘所も多い領域です。本記事が、公式ガイドのコードを読み解き、手元の環境に落とし込む際の参考になれば幸いです。より詳しい設定(日本語向けアナライザやテンプレートの拡張など)は、別途掘り下げて紹介します。
参考(出典)
- Amplify Docs: Connect to Amazon OpenSearch for search and aggregate queries
https://docs.amplify.aws/react/build-a-backend/data/custom-business-logic/search-and-aggregate-queries/
使い倒せ、テクノロジー。(MAX OUT TECHNOLOGY)をミッションに掲げる、株式会社リバネスナレッジのチャレンジを共有するブログです。Buld in Publichの精神でオープンに綴ります。 Qiita:qiita.com/organizations/leaveanest
Discussion