🐙

ElasticSearch(OpenSearch)の更新ロジック【雑メモ】

2024/02/12に公開

概要

Rails✖︎MySQLで実行されているアプリのデータに変更があった場合、OpenSearch側へデータ同期するときの動き

  • OpenSearch2.11

  • opensearch-ruby

  • 変更対象の処理を振り分け(OpenSearchへ作成 or 更新 or 削除)

  # 作成、更新、削除に振り分け
  def self.set_categorize_actions(target_job_ids:, job_ids:)

    # return if all_job_ids.blank?
    #
    response = client.mget(
      index: INDEX_NAME,
      body: { docs: job_ids.map { |id| { _id: id } } }
    )
    create_ids = []
    update_ids = []
    delete_ids = []
    # 振り分け
    response["docs"].each do |doc|
      document_id = doc["_id"].to_i
      if doc["found"] # OpenSearch上に既に存在している場合
        # 変更対象の場合は更新
        if target_job_ids.include?(document_id)
          update_ids << document_id
        else
          # OpenSearch上に既に存在しているが、変更対象ではない場合は削除
          delete_ids << document_id
        end
      else # OpenSearch上に存在せず、変更対象である場合は作成
        create_ids << document_id if target_job_ids.include?(document_id)
      end
    end
    return { update_ids: update_ids, create_ids: create_ids, delete_ids: delete_ids }
  end
  • 一括で処理
  # データ更新時、OpenSearch側にも変更が必要(作成、更新、削除)
  def self.sync_documents(job_ids:, update_targets: [])
    begin # 実行時エラーが出てもRails側を止めないようにbegin rescue処理
      # 変更対象のjobを抽出
      target_jobs = Job.where(id: job_ids).opensearch_filter

      # 振り分け(作成、更新、削除)
      categorize_actions = set_categorize_actions(target_job_ids: target_jobs.pluck(:id), job_ids: Array.wrap(job_ids))

      bulk_data = []
      create_ids = categorize_actions[:create_ids] # 作成対象
      update_ids = categorize_actions[:update_ids] # 更新対象
      if create_ids.present? || update_ids.present?
        target_jobs = set_preload(jobs: target_jobs, targets: update_targets)
      end

      # 更新用のデータ生成
      if update_ids.present?
        target_jobs.where(id: update_ids).each do |job|
          bulk_data << {
            update: {
              _index: INDEX_NAME, _id: job.id,
              data: { doc: set_document(job: job, targets: update_targets) }
            }
          }
        end
      end

      # 作成用のデータ生成
      if create_ids.present?
        target_jobs.where(id: create_ids).each do |job|
          bulk_data << {
            index: {
              _index: INDEX_NAME,
              _id: job.id, data: set_document(job: job, targets: update_targets)
            }
          }
        end
      end

      # 削除用のデータ生成
      if categorize_actions[:delete_ids].present?
        categorize_actions[:delete_ids].each do |job_id|
          bulk_data << { delete: { _index: INDEX_NAME, _id: job_id } }
        end
      end

      # 実行
      # バルクデータを分割して処理(多すぎるとOpensearch Bulk Error: [429] 429 Too Many Requests /_bulk」が発生する )
      bulk_data.each_slice(1000) do |data|
        client.bulk(body: data) if data.present?
      end
    rescue => e
      # TODO_Elastic:エラー処理
      p e
    end
  end

Discussion