💟

Indexed Job でバッチ処理を並列化する

2024/05/27に公開

課題

バッチ処理を並列実行したい場合 (複数の Pod で処理を行いたい場合)、きちんと設計してあげないと以下のようなことが起こる可能性があります。

  • トランザクションが競合してロック待機が生じてしまい、結果的に全く効率的に並列処理できない
  • 重複処理が発生し、それによってデータの不整合やリソースの無駄が生じる

今回はそのような問題を Indexed job を用いて解決したいと思います。

アプローチ

アプローチとしては以下のようにそれぞれ Pod に与えられた Index によって処理担当範囲を分けるという形です。並列バッチを実行する上では基本的な方針です。

ポイントは以下 2 つです。

  • DB と AP (Job) が疎結合になっている。
  • 担当処理範囲がバランシング (分散) される。

DB と AP (Job) が疎結合になっていれば、基本的にはどちらかに変更を加えた場合もう一方を変更する必要がなく、変更容易性が担保できます。例えば、今回のやり方とは別に「indexごとにテーブルをわける」, 「index用のカラムを持たせる」などの方法も考えましたが、どちらも AP (Job) に変更加える場合、DB側にも変更を加える必要がでてきます。(例えば、Jobの並列実行数を変更したい場合、テーブル数を増減させたり、index用カラムに入れる値の範囲も変更する必要がでてきます。)今回の場合は PK (勝手に UUID としていますが)の先頭文字を利用して振り分ける形にしており、特にDBに対して Job に振り分ける用の実装は行なっていません。また、このような構成にしておけば、既存のシステムにバッチの機構を機能追加する際も DB への変更しなくて良いのでその点も嬉しいです。

担当処理範囲がバランシング (分散) されるかどうかについてですが、これは、担当範囲が特定の Pod に偏ってしまうことがないようにということですね。今回は UUID の構成用の中でもランダム性が担保されている Node ID を利用して分散させるようにしています。

UUID の構成はおおまかに以下のようになっており、

UUID versoion 4 では Clock Sequence と Node がランダムまたは擬似ランダムに生成されるようになっています。今回は Node ID の先頭文字を利用して担当範囲を分けるような形にしています。(例えば並列実行数 3 の場合、Job1 は Node ID の先頭文字が 0-4 のうちどれかから始まるものを, Job2 は 5-9, Job3 は a-f のような形です。)

For UUID version 4, the node field is a randomly or pseudo-randomly
generated 48-bit value as described in Section 4.4.

https://datatracker.ietf.org/doc/html/rfc4122#section-4.1.6

一応本アプローチは PK に UUID を利用している場合、特に追加実装必要ないので嬉しいですが、UUID が16進数なので最大並列実行数は 16 となってしまう点、拡張性に課題はあるので並列実行数をそれ以上にしたい場合は別の方式を考える必要がありそうです。ただ、マシンリソースを調整したり、Cron の間隔を短くしてあげれば余程のことでない限り事足りる気はするので、その点どうかは見てあげたいです。

試してみる

実際に Indexed Job を利用してどの程度処理を効率化できるのか試してみました。

その前に

Indexed Job を試してみる前に、本当に UUID の Node ID 部分できちんと分散できるかを確かめてみます。全体のレコード数は 1000 件です。

文字 レコード数
0 59
1 71
2 64
3 51
4 72
5 56
6 58
7 50
8 50
9 75
a 64
b 56
c 59
d 80
e 71
f 61
レコード数カウント SQL
count_records_each_string.sql
-- 全体のレコード数をカウント
SELECT COUNT(*) FROM Sales;

-- 各文字のレコード数をカウント
SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '0';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '1';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '2';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '3';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '4';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '5';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '6';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '7';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '8';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = '9';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'a';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'b';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'c';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'd';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'e';

SELECT COUNT(Id)
    FROM Sales 
    WHERE SUBSTR(Id, 25, 1) = 'f';

1000/16 = 62.5 なので良い感じに分散されていそうです。

シナリオ

シナリオ 説明
単一 単一の Pod で処理する。
並列 複数の Pod で並列処理する。特に各 Pod の担当処理範囲を分けことはしていない。並列実行数は 16 とする。
並列 - Indexed 複数の Pod で並列処理する。Indexed Job を利用して、各 Pod の担当処理範囲を分ける。並列実行数は 16 とする。

その他

  • レコード数: 1000 件
  • 処理内容
    • 読み取り
    • 各レコードごとに 100ms sleep (外部サービスとの連携を想定)
    • 全レコード結果書き込み

また、具体的な AP 処理内容や Pod リソース、各種実装は以下リポジトリを参照ください。
https://github.com/ppluuums-jp/indexed-job

結果

結果としては以下の通り、Indexed Job を利用して並列に処理担当を分けた場合、単一実行の場合と比較して 13分の1 程度の処理時間に収まりました。16並列で実行したので、単純に考えれば16分の1の処理時間になりそうですが、Cloud Spannerへの負荷もその分もかかるので若干レイテンシが出たのかなと思います。全体として見れば大きくバッチ処理時間を短縮できました。

また、処理担当範囲を分けていない並列処理の場合、今回の実装では予想外に特にトランザクションの競合による abort は発生しませんでした。しかし、当然ですが、処理担当範囲をわけていないので、重複処理は発生しています。その分 k8s や Spanner のリソースを無駄遣いしてしまっています。

シナリオ 処理時間 COMMIT_ATTEMPT_COUNT COMMIT_ABORT_COUNT AVG_COMMIT_LATENCY_SECONDS COMMIT_RETRY_COUNT
単一 101.238s 1 0 0.012092027813196182 0
並列 101.552s (102.548s) 16 0 0.013781769201159477 0
並列 - Indexed 7.6355s (9.489s) 16 0 0.004317594226449728 0

*並列シナリオの処理時間は平均処理時間を記載。最も時間のかかった Pod のものを () 内に記載。

作業 SQL
debug.sql
---
SELECT count(*) FROM Sales;
---
UPDATE Sales
SET IsDone = FALSE
WHERE IsDone = TRUE;
---
SELECT count(*) FROM Sales WHERE IsDone = TRUE;
---
DROP TABLE Sales;
---
CREATE TABLE Sales (
    Id STRING(36) DEFAULT (GENERATE_UUID()),
    Name STRING(MAX),
    IsDone Bool DEFAULT (False)
) PRIMARY KEY (Id);
---


--- 16:26: serial
--- INFO 2024-05-27T07:35:00.974540432Z [resource.labels.containerName: worker] indexed job: 1:41.238 (m:ss.mmm)
SELECT
  *
FROM
  SPANNER_SYS.TXN_STATS_TOTAL_10MINUTE;
---
-- COMMIT_ATTEMPT_COUNT
-- COMMIT_ABORT_COUNT
-- AVG_COMMIT_LATENCY_SECONDS
-- COMMIT_RETRY_COUNT
-- 1
-- 0
-- 0.012092027813196182
-- 0

--- 16:50: parallel (16) indexed ---
--- INFO 2024-05-27T07:51:28.450202463Z [resource.labels.containerName: worker] indexed job: 7.353s
--- INFO 2024-05-27T07:51:29.130909766Z [resource.labels.containerName: worker] indexed job: 5.836s
--- INFO 2024-05-27T07:51:30.809836693Z [resource.labels.containerName: worker] indexed job: 6.940s
--- INFO 2024-05-27T07:51:31.149085280Z [resource.labels.containerName: worker] indexed job: 8.365s
--- INFO 2024-05-27T07:51:44.501828752Z [resource.labels.containerName: worker] indexed job: 8.715s
--- INFO 2024-05-27T07:51:44.826816184Z [resource.labels.containerName: worker] indexed job: 6.990s
--- INFO 2024-05-27T07:51:45.228491579Z [resource.labels.containerName: worker] indexed job: 6.733s
--- INFO 2024-05-27T07:51:45.370318447Z [resource.labels.containerName: worker] indexed job: 8.743s
--- INFO 2024-05-27T07:51:45.635838040Z [resource.labels.containerName: worker] indexed job: 7.678s
--- INFO 2024-05-27T07:51:45.810083504Z [resource.labels.containerName: worker] indexed job: 9.489s
--- INFO 2024-05-27T07:51:45.840324766Z [resource.labels.containerName: worker] indexed job: 8.592s
--- INFO 2024-05-27T07:51:46.547308083Z [resource.labels.containerName: worker] indexed job: 6.828s
--- INFO 2024-05-27T07:51:46.570282403Z [resource.labels.containerName: worker] indexed job: 7.014s
--- INFO 2024-05-27T07:51:47.016316638Z [resource.labels.containerName: worker] indexed job: 7.280s
--- INFO 2024-05-27T07:51:47.017240225Z [resource.labels.containerName: worker] indexed job: 7.901s
--- INFO 2024-05-27T07:51:47.479320786Z [resource.labels.containerName: worker] indexed job: 7.711s
---
SELECT
  *
FROM
  SPANNER_SYS.TXN_STATS_TOTAL_10MINUTE;
---
-- COMMIT_ATTEMPT_COUNT
-- COMMIT_ABORT_COUNT
-- AVG_COMMIT_LATENCY_SECONDS
-- COMMIT_RETRY_COUNT
-- 16
-- 0
-- 0.004317594226449728
-- 0

-- 17:13: parallel (16) ---
-- INFO 2024-05-27T08:13:29.123649472Z [resource.labels.containerName: worker] indexed job: 1:41.281 (m:ss.mmm)
-- INFO 2024-05-27T08:13:29.128432162Z [resource.labels.containerName: worker] indexed job: 1:41.297 (m:ss.mmm)
-- INFO 2024-05-27T08:13:36.240241724Z [resource.labels.containerName: worker] indexed job: 1:41.809 (m:ss.mmm)
-- INFO 2024-05-27T08:13:36.549657984Z [resource.labels.containerName: worker] indexed job: 1:41.518 (m:ss.mmm)
-- INFO 2024-05-27T08:13:36.720948011Z [resource.labels.containerName: worker] indexed job: 1:41.508 (m:ss.mmm)
-- INFO 2024-05-27T08:13:36.925296831Z [resource.labels.containerName: worker] indexed job: 1:41.391 (m:ss.mmm)
-- INFO 2024-05-27T08:13:37.431115399Z [resource.labels.containerName: worker] indexed job: 1:41.119 (m:ss.mmm)
-- INFO 2024-05-27T08:13:41.640130213Z [resource.labels.containerName: worker] indexed job: 1:41.731 (m:ss.mmm)
-- INFO 2024-05-27T08:13:43.239867157Z [resource.labels.containerName: worker] indexed job: 1:42.548 (m:ss.mmm)
-- INFO 2024-05-27T08:13:43.693843196Z [resource.labels.containerName: worker] indexed job: 1:42.079 (m:ss.mmm)
-- INFO 2024-05-27T08:13:44.498824943Z [resource.labels.containerName: worker] indexed job: 1:41.628 (m:ss.mmm)
-- INFO 2024-05-27T08:13:44.770492825Z [resource.labels.containerName: worker] indexed job: 1:41.939 (m:ss.mmm)
-- INFO 2024-05-27T08:13:45.580262146Z [resource.labels.containerName: worker] indexed job: 1:41.648 (m:ss.mmm)
-- INFO 2024-05-27T08:13:46.060149674Z [resource.labels.containerName: worker] indexed job: 1:41.305 (m:ss.mmm)
-- INFO 2024-05-27T08:13:46.076869894Z [resource.labels.containerName: worker] indexed job: 1:41.044 (m:ss.mmm)
-- INFO 2024-05-27T08:13:46.163547838Z [resource.labels.containerName: worker] indexed job: 1:40.994 (m:ss.mmm)
---
SELECT
  *
FROM
  SPANNER_SYS.TXN_STATS_TOTAL_10MINUTE;
---
-- COMMIT_ATTEMPT_COUNT
-- COMMIT_ABORT_COUNT
-- AVG_COMMIT_LATENCY_SECONDS
-- COMMIT_RETRY_COUNT
-- 16
-- 0
-- 0.013781769201159477
-- 0

さいごに

今回は Indexed job を用いてバッチ処理の並列化を試してみました。もともとは Pod 間通信が必要なモデルトレーニングのような重たいワークロード用の機能みたいですが、効率の良い並列処理のためにも使えそうということがわかりました。1 Pod では辛いバッチワークロードにぜひ利用してみてください。

Discussion