🌟

Elasticsearch/OpenSearch クラスタ間のデータ移行ガイド

に公開

Amazon Elasticsearch Service から別の OpenSearch クラスタへデータを移行する方法を解説します。本記事では、Scroll API と Bulk API を使用したシンプルかつ確実な移行手法を紹介します。

背景

クラウドサービスの移行やコスト最適化のため、Elasticsearch/OpenSearch クラスタ間でデータを移行する必要が生じることがあります。今回は以下の環境間での移行を行いました。

  • 移行元: Amazon Elasticsearch Service (AWS)
  • 移行先: セルフホスト OpenSearch

移行の流れ

  1. 移行元・移行先のインデックス確認
  2. マッピング情報の取得と調整
  3. 移行先にインデックスを作成
  4. Scroll API + Bulk API でデータ移行
  5. 移行結果の確認

事前準備:インデックスの確認

まず、移行元と移行先のインデックス一覧を確認します。

# 移行元のインデックス一覧
curl -u "user:password" "https://source-cluster/_cat/indices?v&s=index"

# 移行先のインデックス一覧
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"

Step 1: マッピング情報の取得

移行元からマッピング情報を取得します。

curl -s -u "user:password" \
  "https://source-cluster/index_name/_mapping" \
  > mappings.json

Step 2: マッピングの調整

移行先の環境によっては、カスタムアナライザーが利用できない場合があります。例えば、日本語形態素解析用の kuromoji プラグインがインストールされていない場合、アナライザー設定を除去する必要があります。

def remove_analyzer(obj):
    """マッピングからanalyzer設定を再帰的に削除"""
    if isinstance(obj, dict):
        if 'analyzer' in obj:
            del obj['analyzer']
        for key, value in obj.items():
            remove_analyzer(value)
    elif isinstance(obj, list):
        for item in obj:
            remove_analyzer(item)
    return obj

Step 3: 移行先にインデックスを作成

調整したマッピングを使用してインデックスを作成します。

import json
import requests
from requests.auth import HTTPBasicAuth

def create_index(source_index, dest_index, dest_url, dest_auth):
    # マッピングファイルを読み込み
    with open(f'{source_index}_mappings.json') as f:
        mappings_data = json.load(f)

    mappings = mappings_data[source_index]['mappings']
    clean_mappings = remove_analyzer(mappings)

    index_body = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1
        },
        'mappings': clean_mappings
    }

    response = requests.put(
        f"{dest_url}/{dest_index}",
        json=index_body,
        auth=dest_auth
    )
    print(f"Create {dest_index}: Status {response.status_code}")
    return response.status_code in [200, 201]

Step 4: データ移行スクリプト

Scroll API を使用して移行元からデータを取得し、Bulk API で移行先に投入します。

import json
import requests
from requests.auth import HTTPBasicAuth
import time

SOURCE_URL = 'https://source-cluster'
SOURCE_AUTH = HTTPBasicAuth('user', 'password')

DEST_URL = 'https://dest-cluster'
DEST_AUTH = HTTPBasicAuth('user', 'password')

def migrate_index(source_index, dest_index, batch_size=1000):
    print(f"=== Migrating {source_index} -> {dest_index} ===")

    # Scroll APIの初期化
    scroll_url = f"{SOURCE_URL}/{source_index}/_search?scroll=5m"
    query = {
        "size": batch_size,
        "query": {"match_all": {}}
    }

    response = requests.post(scroll_url, json=query, auth=SOURCE_AUTH)
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return False

    data = response.json()
    scroll_id = data['_scroll_id']
    hits = data['hits']['hits']
    total = data['hits']['total']['value']

    print(f"Total documents: {total}")

    migrated = 0
    errors = 0
    start_time = time.time()

    while hits:
        # Bulk リクエストの準備
        bulk_body = ""
        for hit in hits:
            action = {"index": {"_index": dest_index, "_id": hit['_id']}}
            bulk_body += json.dumps(action) + "\n"
            bulk_body += json.dumps(hit['_source']) + "\n"

        # Bulk リクエストの送信
        headers = {"Content-Type": "application/x-ndjson"}
        bulk_response = requests.post(
            f"{DEST_URL}/_bulk",
            data=bulk_body,
            headers=headers,
            auth=DEST_AUTH
        )

        if bulk_response.status_code == 200:
            result = bulk_response.json()
            if result.get('errors'):
                for item in result['items']:
                    if 'error' in item.get('index', {}):
                        errors += 1
            migrated += len(hits)
        else:
            print(f"Bulk error: {bulk_response.text[:200]}")
            errors += len(hits)

        # 進捗表示
        elapsed = time.time() - start_time
        rate = migrated / elapsed if elapsed > 0 else 0
        eta = (total - migrated) / rate if rate > 0 else 0
        print(f"\rProgress: {migrated}/{total} ({migrated*100//total}%) "
              f"- {rate:.0f} docs/sec - ETA: {eta:.0f}s", end='', flush=True)

        # 次のバッチを取得
        scroll_response = requests.post(
            f"{SOURCE_URL}/_search/scroll",
            json={"scroll": "5m", "scroll_id": scroll_id},
            auth=SOURCE_AUTH
        )

        if scroll_response.status_code != 200:
            break

        data = scroll_response.json()
        scroll_id = data['_scroll_id']
        hits = data['hits']['hits']

    # Scrollコンテキストの解放
    requests.delete(
        f"{SOURCE_URL}/_search/scroll",
        json={"scroll_id": scroll_id},
        auth=SOURCE_AUTH
    )

    print(f"\n\nCompleted: {migrated} documents, {errors} errors")
    return True

Step 5: 移行の実行と確認

# 移行の実行
migrate_index('items1', 'cj-items1')
migrate_index('items2', 'cj-items2')
migrate_index('images', 'cj-images')

# 移行結果の確認
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"

注意点とベストプラクティス

1. カスタムアナライザーの対応

移行先に kuromoji などのプラグインがない場合、以下の選択肢があります。

  • アナライザーなしで移行: 日本語検索の精度は下がるが、データは保持される
  • プラグインをインストール: 移行先に必要なプラグインを事前にインストール
  • 代替アナライザーを使用: standard アナライザーなどで代用

2. バッチサイズの調整

# 小さいドキュメント: バッチサイズを大きく
migrate_index('small_docs', 'dest_small', batch_size=5000)

# 大きいドキュメント: バッチサイズを小さく
migrate_index('large_docs', 'dest_large', batch_size=500)

3. 並列実行

複数のインデックスを並列で移行することで、全体の移行時間を短縮できます。

# バックグラウンドで並列実行
python migrate.py items1 cj-items1 &
python migrate.py items2 cj-items2 &
python migrate.py images cj-images &
wait

4. エラーハンドリング

Bulk API のレスポンスを確認し、エラーが発生したドキュメントを記録・リトライする仕組みを実装することを推奨します。

if result.get('errors'):
    for item in result['items']:
        if 'error' in item.get('index', {}):
            error_doc_id = item['index']['_id']
            error_reason = item['index']['error']['reason']
            # エラーログに記録
            logging.error(f"Failed: {error_doc_id} - {error_reason}")

5. ドキュメント数の検証

移行後、ソースと移行先のドキュメント数を比較して、データの整合性を確認します。

# ソースのドキュメント数
curl -s -u "user:password" "https://source/_count?q=*"

# 移行先のドキュメント数
curl -s -u "user:password" "https://dest/_count?q=*"

パフォーマンスの目安

移行速度は環境やドキュメントサイズによって異なりますが、一般的に以下の程度が期待できます。

  • 小〜中規模ドキュメント: 500〜1,000 docs/sec
  • 大規模ドキュメント: 100〜500 docs/sec
  • ネットワーク帯域: クラスタ間の距離や帯域幅が速度に大きく影響

まとめ

Scroll API と Bulk API を組み合わせることで、大規模なデータでも安定して移行できます。カスタムアナライザーの有無など、環境の差異に注意しながら移行を進めることが重要です。

参考リンク

Discussion