📦

DynamoDB ExcluesiveStartKeyでページネーションしながら並列スキャン

2022/07/17に公開

DynamoDB 並列スキャン(Parallel Scan)というものがあるのを知りました。

アイテム数の多いDynamoDBテーブルを対象にバッチ処理をする際の高速化手段として便利に使えそうです。特に実行時間に15分制限があるAWS Lambdaバッチにおいて重宝しそうに思いました。

ExcluesiveStartKey を使ったページネーション(Pagination)を伴うサンプルコードがあまり見つからなかったので、試した結果をサンプルコードつきで紹介したいと思います(Node.js + TypeScript + AWS SDK V3)。

環境

  • M1 Mac
  • node 16.15.0
  • typescript 4.7.4
  • esbuild 0.14.48
  • esbuild-register 3.3.3
  • @aws-sdk/{client-dynamodb,lib-dynamodb} 3.121.0
  • @faker-js/faker 7.3.0

DynamoDBテーブル作成

AWSマネジメントコンソールから下記のテーブルを作成します。

  • テーブル名: testItems
  • キャパシティ: オンデマンド
  • プライマリキー
    • パーティションキー: idS 型)
    • ソートキー: なし

テストデータ投入

以下のようなスクリプトを作成しテストデータを投入します。

save-test-items.ts
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
import * as uuid from "uuid"
import { faker } from "@faker-js/faker"

const main = async () => {
  const ddbDoc = DynamoDBDocument.from(
    new DynamoDB({ region: "ap-northeast-1" })
  )

  // 10万件のデータを作成
  for (let i = 0; i < 4000; i++) {
    await ddbDoc.batchWrite({
      RequestItems: {
        testItems: Array.from({ length: 25 }).map(() => ({
          PutRequest: {
            Item: {
              id: uuid.v4(),
              name: faker.name.findName(),
              age: Number(faker.random.numeric(2)),
              content: faker.lorem.sentences(),
              createdAt: faker.date.recent().toISOString(),
              updatedAt: faker.date.recent().toISOString(),
            },
          },
        })),
      },
    })
  }
}

main()

軽く動作検証のためにまず25件投入、その後10万件投入したため合計100025件入っている状態になっています。

スキャンする

実行・計測

esbuild-registerを使って実行、計測には /usr/bin/time -l を利用します。

/usr/bin/time -l node -r esbuild-register ./path/to/{FILE_NAME}.ts

通常の全件スキャン

まずは直列に全件スキャンしてみます。

scan-test-items.ts
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"

const main = async () => {
  // DynamoDB Document Client初期化
  const ddbDoc = DynamoDBDocument.from(
    new DynamoDB({ region: "ap-northeast-1" })
  )

  let exclusiveStartKey: any | null | undefined = null
  let count = 0
  while (exclusiveStartKey !== undefined) {
    const result = await ddbDoc.scan({
      TableName: "testItems",
      ExclusiveStartKey: exclusiveStartKey ?? undefined,
    })
    exclusiveStartKey = result.LastEvaluatedKey
    count += result.Items?.length ?? 0
    console.log({ count })
  }
}

main()

実行時出力は以下です。

{ count: 3364 }
{ count: 6697 }
{ count: 10065 }
{ count: 13419 }
{ count: 16767 }
{ count: 20133 }
{ count: 23480 }
{ count: 26829 }
{ count: 30176 }
{ count: 33515 }
{ count: 36882 }
{ count: 40224 }
{ count: 43568 }
{ count: 46914 }
{ count: 50281 }
{ count: 53612 }
{ count: 56949 }
{ count: 60302 }
{ count: 63645 }
{ count: 66990 }
{ count: 70348 }
{ count: 73706 }
{ count: 77036 }
{ count: 80387 }
{ count: 83721 }
{ count: 87075 }
{ count: 90412 }
{ count: 93754 }
{ count: 97097 }
{ count: 100025 }

約3300件ずつの取得になっています。

2並列で全件スキャン

DynamoDB でのスキャンの操作 - Amazon DynamoDB

並列スキャンの場合、 scan() メソッドに

  • TotalSegment: 並列させたい数を指定する。例えば、4並列したい場合は 4 とする。
  • Segment : 並列に動作するワーカー/セグメントに割り振るIDのようなもの(と自分は理解)。 例えば TotalSegment: 4 とした場合は0〜3を割り振る。

を渡すようにします。

注意点として、 ExclusiveStartKey はそれぞれのセグメントで個別管理する必要があります。そのため以下のサンプルコードでは2並列スキャンに対して2つの ExclusiveStartKey 格納変数を用意しています。

また、もう一点注意として、 whileexclusiveStartKey !== undefined チェックとは別に、それぞれのスキャンにおいても exclusiveStartKey !== undefined チェックが必要になります。なぜかというと、データの総件数と並列数の組み合わせによっては、繰り返されるスキャンの終盤においてそれぞれの exclusiveStartKey の値にundefined{ id: "{UUID}" } が混在するケースがあります。最初の1回目を除いて exclusiveStartKeyundefined を渡してしまうと意図せず「スキャンが最初からやり直し」の挙動となり、無限(もしくはそれに近い)ループにつながる可能性があります。これを防止するためにそれぞれのスキャンにも exclusiveStartKey !== undefined のチェックを入れるようにします。

parallel-scan-test-items.ts
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"

const main = async () => {
  // DynamoDB Document Client初期化
  const ddbDoc = DynamoDBDocument.from(
    new DynamoDB({ region: "ap-northeast-1" })
  )

  let exclusiveStartKey0: any | null | undefined = null
  let exclusiveStartKey1: any | null | undefined = null
  let count = 0
  while (exclusiveStartKey0 !== undefined || exclusiveStartKey1 !== undefined) {
    const scanResults = await Promise.all([
      exclusiveStartKey0 !== undefined // 先に終わったSegmentはもうスキャンしない
        ? ddbDoc.scan({
            TableName: "testItems",
            ExclusiveStartKey: exclusiveStartKey0 ?? undefined,
            TotalSegments: 2,
            Segment: 0,
          })
        : undefined,
      exclusiveStartKey1 !== undefined // 先に終わったSegmentはもうスキャンしない
        ? ddbDoc.scan({
            TableName: "testItems",
            ExclusiveStartKey: exclusiveStartKey1 ?? undefined,
            TotalSegments: 2,
            Segment: 1,
          })
        : undefined,
    ])
    exclusiveStartKey0 = scanResults[0].LastEvaluatedKey
    exclusiveStartKey1 = scanResults[1].LastEvaluatedKey
    count += scanResults[0].Items?.length ?? 0
    count += scanResults[1].Items?.length ?? 0
    console.log({ count })
  }
}

main()

実行時出力は以下です。

{ count: 6697 }
{ count: 13367 }
{ count: 20084 }
{ count: 26784 }
{ count: 33481 }
{ count: 40207 }
{ count: 46904 }
{ count: 53585 }
{ count: 60285 }
{ count: 66955 }
{ count: 73678 }
{ count: 80358 }
{ count: 87043 }
{ count: 93740 }
{ count: 100025 }

約6700件ずつ取得できており、ループ回数が少なくなったのがわかります。

並列スキャンコードをリファクタしてみる(そして10並列スキャン)

先ほどの2並列サンプルコードは並列数が増えるほど ExcluesiveStartKey を格納する変数と scan() の呼び出しを自分で追加定義しないといけないため、単純に手間がかかりますし書き間違いによる不具合の恐れも気になります。そのため配列で管理するようにリファクタしてみたいと思います。

それが以下です。今回は10並列にしてみました。

parallel-scan-test-items.ts
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"

const main = async () => {
  // DynamoDB Document Client初期化
  const ddbDoc = DynamoDBDocument.from(
    new DynamoDB({ region: "ap-northeast-1" })
  )

  // 並列数を指定
  const parallelsCount = 10

  let exclusiveStartKeys: (any | null | undefined)[] = Array.from(
    {
      length: parallelsCount,
    },
    () => null
  )
  let count = 0
  while (
    exclusiveStartKeys.some(
      (exclusiveStartKey) => exclusiveStartKey !== undefined
    )
  ) {
    const scanResults = await Promise.all(
      exclusiveStartKeys.map(async (exclusiveStartKey, i) => {
        if (exclusiveStartKey === undefined) {
          // 先に終わったSegmentはもうスキャンしない
          return undefined
        }

        return ddbDoc.scan({
          TableName: "testItems",
          ExclusiveStartKey: exclusiveStartKey ?? undefined,
          TotalSegments: parallelsCount,
          Segment: i,
        })
      })
    )
    exclusiveStartKeys = scanResults.map(
      (scanResult) => scanResult?.LastEvaluatedKey
    )
    scanResults.forEach(
      (scanResult) => (count += scanResult?.Items?.length ?? 0)
    )
    console.log({ count })
  }
}

main()

const parallelsCount = 10 の数値を変更するだけで並列数をコントールできるようになりました。

実行時出力は以下です。

{ count: 33450 }
{ count: 66935 }
{ count: 99797 }
{ count: 100025 }

約33000件ずつ取得でき、4回のループで全件取得できました。

並列スキャンで同じアイテムが重複することはない?

念の為、取得結果に重複がないか確認してみます。

取得アイテムを変数にすべて収集し、プライマリキーとしている id に重複がないかチェックしてみます。

parallel-scan-test-items.ts
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"

const main = async () => {
  // DynamoDB Document Client初期化
  const ddbDoc = DynamoDBDocument.from(
    new DynamoDB({ region: "ap-northeast-1" })
  )

  // 並列数を指定
  const parallelsCount = 10

  let exclusiveStartKeys: (any | null | undefined)[] = Array.from(
    {
      length: parallelsCount,
    },
    () => null
  )
  let count = 0
+ let items: Record<string, any>[] = []
  while (
    exclusiveStartKeys.some(
      (exclusiveStartKey) => exclusiveStartKey !== undefined
    )
  ) {
    const scanResults = await Promise.all(
      exclusiveStartKeys.map(async (exclusiveStartKey, i) => {
        if (exclusiveStartKey === undefined) {
          // 先に終わったSegmentはもうスキャンしない
          return undefined
        }

        return ddbDoc.scan({
          TableName: "testItems",
          ExclusiveStartKey: exclusiveStartKey ?? undefined,
          TotalSegments: parallelsCount,
          Segment: i,
        })
      })
    )
    exclusiveStartKeys = scanResults.map(
      (scanResult) => scanResult?.LastEvaluatedKey
    )
-   scanResults.forEach(
-     (scanResult) => (count += scanResult?.Items?.length ?? 0)
-   )
+   scanResults.forEach((scanResult) => {
+     count += scanResult?.Items?.length ?? 0
+     items = items.concat(scanResult?.Items ?? [])
+   })
    console.log({ count })
  }
  
+ const ids = items.map((item) => item.id)
+ const uniqueIds = [...new Set(ids)]
+ console.log({ count, idsCount: ids.length, uniqueIdsCount: uniqueIds.length })
}

main()

実行時出力は以下です。

{ count: 33450 }
{ count: 66935 }
{ count: 99797 }
{ count: 100025 }
{ count: 100025, idsCount: 100025, uniqueIdsCount: 100025 }

count idsCount uniqueIdsCount すべて 100025 で一致しているので重複はないと思われます。

結果

それぞれ要した時間を下記に示します。

ケース 実行時間(秒)
通常スキャン 107.76s
2並列スキャン 59.89s
10並列スキャン 14.22s

並列にすると抜群に速くなることがわかります。

内容は以上になりますが、もし並列スキャンについて理解が誤っている点がありましたらコメント欄でご指摘頂けますと幸いです。

参考

Discussion