Chapter 03無料公開

どのように実装されているか

「Data Update Scripts」がどういったものかイメージが掴めたところで、次はどのように実装されているかを見てみましょう。
機能を構成する技術要素は下記のようになっています。

1つ1つ見てみましょう。

CUIから処理を実行するためのrakeタスク

rakeタスクとは

rakeとは、CUI(ターミナルやコマンドプロンプト)からrubyのコードをタスクとして実行できるようにするものです。

https://github.com/ruby/rake

CUIから実行できるということで、今回の「Data Update Scripts」のようなデプロイ時や開発環境を構築する際に必要な処理を実装するのにとても便利ですし、そのほかにも色々便利な使い方があります。

Railsのデフォルトでも色々なrakeタスクが定義されています。
例えばDBのマイグレーションを実行する db:migrate や、アセットデータを削除する assets:clean などもrakeタスクです。
コマンド bin/rails --tasks を実行するとRailsで定義されているrakeタスクの一覧を見ることができます。

また、アプリケーション側で定義したrakeタスクはlib/tasksに配置されます。
ですので、devtoで定義されているrakeタスクの一覧は こちら で見ることができます。

rakeタスクの実行は bin/rails {namespace}:{タスク名} の形式で行えます。
今回のトピックであるData Update Scriptの場合はbin/rails data_updates:enqueue_data_update_workerとなります。
また、rakeタスクから別のrakeタスクを実行することも可能で、その場合は下記の形式になります。

Rake::Task["data_updates:enqueue_data_update_worker"].execute

「Data Update Scripts」内での使われ方

前述した通り、「Data Update Scripts」のデータ更新処理は、rakeタスクdata_updates:enqueue_data_update_workerとして定義されています。
そして、このrakeタスクは次の処理から呼び出されます。

では、実際にrakeタスクdata_updates:enqueue_data_update_workerのコードを見てみましょう。
lib/tasks/data_updates.rake

namespace :data_updates do
  desc "Enqueue Sidekiq worker to handle data updates"
  task enqueue_data_update_worker: :environment do
    if Rails.env.development?
      Rake::Task["data_updates:run"].execute
    else
      # Ensure new code has been deployed before we run our update scripts
      DataUpdateWorker.perform_in(10.minutes)
    end
  end

  desc "Run data updates"
  task run: :environment do
    DataUpdateWorker.new.perform
  end
end

rakeタスクの書式は下記のようになっています。

  1. namespaceブロックでタスクのnamespaceを定義
    • 今回はnamespaceとしてdata_updatesが定義されている
  2. descブロックで下のタスクの説明文を定義する
    • bin/rails --tasksコマンドでタスクの一覧を出した時に表示される
    • 必須ではない
  3. taskブロック内にタスク名とタスクの処理を記述する
    • 今回はタスクとしてenqueue_data_update_workerrunが定義されている

16行の短いコードですが、気になる点を2つ取り上げて見たいと思います。

タスク名の後ろの:environmentって何?

前述したlib/tasks/data_updates.rake内に定義されている2つのタスクのどちらもタスク名の後ろに:environmentと付いています。
これは何なのでしょうか?

rakeではタスクに対して、そのタスクが依存するタスクを設定することができます。
依存するタスクを設定しておくと、タスクが実行される前に依存するタスクが実行されます。
この機能は「Prerequisites」と呼ばれていまして、詳しくは こちら に記載されています。

今回の場合は2つのタスクはどちらもenvironmentタスクに依存しているということになります。
environmentタスクはRailsが定義している、Railsのアプリケーションのコードを読み込むタスクです。

このタスクのおかげでDataUpdateWorkerクラスのようなRailsアプリケーションのクラスを参照することができるようになります。

development環境とproduction環境の実行方法の違い

lib/tasks/data_updates.rake内のコードでもう1つ取り上げたい点は、
development環境とproduction環境の実行方法を分けている点です。

タスクenqueue_data_update_worker上で、development環境ならrunタスクを実行して、
それ以外の環境なら DataUpdateWorker.perform_in(10.minutes) を実行していますね。
(「development以外の環境」は説明を簡潔にするために、以降は「production環境」と表現します。)

if Rails.env.development?
  Rake::Task["data_updates:run"].execute
else
  # Ensure new code has been deployed before we run our update scripts
  DataUpdateWorker.perform_in(10.minutes)
end

development環境で実行されるrunタスクを見てみると、たった一行DataUpdateWorker.new.performがあります。

task run: :environment do
  DataUpdateWorker.new.perform
end

これはDataUpdateWorkerクラスのpeformメソッドを呼んでいるだけです。
performメソッドにはタスクの本体である、データを更新するための処理が記述されています。

一方で、production環境で呼ばれるperform_in(10.minutes)DataUpdateWorkerクラスに直接定義されていないメソッドで、includeしているSidekiq::Workerクラスに定義されているメソッドです。

Sidekiqはバックグラウンド処理を簡単に実現してくれるgemです。
上記のDataUpdateWorker.perform_in(10.minutes)は言い換えますと、「10分後にバックグラウンドでDataUpdateWorkerクラスのperformメソッドを実行してください」とSidekiqに対して依頼していることになります。

バックグラウンド処理とはその名の通り「裏側での処理」でして、対になる「表側の処理(フォアグラウンド処理)」とはwebアプリケーションの場合は一般的にリクエストを受け付けてからレスポンスを返すまで処理を指します。

例えばフォアグラウンド処理でメール送信処理や前述したデータを更新する処理などのような時間がかかる処理を行なってしまうと、レスポンスを返す時間が遅くなってしまい、その間ユーザーを待てせてしまうことになります。
こういった処理をバックグラウンドで行うことでレスポンスを早く返すことができるようになります。

バックグラウンド処理のイメージ図

バックグラウンド処理とSidekiqについては後ほど詳しく取り扱いますので、一旦ざっくりとした役割を把握していただければ大丈夫です。

rakeタスクの中身に話を戻しましょう。
development環境では単にDataUpdateWorkerクラスのperformメソッドを実行しているのに対して、
production環境ではDataUpdateWorker.perform_in(10.minutes)という記述で、performメソッドの処理を10分後にバックグラウンドで行なっているという話でした。

なぜdevelopment環境とproduction環境で実行方法を変えているのでしょうか?

この疑問を言い換えると「なぜproduction環境では10分後に実行する必要があるのか」と言えると思います。
答えはコード内のコメントに書かれていますね。

# Ensure new code has been deployed before we run our update scripts
DataUpdateWorker.perform_in(10.minutes)

「新しいコードがデプロイされている状態でデータ更新スクリプトを実行する」という意味です。

まず今回の主役であるrakeタスクdata_updates:enqueue_data_update_workerはデプロイ時に自動的に実行されます。
デプロイ時の処理の流れは下記のようになっています。

①. デプロイ時に実行されるrelease-tasks.shから、rakeタスクapp_initializer:setupが呼ばれる

release-tasks.sh から一部抜粋

# ...

STATEMENT_TIMEOUT=4500000 bundle exec rails app_initializer:setup

# ...

②. rakeタスクapp_initializer:setupからrakeタスクdata_updates:enqueue_data_update_workerが呼ばれる
lib/tasks/app_initializer.rake から一部抜粋

namespace :app_initializer do
  desc "Prepare Application on Boot Up"
  task setup: :environment do

    # ...

    puts "\n== Updating Data =="
    Rake::Task["data_updates:enqueue_data_update_worker"].execute
  
    # ...
  
  end
end

そして、先ほどバックグラウンド処理の説明でwebアプリケーションではリクエストを受け取ってレスポンスを返すまでの処理を「フォアグラウンドの処理」と説明しましたが、今回の場合は上記のデプロイ処理が「フォアグラウンドの処理」となります。

フォアグラウンドのデプロイ処理が終わってから10分後に処理をさせたいので、自然とバックグラウンドで処理を行うことになります。

デプロイフローやデプロイにかかる時間はプロダクトによって様々だと思いますが、devtoではデプロイフローの10分後であれば新しいコードがデプロイされているという前提の元にこのような方法をとっているのでしょう。

このようにバックグラウンドでスケジューリング(今回のような遅延実行や定期実行など)された処理を実行するという方法を開発の選択肢に加えることができると、課題の解決方法の発想の幅を広げることができると思います。

Railsのようなwebアプリケーション開発にとってバックグラウンド処理は大事な要素だと思いますので、少し掘り下げてみたいと思います。

バックグラウンドで処理を行うためのSidekiq

Sidekiqとは

前述にあるrakeタスクの中に出てきたSidekiqについて少し説明したいと思います。
Sidekiqはバックグラウンドの処理を効率的に行えるようにするgemです。

https://github.com/mperham/sidekiq

バックグラウンド処理は、主にメール送信処理や前述したデータを更新する処理のような、時間がかかる処理に使われています。
時間のかかる処理をバックグラウンドで行うことで、ユーザーへのレスポンスを早くするなど、品質の高いwebアプリケーションを実現することができます。

SidekiqはRailsアプリケーションからバックグラウンド処理を依頼されると、redis上にfirst-in、first-outのqueue形式で依頼内容を保存します。
その後、Sidekiqプロセスがredis上のqueueから依頼内容を取り出して、実行します。

ですので、Sidekiqを使うためにはredisとSidekiqプロセスを稼働させる必要があります。

そして、Sidekiqを使用している場合の「バックグラウンドで処理する」という表現は「Sidekiqプロセス上で処理する」と同等ということになります。

どのような流れでバックグラウンドで処理が行われるか

どのような流れでバックグラウンドで処理が行われるかを把握できると、バックグラウンド処理をより活用しやすくなると思います。

バックグランド処理が実行されるまでのイメージ図

まず、バックグラウンドで行わせたい処理の内容は、前述したDataUpdateWorkerクラスのようにSidekiq::WorkerクラスをincludeしたWorkerクラスのperformメソッドに記述します。

そして、Sidekiqにバックグラウンド処理を依頼するには前述したperform_inメソッドで行えます。
また、遅延させる必要のない場合はperform_asyncメソッドを使います。

これらのメソッドを呼び出すことでredisのqueueに依頼内容がpushされます。

DataUpdateWorkerクラスの場合ですと下記のような形式でredis上に保存されます。

{
  "class":"DataUpdateWorker",
  "args":[],
  "retry":5,
  "queue":"high_priority",
  "jid":"c3284c88b095273bfb54253f",
  "created_at":1601864875.0150988
}

処理の内容はWorkerクラスのperformメソッドに記述すると決まっているので、Workerクラス名だけ解れば十分なのですね。
また、DataUpdateWorkerクラスのperformメソッドは引数がないので、argsは空[]になっています。

retryqueueは、Workerクラスごとに設定できる値です。

  • retry: 処理中にエラーが発生した際に何度再実行するかの数
  • queue: バックグラウンド処理依頼をpushするqueueの名前

Sidekiqはqueueを複数持つができます。
上記ではqueue: high_priorityとなっているので、DataUpdateWorkerクラスはhigh_priority queueにpushされているということになります。
これはDataUpdateWorkerクラス内で下記のようにsidekiq_optionsを指定しているからです。

sidekiq_options queue: :high_priority, retry: 5

このようにWorkerクラスごとにどのqueueにpushするかを指定することができます。

devtoのqueueの設定は config/sidekiq.yml にあります。

:queues:
  - ["default", 1]
  - ["low_priority", 10]
  - ["medium_priority", 100]
  - ["high_priority", 1000]
  - ["scheduler", 1000]
  - ["mailers", 1000]

queueの名前の後ろにある数値は「queueのweight」でして、数値が高いqueueほど優先して処理されます。
このように複数のqueueを用意して優先度を設定しておくことで、バックグラウンド処理の依頼が多くなりqueueが膨れ上がってしまうような状況でも、重要な処理はすぐに実行されるようにすることができます。

詳しい情報は Sidekiqのドキュメント に記載されています。

バックグラウンドの処理依頼がredis上のqueueにpushされていてもSidekiqプロセスが稼働していないと処理は実行されません。
Sidekiqプロセスの起動はbundle exec sidekiqで行えます。

devtoの場合はプロセス管理ツールに foreman を使っているので、
development環境でのプロセス設定ファイルである Procfile.dev に下記のように記述されています。

web: bin/rails s -p 3000 -b 0.0.0.0
webpacker: ./bin/webpack-dev-server
sidekiq: bundle exec sidekiq

一番下の行にSidekiqプロセスを起動する設定がありますね。

まとめますと下記の要素によってバックグラウンド処理が実現されています。

  • バックグラウンドで行わせたい処理を表すWorkerクラス
  • queue形式でバックグラウンド処理に関するデータを保持するredis
  • 実際に処理を行うSidekiqプロセス

Workerクラスとモデル

ここまでは、rakeタスクからWorkerクラスの処理が呼び出されるまでの流れを見てきました。
あとはWorkerクラスでどのようにデータ更新処理が行われているかが解れば、「Data Update Scripts」の機能全体がどのように実装されているかを把握できると思います。

DataUpdateWorkerクラス

まずは、「Data Update Scripts」のデータ更新処理のメイン部分であるDataUpdateWorkerクラスのperformメソッドを見てみましょう。
処理の流れは下記のようになっています。

  1. モデル DataUpdateScriptクラスのクラスメソッドであるscript_to_runを呼び出して、実行する必要があるDataUpdateScriptインスタンスを取得
  2. DataUpdateScriptインスタンスを実行しようとしていることを記録 => mark_as_run!メソッド
  3. scriptを実行 => run_scriptメソッド

app/workers/data_update_worker.rb から一部抜粋

def perform
    # モデル「DataUpdateScript」のクラスメソッドのscript_to_runを呼び出し、
    # 実行する必要があるDataUpdateScriptインスタンスを取得
    DataUpdateScript.scripts_to_run.each do |script|
      
      # 実行しようとしていることを記録
      script.mark_as_run!
      log_status(script)

      # scriptの実行
      run_script(script)
    end
end

ここで、モデル DataUpdateScriptクラスが登場してきました。
DataUpdateScriptクラスのインスタンスはlib/data_update_scriptsディレクトリに配置されているそれぞれのscriptファイルに対応しています。

DataUpdateScriptクラスのクラスメソッドのscript_to_runは、実行する必要があるDataUpdateScriptインスタンスを返します。
実行する必要があるDataUpdateScriptインスタンスとは、具体的には「既に実行されたもの」と「実行中のもの」を省いたリストになります。
「実行中のもの」を判別するために、scriptを実行する前にmarks_as_run!メソッドを呼び出している形です。

その後、Workerクラスは受け取ったDataUpdateScriptクラスを通して各scriptを実行します。

次に、scriptを実行するメソッドであるrun_scriptメソッドを見てみましょう。
解りやすいように、前述したpull request「Update hotness score for Tags」のscriptファイル20200324133751_update_tag_hotness_scores.rbを実行していると想定してコメントを付け足してあります。

app/workers/data_update_worker.rb から一部抜粋

def run_script(script) # scriptはDataUpdateScriptインスタンスです
   
    # まずscriptファイルを読み込む
    # 例: file_path = `{プロジェクトディレクトリのpath}/lib/data_update_scripts/20200324133751_update_tag_hotness_scores.rb`
    require script.file_path
    
    # scriptファイル内で定義されているクラスをインスタンス化して、runメソッドを実行
    # 例: file_class = `DataUpdateScripts::UpdateTagHotnessScores`
    script.file_class.new.run
    
    # 実行が完了したことを記録
    script.mark_as_finished!
    
    # ...

end

このようにrubyファイルであるscriptファイルを実際にrequireして、内部で定義されているクラスをインスタンス化してrunメソッドを実行しています。

モデル DataUpdateScriptクラス

最後にモデル DataUpdateScriptクラスを見ていきましょう。

役割

Workerクラスの説明でも触れましたが、DataUpdateScriptクラスのインスタンスはlib/data_update_scriptsディレクトリに配置されている個々のscriptファイルに対応しています。
ですので、attributeとしてscriptファイルのファイル名を表すfile_nameが定義されています。

また、DataUpdateScriptクラスの主な役割として「lib/data_update_scriptsディレクトリから読み取ったscriptファイルの中から、実行する必要があるscriptファイルを識別する」というものがあります。
具体的には、「一度実行したもの」と「実行中のもの」は省いて、新たに追加されたもののみを「実行する必要があるscriptファイル」として識別します。

これを実現するために、「新たに追加(enqueued)」,「実行中(working)」,「成功(succeed)」,「失敗(failure)」のような状態を持つ必要があります。
ですので、attributeとしてstatusが定義されています。

statusにはenumというものが使われています。
enumstatusに対して使うことで、status周りの扱いがとても便利になります。
どういうことか詳しく見てみましょう。

enum

まず、statusに対するenumの設定は下記のようになっています。
app/models/data_update_script.rb から一部抜粋

class DataUpdateScript < ApplicationRecord
    # ...

    STATUSES = { enqueued: 0, working: 1, succeeded: 2, failed: 3 }.freeze
    enum status: STATUSES
    
    # ...
end

DBの定義は下記のようにintegerとして定義します。

t.integer :status, default: 0, null: false

これだけの設定でDataUpdateScriptクラスを下記のように便利に扱えるようになります。

# `status: :enqueued` のように、名前を指定できるようになる。
# enumを使っていない場合は `status: 0` となり、
# 0が何を表しているかは覚えていないと解らないので、可読性が下がる。
script = DataUpdateScript.new(file_name: :hoge, status: :enqueued)

# インスタンスの状態を判別するためのメソッドが付与される
script.status # => enqueued
script.enqueued? # => true
script.working? # => false

script.working! # statusをworkingに変更

# インスタンスだけではなく、クラス自体にも便利メソッドが付与される
DataUpdateScript.statuses
# => {"enqueued"=>0, "working"=>1, "succeeded"=>2, "failed"=>3}

# statusがenqueuedのscope
DataUpdateScript.enqueued
# 下記と同等
DataUpdateScript.where(status: :enqueued)

このようにstatusを0や1の数値で扱うのではなく、状態を表した名前の文字列で扱えるようになるので可読性がとても高くなります。

クラスメソッド script_to_run

次にWorkerクラスの説明にも登場したクラスメソッドscript_to_runを見てみましょう。

このメソッドはその名の通りに「実行する必要があるscript(DataUpdateScriptインスタンス)」を返します。
処理の流れとしましては下記のようになっています。

  1. lib/data_update_scriptsディレクトリからscriptファイルを読み込んで、DBにDataUpdateScriptクラスのレコードとして保存する
  2. 実行する必要があるものを抽出して返す

app/models/data_update_script.rb から一部抜粋

def scripts_to_run
  insert_new_scripts
  enqueued.where(id: ids).order(file_name: :asc)
end

scriptファイルをDataUpdateScriptクラスのレコードとしてDBに書き込んでいる処理はクラスメソッドinsert_new_scriptsの中に書かれています。

lib/data_update_scriptsディレクトリからのscriptファイルを読み込む処理はクラスメソッドfilenamesの中にあるDir.globメソッドで行なっています。

とても簡潔に書かれているのでファイルを取り扱うコードのサンプルとしても価値が高いと思います。
scriptファイルを取り込む処理に関連する部分を抜粋してみましので、ぜひ見てください。

app/models/data_update_script.rb から一部抜粋

class DataUpdateScript < ApplicationRecord
  DIRECTORY = Rails.root.join("lib/data_update_scripts").freeze
  
  # ...

  class << self
    
    # ...
      
    private

   
    def insert_new_scripts
      now = Time.current

      # filenamesメソッドからファイル名のリストを取得
      scripts_params = filenames.map do |fn|
        { file_name: fn, created_at: now, updated_at: now }
      end

      DataUpdateScript.insert_all(scripts_params)
    end
    
    def filenames
      Dir.glob("*.rb", base: DIRECTORY).map do |f|
        Pathname.new(f).basename(".rb").to_s
      end
    end
  end
  
  # ...

end

DBへの書き込みはRails6から追加された insert_allメソッド で行われています。
このメソッドはモデルのインスタンスを生成せずに、複数のレコードを一括でDBに書き込むものです。
インスタンスを生成しないので、after_createなどの各種コールバックやvalidationは機能しませんが、その分パフォーマンスが良いです。

また、DBのfile_nameにunique制約が付いているので、何度実行されても同じscriptファイルが重複登録されないようになっています。

DBへの書き込みの後は、実行する必要があるインスタンスを返すだけです。

enqueued.where(id: ids).order(file_name: :asc)

enqueuedは前述したenumによって追加されたクラスメソッドで、where(staus: :enqueued)と同等のものです。
そして、migrationファイルと同じように時系列の順に実行する必要があるのでfile_nameの昇順を指定しています。(file_nameの先頭に日時のタイムスタンプが付いている)

真ん中のwhere(id: ids)の記述がなぜ必要なのかは筆者には解りませんでしたので、どなたか解る方がいましたら連絡していただけるとありがたいです。

こちら、本書を読んでいただいた つららさん が調べてくださいました。
調べた結果、where(id: ids)の記述は不要とのことで、修正のPull Requestを送ってくださいました。
https://github.com/forem/forem/pull/11263