🙆‍♀️

移行業務の改善 その2 並列処理による高速化

2023/08/01に公開

はじめに

弊社では ecforce というシステムをSaaSという形でサービスを提供しています。
ecforceの導入を希望されるクライアント様には、大きく2パターンがあります。

  • 新規で ECを立ち上げたいというクライアント様
  • 別のカートシステムを現在使っており、ecforceに乗り換えたいというクライアント様

後者につきまして、当然ながら顧客や注文データをリセットするわけにもいかないので、 既存データも引き継ぎたい という要望をいただくことが大多数です。
ただ他社のカートシステムとecforceではデータ構造が異なるので、データを抽出してそのままecforceのデータベースにインポートできるわけではありません。

この後者の要望を叶えるためにはデータを

  1. 抽出し
  2. 加工した上で
  3. ecforceのデータベースにインポートする

という手順を踏む必要があります。
これを社内では 移行業務 と呼んでおり、弊社にはこの移行業務を行う専門チームがあります。

移行業務は当初、注文件数が数十万件程度でしたので、特に何も仕組み化せずプレーンなRailsスクリプトのみで行っていました。
並列処理やSQLのチューニングもしていない状態です。
ただ注文件数が600万件レベルの当時最大規模の案件を対応するにあたり、SQLや仕組みの根本改善が必要となりました。
今回は 並列処理による高速化 という観点で話を進めていきます。

本題

並列処理と並行処理

まず並行処理(マルチスレッド)とは、1つのプロセスで擬似的に並列にしているような仕組みのことを指します。
ecforceのデータベースにインポートする際にはRubyを用いており、Rubyには Thread という並行処理を実現するためのクラスが用意されています。
ただし並行処理では、高速化はほぼ実現できません。
Ruby(や Python)は GIL と言われる、 グローバル領域を互いのスレッド同士が書き込まないよう、排他的にロックする仕組み が搭載されています。
これによって、例えばスレッドが4つあったとして、1つのスレッドが処理している間は、他の3スレッドはGILによって待機状態になってしまいます。
つまりスレッドを複数立てても、実際に稼働しているスレッドは1つというわけです。
またマルチスレッドは1プロセスしか生かすことができない仕組みなため、コンピュータのスペックを上げ、例えば8コアにしたとしても高速化には繋がらず、宝の持ち腐れ状態になってしまいます。

高速化を実現するためには、コンピュータに搭載されているコア数を活かし、同時に処理をさせる必要があります。
それを実現するための仕組みが並列処理(マルチプロセス)です。
並列処理(マルチプロセス)を実現するには、システムコール fork(2) を使ってプロセスをコピーし、子プロセスを増やします。
これをRuby上で実現するライブラリとして parallel が挙げられます。

並列処理するにあたり気をつけないといけないこと

親プロセスが確保したメモリ領域は、子プロセスのメモリ領域へのコピーが発生する

親プロセスが確保したメモリ領域は、子プロセスのメモリ領域へのコピーが発生します。
これにより一気に子プロセスのメモリが消失する可能性があるため、メモリを意識してコードを書く必要があります。

実際に近いコードを以下に展開します。
例えば100万件の注文データをインポートする場合、以下のような処理の流れになります。

  1. 親プロセスが100万件分のメモリを消費する
  2. 2000個を塊(チャンク) として、各子プロセスに渡す
  3. システムコール fork(2) によって子プロセスを16個コピーする
  4. 親子プロセス間で通信の経路 (ソケットのようなもの) を開いて、そこでデータの受け渡しをする
  5. このとき親プロセスは100万件分のメモリを消費したまま、各子プロセスはここで2000件分のメモリを消費
  6. ブロック内で変数定義やオブジェクトの生成をすると、さらに各子プロセスのメモリを消費する
# 親プロセスの領域
slice_size = 2000
chunks = csv.each_slice(slice_size).to_a # chunks: 1,000,000[rows]

parallel_map(chunks, progress: "Migrating EcForce::Order", parallel: 16) do |chunk|
  # 子プロセスの領域
  ActiveRecord::Base.transaction do
    models = []

    chunk.each do |r|
      models << EcForce::Order.new(r.to_hash)
    end

    EcForce::Order.import models, validate: false
  end
end

ちなみにメモリ不足に陥ると、OOM Killerによって子プロセスが殺され、 Parallel::DeadWorker という例外が返却されます。
本来は子プロセスが死んでも他は生きているはずですが、Gem parallel の仕様で、子プロセスから親プロセスに通知が行くような設計になっています。

`rescue in work': Parallel::DeadWorker (Parallel::DeadWorker)

またチャンクの件数 2000 という数字は、長年様々な値を試して一番最適とされた値です。
多すぎると Parallel::DeadWorker が発生しやすくなり、少なすぎるとインポートの時間が遅くなってしまいます。

親子でのオブジェクトをコピーしたものを更新するとCopy on Writeでオブジェクトがコピーされる

今回のインポートとは少し関係ないですが、Linuxには Copy on Write という機能がありますので、紹介します。
子プロセスから親プロセスのメモリーを参照できるのですが、そのメモリーにあるオブジェクトの値を変更する等の操作をすると、親プロセスから子プロセスへコピーがされます。

# 親プロセスの領域
slice_size = 2000
chunks = EcForce::Order.all.each_slice(slice_size).to_a # chunks: 1,000,000[rows]

parallel_map(chunks, progress: "Order Update", parallel: 16) do |chunk|
  # 子プロセスの領域
  ActiveRecord::Base.transaction do
    chunk.each{ |order| order.update(memo01: 'test') }
  end
end

データの順序や状態を持たせることはできない

並行処理でデータをインポートするということは、子プロセス同士どのようなデータがいるかなどのお互いの状況は知りません。
例えば以下のような処理をしたい場合、データのインポート時点では計算できないというわけです。

  • 顧客が通算で何回購入したか、やトータルでいくら購入かを計算する
  • 対象の受注が今定期何回目かを計算する

もしこのような計算をしたい場合、データをインポートした後に別のスクリプトを実行してデータを一括更新する必要があります。

またインポート順序を気にするような取り込みもできません。
例えばテーブルのidが Auto Increment で、かつIDを指定せずにバルクなINSERTでインポートする場合、idはインポートした順に割り当てられます。
さらにインポートし直すたびにidは変わるため、冪等性がないとも言えます。

まとめ

今回は並行処理と並列処理の違いを説明した上で、高速化を実現する上で必要な並列処理について記載しました。
今回の内容に加えて、前回のSQLの高速化と掛け合わせることで、データのインポートが劇的に改善しました。
次回はデータの加工とインポートの役割分担について書いていきます。

SUPER STUDIOの採用について

SUPER STUDIOでは、エンジニアを採用しています。
少しでも興味がありましたら、以下をご覧ください。
https://hrmos.co/pages/superstudio/jobs/0000400
https://hrmos.co/pages/superstudio/jobs/0000404

昨年12月に9期目を迎えたSUPER STUDIOのキックオフイベントで社内表彰されたエンジニア受賞インタビュー記事です。よりSUPER STUDIOのエンジニア組織を理解できる内容となっておりますので、ご一読ください。
https://www.wantedly.com/companies/super-studio/post_articles/497997
https://www.wantedly.com/companies/super-studio/post_articles/487617

SUPER STUDIOテックブログ

Discussion