🗒️

OpenSearch パイプラインの数だけ積み重なる請求額に絶望して抗った話

に公開

はじめに

OpenSearch Ingestion (OSIS) はサーバーレスでデータの取り込み・加工ができる非常に便利なサービスです。
しかし、小規模なプロジェクトで導入しようとした際、「コスト」 という大きな壁にぶつかりました。

特に、Amplify Gen2 などで複数の DynamoDB テーブル(例: Todo, Order, User...)を OpenSearch に同期したい場合、「すべてのテーブルを1つのパイプラインで処理してコストを抑えたい」 と考えるのが自然です。

しかし結論から言うと、OSIS の仕様上の制約により、テーブルごとにパイプラインを作成せざるを得ず、その分だけコスト(OCU)が積み上がる という結果になりました。

本記事は、コスト削減のためにアーキテクチャを工夫し抗ったものの、最終的にはコスト増を受け入れるしかなかった敗北の記録です。
なぜパイプラインの一本化が難しかったのか、そして最終的にどのような構成(妥協案)で運用することになったのかを共有します。

前提として、Amplify Gen2 と OpenSearch の基本的な連携設定については、以下の記事で解説している構成をベースとします。

課題:OSIS パイプラインの制約

理想と現実

当初、以下のように1つのパイプラインで複数の DynamoDB テーブルをソースとして定義し、1つの OpenSearch ドメインに流し込む構成を想定していました。

理想的なアーキテクチャ(実現不可能):

試行錯誤:Route機能での分岐はできないのか?

OSIS には routes という条件分岐機能があります。「これを使ってテーブルごとに処理を分ければいいのでは?」と考えましたが、結論から言うと 不可能 でした。

// ❌ これはできない(sourceブロックには1つのプラグインしか定義できない)
{
  "source": {
    "dynamodb": {
      "tables": [
        { "table_arn": "arn:aws:dynamodb:..." },  // Todo Table
        { "table_arn": "arn:aws:dynamodb:..." }   // Order Table
      ]
    }
  }
}

AWS公式ドキュメントにも明確に記載されています。

A pipeline can have only one source. You can't configure multiple sources for a single pipeline.
出典: OpenSearch Ingestion - Sources

routes はあくまで 「単一のソースから入ってきたデータを、条件に応じて複数のシンクに振り分ける」 機能であり、「複数の異なるソース(テーブル)を束ねる」 ための機能ではありませんでした。

全く異なるスキーマを持つテーブルを無理やり tables リストに並べたとしても、流れてくるストリームレコードの構造が混在するため、後続のプロセッサでパースエラーやマッピング不能に陥ります。

現実解:コスト増を許容したマルチパイプライン構成

コストを抑えるためには「1パイプライン」が理想でしたが、技術的な制約により断念しました。
最終的に採用したのは、「テーブルごとに専用のパイプラインを作成する」という、コスト面では妥協したアーキテクチャです。

採用せざるを得なかった構成:

この構成にすることで、以下のような副次的なメリットは得られましたが、コスト増(OCUの増加)は避けられませんでした。

  1. 責務の分離: 各パイプラインが特定のテーブル(TodoやOrderなど)のスキーマ変換に専念でき、それぞれ独立したインデックスとして管理できる
  2. 障害隔離: 片方のパイプラインが停止しても、もう片方は稼働し続ける

実装詳細

1. インデックス設計

まず、OpenSearch 側で受け入れるインデックスを設計します。今回は2つの異なるデータを扱うため、インデックスも分けます。

  • todo-index: Todoアイテム
  • order-index: 注文履歴情報
// インデックスマッピングの定義例
const todoMapping = {
  settings: {
    analysis: { /* 日本語アナライザー設定など */ }
  },
  mappings: {
    properties: {
      pk: { type: "keyword" },
      content: { type: "text", analyzer: "ja_analyzer" },
      status: { type: "keyword" },
      // ...
    }
  }
};

2. CDK によるパイプライン生成のファクトリー化

テーブルごとにパイプライン定義を書くのは冗長なので、パイプラインを作成する関数(ファクトリー)を実装しました。

interface PipelineProps {
  tableName: string;
  tableArn: string;
  indexName: string;
  mapping: any; // OpenSearch Mapping
}

const createPipeline = (
  scope: Construct,
  id: string,
  props: PipelineProps,
  commonProps: {
    roleArn: string;
    domainEndpoint: string;
    region: string;
  }
) => {
  // パイプライン設定をオブジェクトで定義
  const pipelineConfiguration = {
    version: '2',
    'dynamodb-pipeline': {
      source: {
        dynamodb: {
          tables: [
            {
              table_arn: props.tableArn,
              stream: { start_position: 'LATEST' }
            }
          ],
          aws: {
            sts_role_arn: commonProps.roleArn,
            region: commonProps.region
          }
        }
      },
      sink: [
        {
          opensearch: {
            hosts: [`https://${commonProps.domainEndpoint}`],
            index: props.indexName,
            template_content: JSON.stringify(props.mapping),
            aws: {
              sts_role_arn: commonProps.roleArn,
              region: commonProps.region
            }
          }
        }
      ]
    }
  };

  // パイプラインリソース作成
  return new osis.CfnPipeline(scope, id, {
    pipelineName: `pipeline-${props.tableName}`,
    pipelineConfigurationBody: JSON.stringify(pipelineConfiguration),
    minUnits: 1,
    maxUnits: 4,
    // ...ログ設定など
  });
};

// 💡 ヒント: 単純なマッピングだけでなく、データの結合や複雑な加工が必要な場合は、
// Lambda Processor を組み合わせることも可能です。
// 詳しくは「[OpenSearch パイプラインの Lambda Processor で別テーブルを結合して Nested 構造を作る](/lnest_knowledge/articles/dd2120a2fea4c6)」を参照してください。

3. 呼び出し側の実装

メインのスタック定義で、このファクトリー関数を呼び出して複数のパイプラインを作成します。

// Todo用パイプライン
createPipeline(stack, 'TodoPipeline', {
  tableName: 'Todo',
  tableArn: todoTable.tableArn,
  indexName: 'todo',
  mapping: todoMapping
}, commonProps);

// Order用パイプライン
createPipeline(stack, 'OrderPipeline', {
  tableName: 'Order',
  tableArn: orderTable.tableArn,
  indexName: 'order',
  mapping: orderMapping
}, commonProps);

注意点とハマりどころ

1. IAM ロールの権限

パイプラインで使用する IAM ロールには、すべてのソーステーブルと、OpenSearch ドメインへのアクセス権限が必要です。
特に dynamodb:DescribeTable, dynamodb:GetRecords などの権限が、対象となる全てのテーブル ARN に対して許可されているか確認してください。

// ポリシー例
new iam.PolicyStatement({
  actions: ["dynamodb:DescribeTable", "dynamodb:GetRecords", ...],
  resources: [
    todoTable.tableArn,
    orderTable.tableArn, // 忘れずに追加
    // ストリームのARNも必要
    `${todoTable.tableArn}/stream/*`,
    `${orderTable.tableArn}/stream/*`,
  ]
})

コスト:OCUの積み上げによる負担

本構成の最大のデメリットはコストです。
OSIS はプロビジョニングされた OCU (OpenSearch Compute Unit) に応じて課金されますが、パイプラインごとに最低 1 OCU が必要になります。

理論値(最低構成)

  • 1 OCU ≈ $0.326/時間(東京リージョン)
  • 月額 ≈ $235(約3.5万円)

もしパイプラインを2つ作成すると、単純計算で 月額 7万円以上 かかります。
これに加えて、OpenSearch ドメイン自体のインスタンス料金(例: m5.large.search × 1 = 月額約$130)も発生します。

実測値:1日41ドルの衝撃

しかし、実際の請求額を見て戦慄しました。
1日あたり 約41ドル(月額換算 1,200ドル以上) のコストが発生していたのです。

内訳を確認すると、OpenSearch Service 全体で約41ドル/日でしたが、その大半を Ingestion OCU が占めていました。

  • OSIS (Ingestion OCU): 約 31.3 ドル / 日
    • 4 OCU × 0.326 USD × 24h
  • OpenSearch Domain: 約 9.7 ドル / 日
    • m5.large.search 等
4 \text{ OCU} \times 0.326 \text{ USD} \times 24\text{h} \approx 31.3 \text{ USD} / \text{day}

原因を調査したところ、本番環境向けの構成として minUnits: 2 を設定していたことが大きく影響していました。

これは「設定ミス」ではありません。OSISパイプラインは単一障害点を避けるため、本番環境では複数OCUでの冗長構成が推奨されます。つまり、正しく設計した結果としてコストが跳ね上がるのです。

  • パイプライン数: 2本
  • 各パイプラインの最小ユニット数: 2 OCU(冗長化)
  • 合計: 4 OCU (常時稼働)

冗長性確保のために設定した minUnits: 2 ですが、データ量が少ない初期フェーズにおいて、マルチパイプライン構成と組み合わせることは「財布への大打撃」を意味していました。

「待機時は最低料金で済むだろう」という甘い見積もりと、安易な冗長化設定は、現実の請求書によって打ち砕かれました。検証フェーズや小規模プロジェクトにとっては、致命的な出費となりかねません。

コスト抑制の工夫

せめて本番以外の環境ではコストを抑えようと、涙ぐましい運用を行っています。
CDK では minUnits: 1 が最小値であり、設定で 0 にして停止させておくことはできません。

そのため、以下のような泥臭い対応を強制されています。

  1. 夜間・休日の停止: LambdaなどでOSISパイプラインの停止APIを叩くスケジューラーを自作
  2. スタックごとの削除: 検証が終わったらパイプラインを含むスタックごと削除する

しかし、OSISのパイプライン作成・開始には数分〜数十分の時間がかかります。
コスト削減の代償として、「環境が立ち上がるまで開発を始められない」「ちょっとした確認のために長い待ち時間が発生する」 という、開発者体験(DX)の著しい低下を招くことになりました。

技術的なエレガントさは皆無ですが、背に腹は代えられません。これが現状の「あがき」の限界です。

おわりに:OSISを採用すべきか?

「OpenSearch Ingestion を使えばサーバーレスで簡単にデータ連携できる」と思って飛びつきましたが、複数テーブルを扱う際のコスト設計には注意が必要でした。

技術的には「テーブルごとにパイプラインを分ける」以外に現実的な選択肢はありませんが、その代償としてインフラコストが増加します。
もし、このコスト(パイプライン数 × 推奨ユニット数)が予算に見合わないのであれば、OSIS の採用自体を見直すべきかもしれません。

代替案

小規模なプロジェクトにおいては、無理に OSIS を使わず、DynamoDB Streams + Lambda でインデックス処理を自前実装する方が、結果としてコストと運用のバランスが良い場合が多いです。

OSIS vs Lambda コスト比較の目安:

  • OSIS: データ流量に関わらず固定費がかかる(待機時も課金)。大規模トラフィック向き。
  • Lambda: データ更新回数に応じた従量課金。データが少なければほぼ無料。

データ流量が少ないフェーズでは、Lambda の方が圧倒的に安価に済みます。

本番環境での採用検討時には、データ量や更新頻度だけでなく、「テーブル数 × パイプライン単価」 が予算内に収まるかを最初に見積もることを強く推奨します。

リバナレテックブログ

Discussion