🐙
ElasticSearch(OpenSearch)の更新ロジック【雑メモ】
概要
Rails✖︎MySQLで実行されているアプリのデータに変更があった場合、OpenSearch側へデータ同期するときの動き
-
OpenSearch2.11
-
opensearch-ruby
-
変更対象の処理を振り分け(OpenSearchへ作成 or 更新 or 削除)
# 作成、更新、削除に振り分け
def self.set_categorize_actions(target_job_ids:, job_ids:)
# return if all_job_ids.blank?
#
response = client.mget(
index: INDEX_NAME,
body: { docs: job_ids.map { |id| { _id: id } } }
)
create_ids = []
update_ids = []
delete_ids = []
# 振り分け
response["docs"].each do |doc|
document_id = doc["_id"].to_i
if doc["found"] # OpenSearch上に既に存在している場合
# 変更対象の場合は更新
if target_job_ids.include?(document_id)
update_ids << document_id
else
# OpenSearch上に既に存在しているが、変更対象ではない場合は削除
delete_ids << document_id
end
else # OpenSearch上に存在せず、変更対象である場合は作成
create_ids << document_id if target_job_ids.include?(document_id)
end
end
return { update_ids: update_ids, create_ids: create_ids, delete_ids: delete_ids }
end
- 一括で処理
# データ更新時、OpenSearch側にも変更が必要(作成、更新、削除)
def self.sync_documents(job_ids:, update_targets: [])
begin # 実行時エラーが出てもRails側を止めないようにbegin rescue処理
# 変更対象のjobを抽出
target_jobs = Job.where(id: job_ids).opensearch_filter
# 振り分け(作成、更新、削除)
categorize_actions = set_categorize_actions(target_job_ids: target_jobs.pluck(:id), job_ids: Array.wrap(job_ids))
bulk_data = []
create_ids = categorize_actions[:create_ids] # 作成対象
update_ids = categorize_actions[:update_ids] # 更新対象
if create_ids.present? || update_ids.present?
target_jobs = set_preload(jobs: target_jobs, targets: update_targets)
end
# 更新用のデータ生成
if update_ids.present?
target_jobs.where(id: update_ids).each do |job|
bulk_data << {
update: {
_index: INDEX_NAME, _id: job.id,
data: { doc: set_document(job: job, targets: update_targets) }
}
}
end
end
# 作成用のデータ生成
if create_ids.present?
target_jobs.where(id: create_ids).each do |job|
bulk_data << {
index: {
_index: INDEX_NAME,
_id: job.id, data: set_document(job: job, targets: update_targets)
}
}
end
end
# 削除用のデータ生成
if categorize_actions[:delete_ids].present?
categorize_actions[:delete_ids].each do |job_id|
bulk_data << { delete: { _index: INDEX_NAME, _id: job_id } }
end
end
# 実行
# バルクデータを分割して処理(多すぎるとOpensearch Bulk Error: [429] 429 Too Many Requests /_bulk」が発生する )
bulk_data.each_slice(1000) do |data|
client.bulk(body: data) if data.present?
end
rescue => e
# TODO_Elastic:エラー処理
p e
end
end
Discussion