DynamoDB ExcluesiveStartKeyでページネーションしながら並列スキャン
DynamoDB 並列スキャン(Parallel Scan)というものがあるのを知りました。
アイテム数の多いDynamoDBテーブルを対象にバッチ処理をする際の高速化手段として便利に使えそうです。特に実行時間に15分制限があるAWS Lambdaバッチにおいて重宝しそうに思いました。
ExcluesiveStartKey
を使ったページネーション(Pagination)を伴うサンプルコードがあまり見つからなかったので、試した結果をサンプルコードつきで紹介したいと思います(Node.js + TypeScript + AWS SDK V3)。
環境
- M1 Mac
- node 16.15.0
- typescript 4.7.4
- esbuild 0.14.48
- esbuild-register 3.3.3
- @aws-sdk/{client-dynamodb,lib-dynamodb} 3.121.0
- @faker-js/faker 7.3.0
DynamoDBテーブル作成
AWSマネジメントコンソールから下記のテーブルを作成します。
- テーブル名:
testItems
- キャパシティ: オンデマンド
- プライマリキー
- パーティションキー:
id
(S
型) - ソートキー: なし
- パーティションキー:
テストデータ投入
以下のようなスクリプトを作成しテストデータを投入します。
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
import * as uuid from "uuid"
import { faker } from "@faker-js/faker"
const main = async () => {
const ddbDoc = DynamoDBDocument.from(
new DynamoDB({ region: "ap-northeast-1" })
)
// 10万件のデータを作成
for (let i = 0; i < 4000; i++) {
await ddbDoc.batchWrite({
RequestItems: {
testItems: Array.from({ length: 25 }).map(() => ({
PutRequest: {
Item: {
id: uuid.v4(),
name: faker.name.findName(),
age: Number(faker.random.numeric(2)),
content: faker.lorem.sentences(),
createdAt: faker.date.recent().toISOString(),
updatedAt: faker.date.recent().toISOString(),
},
},
})),
},
})
}
}
main()
軽く動作検証のためにまず25件投入、その後10万件投入したため合計100025件入っている状態になっています。
スキャンする
実行・計測
esbuild-registerを使って実行、計測には /usr/bin/time -l
を利用します。
/usr/bin/time -l node -r esbuild-register ./path/to/{FILE_NAME}.ts
通常の全件スキャン
まずは直列に全件スキャンしてみます。
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
const main = async () => {
// DynamoDB Document Client初期化
const ddbDoc = DynamoDBDocument.from(
new DynamoDB({ region: "ap-northeast-1" })
)
let exclusiveStartKey: any | null | undefined = null
let count = 0
while (exclusiveStartKey !== undefined) {
const result = await ddbDoc.scan({
TableName: "testItems",
ExclusiveStartKey: exclusiveStartKey ?? undefined,
})
exclusiveStartKey = result.LastEvaluatedKey
count += result.Items?.length ?? 0
console.log({ count })
}
}
main()
実行時出力は以下です。
{ count: 3364 }
{ count: 6697 }
{ count: 10065 }
{ count: 13419 }
{ count: 16767 }
{ count: 20133 }
{ count: 23480 }
{ count: 26829 }
{ count: 30176 }
{ count: 33515 }
{ count: 36882 }
{ count: 40224 }
{ count: 43568 }
{ count: 46914 }
{ count: 50281 }
{ count: 53612 }
{ count: 56949 }
{ count: 60302 }
{ count: 63645 }
{ count: 66990 }
{ count: 70348 }
{ count: 73706 }
{ count: 77036 }
{ count: 80387 }
{ count: 83721 }
{ count: 87075 }
{ count: 90412 }
{ count: 93754 }
{ count: 97097 }
{ count: 100025 }
約3300件ずつの取得になっています。
2並列で全件スキャン
DynamoDB でのスキャンの操作 - Amazon DynamoDB
並列スキャンの場合、 scan()
メソッドに
-
TotalSegment
: 並列させたい数を指定する。例えば、4並列したい場合は4
とする。 -
Segment
: 並列に動作するワーカー/セグメントに割り振るIDのようなもの(と自分は理解)。 例えばTotalSegment: 4
とした場合は0〜3を割り振る。
を渡すようにします。
注意点として、 ExclusiveStartKey
はそれぞれのセグメントで個別管理する必要があります。そのため以下のサンプルコードでは2並列スキャンに対して2つの ExclusiveStartKey
格納変数を用意しています。
また、もう一点注意として、 while
の exclusiveStartKey !== undefined
チェックとは別に、それぞれのスキャンにおいても exclusiveStartKey !== undefined
チェックが必要になります。なぜかというと、データの総件数と並列数の組み合わせによっては、繰り返されるスキャンの終盤においてそれぞれの exclusiveStartKey
の値にundefined
と { id: "{UUID}" }
が混在するケースがあります。最初の1回目を除いて exclusiveStartKey
に undefined
を渡してしまうと意図せず「スキャンが最初からやり直し」の挙動となり、無限(もしくはそれに近い)ループにつながる可能性があります。これを防止するためにそれぞれのスキャンにも exclusiveStartKey !== undefined
のチェックを入れるようにします。
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
const main = async () => {
// DynamoDB Document Client初期化
const ddbDoc = DynamoDBDocument.from(
new DynamoDB({ region: "ap-northeast-1" })
)
let exclusiveStartKey0: any | null | undefined = null
let exclusiveStartKey1: any | null | undefined = null
let count = 0
while (exclusiveStartKey0 !== undefined || exclusiveStartKey1 !== undefined) {
const scanResults = await Promise.all([
exclusiveStartKey0 !== undefined // 先に終わったSegmentはもうスキャンしない
? ddbDoc.scan({
TableName: "testItems",
ExclusiveStartKey: exclusiveStartKey0 ?? undefined,
TotalSegments: 2,
Segment: 0,
})
: undefined,
exclusiveStartKey1 !== undefined // 先に終わったSegmentはもうスキャンしない
? ddbDoc.scan({
TableName: "testItems",
ExclusiveStartKey: exclusiveStartKey1 ?? undefined,
TotalSegments: 2,
Segment: 1,
})
: undefined,
])
exclusiveStartKey0 = scanResults[0].LastEvaluatedKey
exclusiveStartKey1 = scanResults[1].LastEvaluatedKey
count += scanResults[0].Items?.length ?? 0
count += scanResults[1].Items?.length ?? 0
console.log({ count })
}
}
main()
実行時出力は以下です。
{ count: 6697 }
{ count: 13367 }
{ count: 20084 }
{ count: 26784 }
{ count: 33481 }
{ count: 40207 }
{ count: 46904 }
{ count: 53585 }
{ count: 60285 }
{ count: 66955 }
{ count: 73678 }
{ count: 80358 }
{ count: 87043 }
{ count: 93740 }
{ count: 100025 }
約6700件ずつ取得できており、ループ回数が少なくなったのがわかります。
並列スキャンコードをリファクタしてみる(そして10並列スキャン)
先ほどの2並列サンプルコードは並列数が増えるほど ExcluesiveStartKey
を格納する変数と scan()
の呼び出しを自分で追加定義しないといけないため、単純に手間がかかりますし書き間違いによる不具合の恐れも気になります。そのため配列で管理するようにリファクタしてみたいと思います。
それが以下です。今回は10並列にしてみました。
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
const main = async () => {
// DynamoDB Document Client初期化
const ddbDoc = DynamoDBDocument.from(
new DynamoDB({ region: "ap-northeast-1" })
)
// 並列数を指定
const parallelsCount = 10
let exclusiveStartKeys: (any | null | undefined)[] = Array.from(
{
length: parallelsCount,
},
() => null
)
let count = 0
while (
exclusiveStartKeys.some(
(exclusiveStartKey) => exclusiveStartKey !== undefined
)
) {
const scanResults = await Promise.all(
exclusiveStartKeys.map(async (exclusiveStartKey, i) => {
if (exclusiveStartKey === undefined) {
// 先に終わったSegmentはもうスキャンしない
return undefined
}
return ddbDoc.scan({
TableName: "testItems",
ExclusiveStartKey: exclusiveStartKey ?? undefined,
TotalSegments: parallelsCount,
Segment: i,
})
})
)
exclusiveStartKeys = scanResults.map(
(scanResult) => scanResult?.LastEvaluatedKey
)
scanResults.forEach(
(scanResult) => (count += scanResult?.Items?.length ?? 0)
)
console.log({ count })
}
}
main()
const parallelsCount = 10
の数値を変更するだけで並列数をコントールできるようになりました。
実行時出力は以下です。
{ count: 33450 }
{ count: 66935 }
{ count: 99797 }
{ count: 100025 }
約33000件ずつ取得でき、4回のループで全件取得できました。
並列スキャンで同じアイテムが重複することはない?
念の為、取得結果に重複がないか確認してみます。
取得アイテムを変数にすべて収集し、プライマリキーとしている id
に重複がないかチェックしてみます。
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb"
const main = async () => {
// DynamoDB Document Client初期化
const ddbDoc = DynamoDBDocument.from(
new DynamoDB({ region: "ap-northeast-1" })
)
// 並列数を指定
const parallelsCount = 10
let exclusiveStartKeys: (any | null | undefined)[] = Array.from(
{
length: parallelsCount,
},
() => null
)
let count = 0
+ let items: Record<string, any>[] = []
while (
exclusiveStartKeys.some(
(exclusiveStartKey) => exclusiveStartKey !== undefined
)
) {
const scanResults = await Promise.all(
exclusiveStartKeys.map(async (exclusiveStartKey, i) => {
if (exclusiveStartKey === undefined) {
// 先に終わったSegmentはもうスキャンしない
return undefined
}
return ddbDoc.scan({
TableName: "testItems",
ExclusiveStartKey: exclusiveStartKey ?? undefined,
TotalSegments: parallelsCount,
Segment: i,
})
})
)
exclusiveStartKeys = scanResults.map(
(scanResult) => scanResult?.LastEvaluatedKey
)
- scanResults.forEach(
- (scanResult) => (count += scanResult?.Items?.length ?? 0)
- )
+ scanResults.forEach((scanResult) => {
+ count += scanResult?.Items?.length ?? 0
+ items = items.concat(scanResult?.Items ?? [])
+ })
console.log({ count })
}
+ const ids = items.map((item) => item.id)
+ const uniqueIds = [...new Set(ids)]
+ console.log({ count, idsCount: ids.length, uniqueIdsCount: uniqueIds.length })
}
main()
実行時出力は以下です。
{ count: 33450 }
{ count: 66935 }
{ count: 99797 }
{ count: 100025 }
{ count: 100025, idsCount: 100025, uniqueIdsCount: 100025 }
count
idsCount
uniqueIdsCount
すべて 100025
で一致しているので重複はないと思われます。
結果
それぞれ要した時間を下記に示します。
ケース | 実行時間(秒) |
---|---|
通常スキャン | 107.76s |
2並列スキャン | 59.89s |
10並列スキャン | 14.22s |
並列にすると抜群に速くなることがわかります。
内容は以上になりますが、もし並列スキャンについて理解が誤っている点がありましたらコメント欄でご指摘頂けますと幸いです。
Discussion