👏

まだdelayed_job使っているの?今はgood_jobが熱い!

2023/06/26に公開

delayed_jobしか使ったことなくて、また使おうとしているなら是非検討してみてほしいgood_job

まずgood_jobはのgithubはこちら
https://github.com/bensheldon/good_job

good_jobをおすすめする理由

・メンテされている(2023.06現在delayed_jobの最新コミットは2023年01月16日でしかも落ちている...)
・delayed_jobと同じでキューをDBで管理するため、sidekickみたいにredisを立てる必要はない
・delayed_jobよりドキュメントがしっかりしている
・delayed_jobと違いダッシュボードがある

もうこれだけ理由があるので、good_jobを使いましょう

delayed_jobにできて、good_jobにできないこと

多分ないです。あったら教えてください。

good_jobの注意点。それは排他処理。

ただしgood_jobはmultithreadなので排他処理をしっかり意識して実装しないと事故ります。
排他処理については日本語のドキュメントが見つからなかったのでこの記事ではそこをメインに書きます。

排他性を意識した並行処理の要件の例

仕事で使うときに、大量のjobを高速処理しつつ要所要所で排他処理を行わないといけなかったのでgood_jobのBatchesとConcurreny controlsの機能を利用しました。
どんな機能なのかは後述しますが、まずは何が必要だったかをご紹介します。

化石燃料を使用した火力発電から得たエネルギーと対比して、太陽光や風力、水力発電から得たエネルギーをグリーン電力と呼びますが、近年そのグリーン電力が具体的にどこの発電所でいつ発電されたものかをトラッキングできることが求められています。電気は通常1日を30分ごとに48コマ分割し、各30分ごとに計測された物を30分値と呼びます。

このグリーン電力30分値を特定のグループの電気の使用者の使用量30分値に割り当てる機能を実現する必要がありました。また電気を余すことなく割り当てるために、優先度の高いグループに割り当てた後はその他のグループにも割り当てるというような優先度の要件もありました。

例えば4/1の0:00~0:30のコマの優先度1の割り当て計算をジョブAで実行したとき、並行処理できるその他のジョブは同じ優先度1で別コマの割り当て計算ジョブなります。

ジョブAと異なるコマは以下になります。
・4/1の0:30~1:00のコマ、4/1の1:00~1:30のコマ...4/1の23:30~24:00のコマ
・4/2の0:00~0:30のコマ、4/2の1:00~1:30のコマ...4/39の23:30~24:00のコマ

一方で優先度が異なる同じコマの割り当て計算はジョブAが終了したことを持って起動させる必要があります。

以上が要件の説明になります。

順序性を持たせるBatchとコールバック機能

英語を読むのが苦じゃない方はこちらを読んでください。
https://github.com/bensheldon/good_job#batches

順序性を持たせるためにBatchとコールバックという機能を説明します。
今回の要件では優先度という概念があり、優先度1のjobが全て終了してから優先度2のjobを実行しなければいけません。

これは下記のように実装しました。(かなり簡略化しています)

class ParentJob < ApplicationJob
  def perform(batch, priorities)
    index = batch.properties[:index] || 0
    # prioritiesは優先度の配列です。優先度1と2と3があれば[1,2,3]となります。
    priorities = batch.properties[:priorities]

    ChildrenJob.perform_later(
      priorities: priorities, 
      index: index,
    )
  end
end

class ChildrenJob < ApplicationJob
  def perform(index:, priorities:)
    new_batch = GoodJob::Batch.new

    new_batch.add do
      GrandChildrenJob.perform_later(index)
    end

    if index == priorities.length - 1 # 全てマッチングし終えたら次の処理へ
      new_batch.enqueue(on_success: NextProcessJob)
    else # まだマッチングしていない優先度グループがあれば再度マッチングへ
      new_batch.enqueue(on_success: ParentJob, index: index + 1, priorities: priorities)
    end
  end
end

class GrandChildrenJob < ApplicationJob
  def perform(_index)
    # 4/1から4/30の30日分つまり30個のjobが一度にキューイングされ30日 * 48コマの割り当て処理が並行実行されます。
    (Date.parse('2023-04-01')..Date.parse('2023-04-30')).each do |date|
      DayJob.perform_later
    end
  end
end

parentジョブの仕事

初回のchildrenジョブの実行を担当します。prioritiesとindex(初回は0、次回からは受け取った値)をchildrenジョブに渡します。

childrenジョブの仕事

grandchildrenジョブの実行を行います。
batchの中でキューイングされたジョブの実行が全て成功したのちparentジョブから渡されたindexがprioritiesの最後のindexでない場合は、indexをインクリメントしてもう一度parentジョブを実行します。prioritiesの最後のindexであった場合、全ての優先度の実行が終わったことを意味しますから、次の処理に移ります。

この「batchの中でキューイングされたジョブの実行が全て成功したのち 」というコールバック機能を実現しているのがbatchのenqueueメソッドで、下記のon_successというコールバックのトリガーを指すキーに実行されるジョブを引数に渡します。

new_batch.enqueue(on_success: NextProcessJob)

実行したいジョブに引数を渡したいときは下記のようにします。

new_batch.enqueue(on_success: ParentJob, index: index + 1, priorities: priorities)

排他性を実現するConcurreny controls機能

英語を読むのが苦じゃない方はこちらを読んでください。
https://github.com/bensheldon/good_job#concurrency-controls

grandchildrenの処理はchildrenジョブの中で処理されます。
そのため、タイミングによって複数サーバーでデキューイングされ、ダブって実行される危険性があります。
そういう時は下記の実装をしましょう。


class GrandChildrenJob < ApplicationJob
  good_job_control_concurrency_with(
    # Maximum number of jobs with the concurrency key to be concurrently enqueued
    # 並列にキューイングしてよいジョブ数を入れる
    enqueue_limit: 1,

    # Maximum number of jobs with the concurrency key to be concurrently performed
    # 並列に実行されてよいジョブ数を入れる
    perform_limit: 1,

    # A unique key to be globally locked against.
    # Can be String or Lambda/Proc that is invoked in the context of the job.
    # Note: Arguments passed to #perform_later must be accessed through `arguments` method.
    # ロックするために使用するユニークキーで、String型やjobのコンテキストで呼ばれるLambda/Proc型を指定できます。
    # 留意点: argumentsメソッドはperform_laterから渡される引数にアクセスするために使用できます。
    key: -> { "任意のジョブの名前-#{arguments.first}" }
  )
  def perform(_index)
    (Date.parse('2023-04-01')..Date.parse('2023-04-30')).each do |date|
      DayJob.perform_later
    end
  end
end

keyはユニークになるように設定します。完全に命名は任意ですが、ユニークになるような値が名前に入るようにします。argumentsはjobを実行するperform_laterに渡した引数の配列になります。今回はinexを渡しているので、arguments.firstではindexが取得できるようになります。

もしユニークに設定できていないと重複したときに下記のエラーが起きます。

GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError:

最後に

今回紹介した実装や要件は説明をシンプルにするためにかなり簡略化しており、もしかするとその過程で記述の誤りを生んでいるかもしれません。実行して確認しているわけではないので、ご了承ください。good_jobの排他処理についての大体の把握の一助となれば幸いです。

ENECHANGE

Discussion