🦀

Rustによる並列処理でDynamoDBへのデータ投入を20倍高速化してみた

2023/11/24に公開

はじめに

言語として高速だと謳われているRust。そのRustを使用してDynamoDBへのデータ登録処理を直列処理と複数の並列アルゴリズム処理で速度比較してみました。

DynamoDB

DynamoDBは公式で以下のように謳われています。

Amazon DynamoDB の応答時間は 1 桁ミリ秒で、最も要求の厳しいアプリケーションでも一貫してこのパフォーマンスを発揮できます。例を挙げると、2022 年の Amazon プライムデーに Amazon DynamoDB は、1 桁ミリ秒のパフォーマンスで、数兆回の API コールに対して 1 秒あたり 1 億 520 万件のリクエストを確実に処理しました。

https://aws.amazon.com/jp/dynamodb/features/#:~:text=Amazon DynamoDB の応答時間,パフォーマンスを発揮できます。

上記だけみると爆速のようにも思われますが、読み込みと書き込み双方に以下の制限があります。
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/ServiceQuotas.html#limits-api

BatchWriteItemのデータ投入は1回で25リクエストまで、Queryのデータ取得は1回の呼び出しあたり1MBまでという制限があります。このため少量のデータについては爆速を誇りますが、大量のデータを扱うOLTP系のシステムには不向きという特徴があります。

以前DynamoDBの技術選定をした時に、並列化したらどれくらいの速度が出るんだろう?という疑問をもち、今回時間が取れた為に試してみた感じになります。

前提条件

今回データ投入をBatchWriteItemで25件ずつ行います。
投入データはid列(primary key)とsork key列のみの最小限のデータ構造となっています。
モードは従量課金モードで行い、結果整合性(トランザクション)は考慮外とします。

またそれぞれの並列処理から共有で使用する変数の参照はありません。
(排他制御ロックして変数参照することがない)

このようにトランザクションを考慮しなかったり、排他制御がなかったりと実業務では要件を満たさない可能性がある処理で性能検証を行っていることをご留意ください。

並列処理

並列処理については以下の処理を試しています。

  • チャンネル(Channels)
    • アイテムの数に基づいてチャネルを作成し、各チャンク(25個のアイテムごと)に対してメッセージを送信。レシーバーはメッセージの受信を集約して並列に実行する。CSP(Communicating Sequential Processes)モデルがこれに該当する。
  • フォークジョイン(Fork-Join)
    • タスクを小さなサブタスクに分割し(フォーク)、それらを並列に処理する。処理が完了したら、結果を統合する(ジョイン)。Javaのjava.util.concurrentパッケージなどで利用されている。
  • 並列ループ(Parallel Loops)
    • ループの各イテレーションを並列に実行する。OpenMPなどの並列処理ライブラリでサポートされている。
  • マップリデュース(MapReduce)
    • ReduceステップでMapステップで生成された結果を統合し、最終結果を生成する。Hadoopなどのビッグデータ処理で広く使用されている。
  • ワーカー/マスターパターン(Worker/Master Pattern)
    • マスタープロセスがタスクを管理し、ワーカープロセスがこれらのタスクを実行する。

ソース

https://github.com/chikugoy/dynamodb_client_rs/

結果

データの投入は以下のような結果になりました。それぞれ1000件のデータ投入を10回試行した時の平均時間(ms)となっています。

処理 実行時間(ms)
直列処理 619
Channel parallel processing 71
Fork join parallel processing 38
Map reduce parallel processing 43
Parallel loop processing 146
Worker/Master Pattern 46

最速のFork joinとかだったら直列の20倍くらいはやいです。千件で38msなら、1万件で370ms。100万件なら38秒とRDBに比べてもかなり早い!

と思いましたが、1万件のデータ投入をしてみると約20%前後くらいのデータの処理落ちが発生しました。

以下コードのように処理落ちしたコードを取得して再実行することは可能ですが、unprocessed_itemsがある場合はawsにデータを再度参照にいくため、直列の10倍くらい遅くなってしまいます。

https://github.com/chikugoy/dynamodb_client_rs/blob/5cda81ed5af4c70ade090706124514313157547b/src/module/aggregate_result.rs#L36-L51

スリープ

データの処理落ちを解決させる為、10000件のfork joinでの並列登録処理に1から500msの間でランダムにスリープさせる処理を入れてみました。

use rand::Rng;
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
...

    let rng = Arc::new(Mutex::new(StdRng::from_entropy()));

    for chunk in numbers.chunks(25).map(|c| c.to_vec()) {
        let rng_clone = Arc::clone(&rng);
        let client = client.clone();
        let task = task::spawn(async move {
            let mut requests = Vec::new();
            for i in &chunk {
                requests.push(generate_request::create_write_request(i.to_string(), "SortKeyValue".to_string()));
            }

            let number = {
                let mut rng_guard = rng_clone.lock().unwrap();
                let number = rng_guard.gen_range(1..=500);
                drop(rng_guard);
                number
            };

            sleep(Duration::from_millis( number as u64)).await;

            client
                .batch_write_item()
                .request_items("books".to_string(), requests)
                .send()
                .await
        });

        tasks.push(task);
    }
    

StdRng::from_entropy()は、システムエントロピーを使用して乱数生成器をシードします。この乱数生成器はMutexでラップされ、さらにArc(Atomic Reference Counted)で包まれています。Mutexは同時に一つのスレッドのみがデータにアクセスできるようにし、Arcは複数のスレッド間で安全にデータを共有できるようにします。

let rng = Arc::new(Mutex::new(StdRng::from_entropy()));

下記ブロックではrng_clone.lock().unwrap()を呼び出して、Mutexをロックし、MutexGuardを取得して同期的にスレッドセーフに動作させます。乱数生成後は即座にdropによって明示的にMutexのロックを開放しています。

let number = {
    let mut rng_guard = rng_clone.lock().unwrap();
    let number = rng_guard.gen_range(1..=500);
    drop(rng_guard);
    number
};

これらのsleep処理を入れてみましたが、約20%前後くらいのデータの処理落ちに変化はありませんでした。

そこで現状1〜500msとなっているsleep間隔を調整してみます。
色々調整してみた所、以下の間隔だと1万件のデータ投入でもデータの処理落ちがなく成功しました。
かかった時間は2026msです。

let number = rng_guard.gen_range(100..=2000);

直列で1万件投入してみると5163msかかりました。並列(2026ms)に比べると2倍くらい遅いです。

このsleep処理を入れ込んだ状態で、fork joinの並列登録処理で10万件投入してみます。
以下の結果となりました。

Execution time: 4353ms
Number of successful items: 16405
Number of unprocessed items: 82070

実行時間は遅くないですが、8割以上が処理落ちという結果になっています。

下記のように直列で10万件投入してみると処理に1分近くかかりました。大体1万件投入の10倍くらいですね。

Execution time: 61232ms

現状は並列処理の全体でまるっとsleepがかかっているので、この実行粒度を変えれば件数が多くなっても成功すると思われます。1000件の並列では成功するので1000件の並列を直列で実行するように調整してみます。

fork_join_loopを追加して10回直列で繰り返すようにしました。

         Some("fork_join") => module::parallel::fork_join::batch_write_items(&client, item_count).await?,
        Some("fork_join_loop") => {
            for _ in 0..10 {
                module::parallel::fork_join::batch_write_items(&client, item_count).await?
            }
        },

実行時間は2秒ほどで終了していますが、処理落ちも発生しています。

$ cargo run fork_join_loop 1000
Execution time: 213ms
Number of successful items: 1000
Execution time: 30ms
Number of successful items: 1000
Execution time: 24ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 862ms
Number of successful items: 642
Number of unprocessed items: 358
Execution time: 20ms
Number of successful items: 1000
Execution time: 22ms
Number of successful items: 1000
Execution time: 28ms
Number of successful items: 1000
Execution time: 949ms
Number of successful items: 690
Number of unprocessed items: 310
Execution time: 23ms
Number of successful items: 1000
All Execution time: 2217ms

処理間隔の調整の為、直列でのループの中にsleepを入れてみます。

        Some("fork_join_loop") => {
            for _ in 0..10 {
                sleep(Duration::from_millis(200)).await;
                module::parallel::fork_join::batch_write_items(&client, item_count).await?
            }
        },

200msの実行間隔を取るようにしてみます。
実行結果は以下のように処理落ちがなく実行できました。

$ cargo run fork_join_loop 1000
Execution time: 189ms
Number of successful items: 1000
Execution time: 24ms
Number of successful items: 1000
Execution time: 40ms
Number of successful items: 1000
Execution time: 21ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 26ms
Number of successful items: 1000
Execution time: 26ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 37ms
Number of successful items: 1000
Execution time: 39ms
Number of successful items: 1000
All Execution time: 2484ms

ループ回数を100回に変更して、10万件の登録にトライしてみます。

Execution time: 208ms
Number of successful items: 1000
Execution time: 18ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 26ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 30ms
Number of successful items: 1000
Execution time: 29ms
Number of successful items: 1000
Execution time: 25ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 21ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 21ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 1000
Execution time: 19ms
Number of successful items: 1000
Execution time: 20ms
Number of successful items: 995
Number of unprocessed items: 5
Execution time: 19ms
Number of successful items: 957
Number of unprocessed items: 43
Execution time: 22ms
Number of successful items: 956
Number of unprocessed items: 44
Execution time: 21ms
Number of successful items: 958
Number of unprocessed items: 42
Execution time: 22ms
Number of successful items: 950
Number of unprocessed items: 50

処理途中からデータ落ちが発生しだしたので、処理を途中で終了させます。
この結果より一定の処理間隔を空ければOKではなく、単位時間辺りの試行回数の制限が単位時間が大きくなるごとにきつくなっていると思われます。

200msから300ms処理間隔を開けるようにしてみます。
今度はデータ落ちは発生せずに30秒ほどで終了しました。

All Execution time: 32478ms

まとめてみると以下の結果となりました。

処理 件数 実行時間(ms)
直列処理 1万件 5,163
10万件 61,232
Fork join 1万件 2,026
10万件 32,478

1万件以上になると並列に実行しても直列の大体半分くらいの実行時間に抑えられるみたいです。
データ落ちのリスクを勘案すると、登録データ量が多くなると並列で実行するメリットは少ないという結果になりました。

Rust vs Ruby

プログラミング言語の速度目安によるとRubyはインタープリタ言語で遅い言語の部類に入り、最速の部類のRustより10〜100倍遅いと記述されています。

そのRubyでRustと同じように並列でDynammoDBに1000件のデータ登録をした時に何秒かかるのか比較してみました。

require 'aws-sdk-dynamodb'
require 'concurrent'

client = Aws::DynamoDB::Client.new(
  region: 'ap-northeast-1',
  access_key_id: 'dummy_id',
  secret_access_key: 'dummy_key'
)

item_count = 1000
batch_size = 25

# DynamoDBにデータを投入するメソッド
def write_to_dynamodb(client, chunk)
  requests = chunk.map do |i|
    { put_request: { item: { "id" => i.to_s, "sort" => "SortKeyValue" } } }
  end

  client.batch_write_item(request_items: { 'books' => requests })
end

start_time = Time.now

# データをバッチに分割して並列処理
futures = (0...item_count).each_slice(batch_size).map do |chunk|
  Concurrent::Future.execute { write_to_dynamodb(client, chunk) }
end

# すべてのタスクが完了するのを待つ
futures.each(&:wait)

end_time = Time.now
puts "Execution time: #{end_time - start_time} seconds"

Rubyのversionは以下で、YJITは有効化された状態で実行しています。

# ruby -v
ruby 3.2.2 (2023-03-30 revision e51014f9c0) +YJIT [x86_64-linux]
# ruby client.rb 
Execution time: 0.496955628 seconds

結果は以下の通りで、RustはRubyの10倍前後の速度を出せているようです。
実行する前は、「いやいや10倍は流石にないでしょ」と思っていたので、予想外の結果でした。

言語 実行時間(ms)
Ruby 496
Rust 38

まとめ

結果として、DynamoDBで大量のデータを投入する場合、大人しく直列でやったほうが無難そうという結果となりました。

Rustによる並列処理は、最大で直列の20倍の高速化という夢がある結果となりましたが、他にCPUなどのリソースを使用する処理がない無風状況化での検証であるため、本番運用を想定するような性能検証は並列をさらに並列で実行させるような検証を行う必要があるかなと思います。

参考

https://www.amazon.co.jp/dp/4873119782

https://www.amazon.co.jp/dp/4873119596

https://blog.cloud-partner.jp/dynamodb-parallel-query-with-lambda/

https://zenn.dev/dyoshikawa/articles/dad27d49ade45a

https://zenn.dev/kboss/articles/f94e7ff5d80494

Discussion