Elasticsearch/OpenSearch クラスタ間のデータ移行ガイド
Amazon Elasticsearch Service から別の OpenSearch クラスタへデータを移行する方法を解説します。本記事では、Scroll API と Bulk API を使用したシンプルかつ確実な移行手法を紹介します。
背景
クラウドサービスの移行やコスト最適化のため、Elasticsearch/OpenSearch クラスタ間でデータを移行する必要が生じることがあります。今回は以下の環境間での移行を行いました。
- 移行元: Amazon Elasticsearch Service (AWS)
- 移行先: セルフホスト OpenSearch
移行の流れ
- 移行元・移行先のインデックス確認
- マッピング情報の取得と調整
- 移行先にインデックスを作成
- Scroll API + Bulk API でデータ移行
- 移行結果の確認
事前準備:インデックスの確認
まず、移行元と移行先のインデックス一覧を確認します。
# 移行元のインデックス一覧
curl -u "user:password" "https://source-cluster/_cat/indices?v&s=index"
# 移行先のインデックス一覧
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"
Step 1: マッピング情報の取得
移行元からマッピング情報を取得します。
curl -s -u "user:password" \
"https://source-cluster/index_name/_mapping" \
> mappings.json
Step 2: マッピングの調整
移行先の環境によっては、カスタムアナライザーが利用できない場合があります。例えば、日本語形態素解析用の kuromoji プラグインがインストールされていない場合、アナライザー設定を除去する必要があります。
def remove_analyzer(obj):
"""マッピングからanalyzer設定を再帰的に削除"""
if isinstance(obj, dict):
if 'analyzer' in obj:
del obj['analyzer']
for key, value in obj.items():
remove_analyzer(value)
elif isinstance(obj, list):
for item in obj:
remove_analyzer(item)
return obj
Step 3: 移行先にインデックスを作成
調整したマッピングを使用してインデックスを作成します。
import json
import requests
from requests.auth import HTTPBasicAuth
def create_index(source_index, dest_index, dest_url, dest_auth):
# マッピングファイルを読み込み
with open(f'{source_index}_mappings.json') as f:
mappings_data = json.load(f)
mappings = mappings_data[source_index]['mappings']
clean_mappings = remove_analyzer(mappings)
index_body = {
'settings': {
'number_of_shards': 1,
'number_of_replicas': 1
},
'mappings': clean_mappings
}
response = requests.put(
f"{dest_url}/{dest_index}",
json=index_body,
auth=dest_auth
)
print(f"Create {dest_index}: Status {response.status_code}")
return response.status_code in [200, 201]
Step 4: データ移行スクリプト
Scroll API を使用して移行元からデータを取得し、Bulk API で移行先に投入します。
import json
import requests
from requests.auth import HTTPBasicAuth
import time
SOURCE_URL = 'https://source-cluster'
SOURCE_AUTH = HTTPBasicAuth('user', 'password')
DEST_URL = 'https://dest-cluster'
DEST_AUTH = HTTPBasicAuth('user', 'password')
def migrate_index(source_index, dest_index, batch_size=1000):
print(f"=== Migrating {source_index} -> {dest_index} ===")
# Scroll APIの初期化
scroll_url = f"{SOURCE_URL}/{source_index}/_search?scroll=5m"
query = {
"size": batch_size,
"query": {"match_all": {}}
}
response = requests.post(scroll_url, json=query, auth=SOURCE_AUTH)
if response.status_code != 200:
print(f"Error: {response.text}")
return False
data = response.json()
scroll_id = data['_scroll_id']
hits = data['hits']['hits']
total = data['hits']['total']['value']
print(f"Total documents: {total}")
migrated = 0
errors = 0
start_time = time.time()
while hits:
# Bulk リクエストの準備
bulk_body = ""
for hit in hits:
action = {"index": {"_index": dest_index, "_id": hit['_id']}}
bulk_body += json.dumps(action) + "\n"
bulk_body += json.dumps(hit['_source']) + "\n"
# Bulk リクエストの送信
headers = {"Content-Type": "application/x-ndjson"}
bulk_response = requests.post(
f"{DEST_URL}/_bulk",
data=bulk_body,
headers=headers,
auth=DEST_AUTH
)
if bulk_response.status_code == 200:
result = bulk_response.json()
if result.get('errors'):
for item in result['items']:
if 'error' in item.get('index', {}):
errors += 1
migrated += len(hits)
else:
print(f"Bulk error: {bulk_response.text[:200]}")
errors += len(hits)
# 進捗表示
elapsed = time.time() - start_time
rate = migrated / elapsed if elapsed > 0 else 0
eta = (total - migrated) / rate if rate > 0 else 0
print(f"\rProgress: {migrated}/{total} ({migrated*100//total}%) "
f"- {rate:.0f} docs/sec - ETA: {eta:.0f}s", end='', flush=True)
# 次のバッチを取得
scroll_response = requests.post(
f"{SOURCE_URL}/_search/scroll",
json={"scroll": "5m", "scroll_id": scroll_id},
auth=SOURCE_AUTH
)
if scroll_response.status_code != 200:
break
data = scroll_response.json()
scroll_id = data['_scroll_id']
hits = data['hits']['hits']
# Scrollコンテキストの解放
requests.delete(
f"{SOURCE_URL}/_search/scroll",
json={"scroll_id": scroll_id},
auth=SOURCE_AUTH
)
print(f"\n\nCompleted: {migrated} documents, {errors} errors")
return True
Step 5: 移行の実行と確認
# 移行の実行
migrate_index('items1', 'cj-items1')
migrate_index('items2', 'cj-items2')
migrate_index('images', 'cj-images')
# 移行結果の確認
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"
注意点とベストプラクティス
1. カスタムアナライザーの対応
移行先に kuromoji などのプラグインがない場合、以下の選択肢があります。
- アナライザーなしで移行: 日本語検索の精度は下がるが、データは保持される
- プラグインをインストール: 移行先に必要なプラグインを事前にインストール
- 代替アナライザーを使用: standard アナライザーなどで代用
2. バッチサイズの調整
# 小さいドキュメント: バッチサイズを大きく
migrate_index('small_docs', 'dest_small', batch_size=5000)
# 大きいドキュメント: バッチサイズを小さく
migrate_index('large_docs', 'dest_large', batch_size=500)
3. 並列実行
複数のインデックスを並列で移行することで、全体の移行時間を短縮できます。
# バックグラウンドで並列実行
python migrate.py items1 cj-items1 &
python migrate.py items2 cj-items2 &
python migrate.py images cj-images &
wait
4. エラーハンドリング
Bulk API のレスポンスを確認し、エラーが発生したドキュメントを記録・リトライする仕組みを実装することを推奨します。
if result.get('errors'):
for item in result['items']:
if 'error' in item.get('index', {}):
error_doc_id = item['index']['_id']
error_reason = item['index']['error']['reason']
# エラーログに記録
logging.error(f"Failed: {error_doc_id} - {error_reason}")
5. ドキュメント数の検証
移行後、ソースと移行先のドキュメント数を比較して、データの整合性を確認します。
# ソースのドキュメント数
curl -s -u "user:password" "https://source/_count?q=*"
# 移行先のドキュメント数
curl -s -u "user:password" "https://dest/_count?q=*"
パフォーマンスの目安
移行速度は環境やドキュメントサイズによって異なりますが、一般的に以下の程度が期待できます。
- 小〜中規模ドキュメント: 500〜1,000 docs/sec
- 大規模ドキュメント: 100〜500 docs/sec
- ネットワーク帯域: クラスタ間の距離や帯域幅が速度に大きく影響
まとめ
Scroll API と Bulk API を組み合わせることで、大規模なデータでも安定して移行できます。カスタムアナライザーの有無など、環境の差異に注意しながら移行を進めることが重要です。
Discussion