📈

Rails から Cloud Tasks へのタスク投入に Parallel を使った / 1万件投入の計測記録

6 min read 2

DBのレコードに保存されたテーブルの全レコード(数万件規模)を抽出して、そのひとつずつに対して処理を行うという状況に遭遇しました。例えば… DBに保存されているユーザープロフィールテキストからHTMLメールを作成して、一件ずつ送信する、といったようなケースです。

クラウドアーキテクチャの観点では、Google Cloud Tasks などのキューを利用するのが便利です。キューにデータを詰めさえすれば、Cloud Tasks の場合流量は細かく調整できます。ざっくり以下のような構成で考えてみます。

graph TB
    enqueue(タスク投入アプリ / App Engine) -->|1.全件抽出| db[(User Database)]
    enqueue -- 2.本記事で言及する部分 -->cloud_tasks[[Cloud Tasks]]
    cloud_tasks-->|流量調整可能|mail(メール送信アプリ / App Engine + SaaS)

冪等性やスケーリングの観点から、1つの Cloud Task タスクに対して、決まったアウトプットを出力する設計が安全です。DBの全データを対象にする場合、配列のかたまりをひとつのタスクに詰めてメール送信させたくなりますが、1つのタスクに対して1つメール送信する 方針で考えます。

Cloud Tasks に詰めさえすれば負荷の調整は融通が効く

少し順番が前後しますが、Cloud Tasks にすべてのタスクデータが詰め込まれた後の話をします。当然、メール送信アプリはサーバーを持ち、サーバーの能力には限界があります。SaaS を使っていてレートリミットを考えなければいけない状況もあります。この手の一括処理はある程度時間がかかってもいいので負荷をかけずゆるゆると進めたいことも往々にしてあります。Cloud Tasks ではそのような状況に応じて同時実行数や1秒あたりのディスパッチ(処理待ち列のようなもの)数を調整できます。こちらのブログがとても参考になりますので、もっと詳しく知りたい方はぜひ御覧ください。

https://zenn.dev/nananaoto/articles/c62d63223e669af63c80

課題:投入側どうしよう

さて、この記事で言及したいのはタスク投入側です。大量のデータを投入するとき、考慮しなければならない資源が「時間」と「計算リソース」です。特に急ぐ処理でない場合、時間がかかってもいいのでゆっくり全部投入すればええやん…ということで、以下のような擬似コードで構成される投入ロジックを使いました。

  def enqueue
    tasks = CloudTasks.new
    User.all.find_each do |user|
        create_params = {
          queue: "send-mail",
          target_path: "/mail/sender",
          payload: {
            user_id: user.id,
            user_name: user.name,
            email: user.email
          }
        }
        tasks.create(**create_params)
      end
    end
  end

find_eachを使っているので1000件ずつSELECTされ繰り返し実行されます。1万件のダミーユーザーを用意してこのコードを実行したところ、すべて Cloud Tasks へ投入するのに 15分 かかりました。[1]

別に急ぐ処理じゃないからいいじゃん、という話ではあるのですが、Google App Engine フレキシブル環境のリクエスト実行時間の上限が60分です。これをこえるとタイムアウトし、投入が中断してしまいます。このままでは今後ユーザーが増えていったときに、この上限に引っかかる可能性があります。改善が必要です。

投入側の改善案1️⃣:Rakeタスクにする

そもそもHTTPSリクエストする形ではなくRakeタスクで投入する作戦です。大いにアリですが、以下2点の理由によ私はりRakeタスクは最後の手段にしたいです。

  • アプリをコンテナとしてデプロイする機会が増えてきており、Rakeタスクを採用する場合はSSHできるようにするかRakeタスク専用コンテナを検討しなければならない
  • 処理を開始して確認を完了する一連の流れを開発者以外に委譲しづらくなる[2]

要するに考えること・メンテ対象が増えちゃいがちなのでいったん候補からは外します。

投入側の改善案2️⃣:二重注入パターンを採用する

Cloud Tasks のドキュメントを見ると、エンキューについても言及してくれていました。

数百万または数十億のような多数のタスクを追加する必要がある場合、二重注入パターンが便利です。1 つのジョブからタスクを作成する代わりに、インジェクタ キューを使用します。インジェクタ キューに追加された各タスクは拡散して、100 個のタスクを目的のキューまたはキューグループに追加します。
https://cloud.google.com/tasks/docs/manage-cloud-task-scaling?hl=ja#large-scalebatch_task_enqueues

つまり、Cloud Tasks へ個別のタスクを投入するための Cloud Tasks キューを用意せよ ということですね。

graph TB
    enqueue(タスク投入アプリ / App Engine) -->|1.全件抽出| db[(User Database)]
    enqueue -- いくつかのかたまり -->injector[[インジェクター用キュー / Cloud Tasks]]
    injector-->injector_app(分割投入アプリ / App Engine)-->cloud_tasks[[Cloud Tasks]]
    cloud_tasks-->mail(メール送信アプリ / App Engine + SaaS)
    style injector fill:#f9f
    style injector_app fill:#f9f

言ってることはわかりますが大掛かりで厳しいです。今回は数万件規模ですので、ここまでやりたくありません。別の方法を考えます。ただ、この方法だと時間さえあれば100万件規模のデータにも対応できる点は魅力です。爆発的に増えるようなら検討の余地があります。

投入側の改善3️⃣:並列に投入する

同僚からのアドバイスにより、今回はこちらを検討します。gemの Parallel を使ってみることにしました。ただ、どの程度改善するか読めないところもあり、実測した記録がありますので、残しておきます。

Parallel の導入

https://github.com/grosser/parallel

導入はシンプルですぐ終わりました。先の Cloud Tasks へ投入する部分を、以下のように書き換えます。

  BULK_ENQUEUE_TASKS_THREADS = ???
  
  def enqueue
    tasks = CloudTasks.new
+    User.all.select(:id).find_in_batches do |items| # ①
+      Parallel.each(items, in_threads: BULK_ENQUEUE_TASKS_THREADS) do |item| # ②
        create_params = {
          queue: "send-mail",
          target_path: "/mail/sender",
          payload: {
            user_id: user.id,
            user_name: user.name,
            email: user.email
          }
        }
        tasks.create(**create_params)
      end
    end
  end
  1. Parallel は配列を受け取って並列処理を行うため、find_eachfind_in_batches に書き換えました
  2. スレッド数を指定します。この値を決めたいです

実行環境

項目名
Google App Engine Flexible インスタンス cpu:1, memory_gb: 2, disk_size_gb: 10
Railsバージョン 6.1
Parallel バージョン 1.20.1
RAILS_MAX_THREADS 5

実測

処理速度そのものはお使いの環境や条件によって大きく変わります。実測値は参考程度でお願いします。

では、Parallel の論理スレッド数を変化させて計っていきます。

1万件投入に要した時間(秒)

論理スレッド数 1回目 2回目 3回目 4回目 5回目 中央値(秒)
Parallel 未使用 900.00 900.00(15分)
5 159.07 159.12 160.66 160.97 161.62 160.66
20 40.45 40.45 40.45 40.66 40.15 40.45
50 17.21 17.21 18.52 18.52 18.52 18.52
100 11.67 11.86 11.76 11.39 12.17 11.76
500 11.82 10.90 11.73 12.50 10.89 11.73
1000 13.50 13.50 12.78 12.98 12.59 12.98

めちゃくちゃ速くなってますね。正直驚きました。15分がなんとか半分くらいになってくれると嬉しいなというところだったのですが、まさかの12秒くらいで完了してしまいました。論理スレッド数 100 ~ 500 がスループットの上限のようで、そこからはスレッド生成のオーバーヘッドのほうが上回ってしまうようです。今回はスレッド数100で設定しました。

Cloud Tasks 側は大丈夫なの?

タスクを漏れなく投入できてるの?と不安になるかもしれません。問題なく全投入できています。どれだけスレッド数を増やしても1万件登録されています。マルチスレッドで効率化できた点もすごいですが、それに全く動じない Cloud Tasks もすごいですね。レートリミットなどのドキュメントを探したのですが、投入側に関する上限は特に見当たりませんでした。なにか情報持っている方いらっしゃいましたら教えて下さい。[3]


Cloud Tasks に1万件投入できた状態

おわりに:Parallel の採用で 900秒の投入時間が12秒に

単純計算で1万件の投入に15分かかると、4万件になったらタイムアウトを迎えてしまいます。マルチスレッドを活用することで12秒にまで短縮できたので、しばらくは投入時間のタイムアウトを気にしなくて良さそうです。クラウド・コンテナの流れで負荷の水平スケールがトレンドと言えますが、少し立ち返るのもいいな、というのが今回の収穫でした。いかにしてデータを割るかの前に、マルチスレッド・マルチプロセスでマシンリソース限界まで使ってみるのも面白いと思います。

参考

https://www.xmisao.com/2018/07/22/how-to-use-ruby-parallel-gem.html

https://zenn.dev/nananaoto/articles/bd1584c77e46f128a41a
脚注
  1. Cloud Tasks 側に batch_send 的なAPIがあると嬉しいのですが、今の所なさそうです ↩︎

  2. HTTPSでやりとりできるようにしておけば、管理画面からリクエストしたり、チャットボットと統合もできますね ↩︎

  3. コメント で教えていただきました。6,000,000r/m(100,000r/s) だそうです。 ↩︎

Discussion

レートリミットなどのドキュメント

https://cloud.google.com/tasks/docs/quotas
このページにある API requests がそれだと思います。
6,000,000r/m(100,000r/s)なので、まだまだ大丈夫ですね!

ほんとだ、たしかに記載あります!これが投入側のリミットの話なのですね。見落としてました。ありがとうございます。

ログインするとコメントできます