[AWS] DynamoDBの並列スキャンをTypescriptでやってみた
最近プロジェクトで通常スキャンとは異なる、並列スキャンを使用する機会があったので検証を含め試してみました。
以下の公式を参考に進めます。
では、初めていきます。
DynamoDBのスキャンとは
DynamoDBのスキャンは、テーブル内のアイテムを一括で取得する操作のことです。
基本的なスキャンは、条件なしで全アイテムを取得するため、大規模なデータセットではコストがかかります。
通常スキャンと並列スキャンの違い
通常のスキャンは、単一のスレッドでテーブルを処理します。
これに対して、並列スキャンは複数のスレッドを同時に使用し、テーブルを分割して効率的に処理します。
もう少し噛み砕いてみます。
スキャンは1MBまでの最大容量を持ち、その容量内でデータの取得を行います。
そのため、通常スキャン(シーケンシャルスキャン)は1MBのスキャンが完了したら、次の1MB分のデータをスキャンといったように、スキャンが順次実行されるため、データ数が多いほどパフォーマンスが下がってしまいます。
しかし、並列スキャンの場合は、スキャンするためのスレッドを一つではなく複数使用して、同時にスキャンを実行します。
そのため、データ量が少ない場合では、あまり効果を感じることはできませんが、データ量が増えるにつれて、スキャンのパフォーマンスが向上し、並列スキャンの恩恵を感じやすくなります。
ここまでは「通常スキャンよりも並列スキャンの方がいいじゃん。」と思うかもしれませんが、
デメリットも当然あります。
以下ではメリットとデメリットで分けて記載します。
並列スキャンのデメリット
- コストの増加
- 並列処理には追加のスレッドや計算リソースが必要であり、これがアプリケーションの運用コストに影響を与える可能性があります。特に、リソースの追加が常に必要なわけではない場合でも、コストが発生するため、注意が必要です。
- スキャン後のデータ出力順がバラバラ
- 各スレッドごとにスキャンを実行し、結果を返すため、出力の順番が考慮されずに返されます。そのため、データの格納順で取得したい場合はアプリケーション側でソートのロジックをかける必要があります。
- スキャン効率の低下(以外かもしれませんがケースバイケースで発生することもあるそうです)
- データが均等に分散されていない場合や、各スレッドごとの処理時間にばらつきがある場合、一部のスレッドが遅れて終了する可能性があります。これにより、全体のスキャン効率が低下する可能性があります。従って、適切なスレッド数やデータ分布を考慮することが重要です。
並列スキャンのメリット
- 処理速度の向上
- 大規模なデータセットでも、同時に複数のパーティションでスキャンを行うことで、全体の処理速度が向上します。これにより、クエリの結果をより迅速に取得できます。
- スケーラビリティの向上
- データベースのサイズや負荷が増加しても、追加のリソースを使用して処理を拡張することができます。これにより、大規模なシステムや要求が発生する瞬間にもスムーズに対応できます。
- リソースの効率的な利用
- 通常のスキャンでは単一のスレッドしか使用できないため、システムがアイドル状態になることがあります。しかし、並列処理により複数のスレッドが同時に動作することで、CPUやネットワークなどのリソースを最大限に利用でき、全体の効率が向上します。
Typescriptで実際に書いてみる
※ 今回はDynamoDB Localを使用しています。
また、コードの記載だけだと面白くないので、
「どれくらい時間を短縮できるのか?」も検証しようと思います。
ダミーデータ投入(1万件のデータを投入)
事前にデータを11件投入していたため、合計は10011件のになります。
export const generateSampleData = async (
dynamoDBClient: DynamoDBClient,
tableName: string,
count: number,
) => {
const rand = (min: number, max: number): string => {
const randNum = (Math.floor(Math.random() * (max - min + 1)) + min);
return String(randNum)
};
for(let i = 0; i <= count; i++) {
const params = {
Item: {
// ダミーテーブルのパーテションキー
sample1: {S: `sample-${i}`}
// 以下に必要なデータ追加
},
TableName: tableName,
};
try {
const putParamas = new PutItemCommand(params);
dynamoDBClient.send(putParamas);
} catch(err) {
console.error(err);
};
};
console.log("put completed!");
};
通常スキャン記述
const getAllDataByNormalScan = async (
dynamoDBClient: DynamoDBClient,
tableName: string,
): Promise<unknown[]> => {
let resultData: unknown[] = [];
let exclusiveStartKey: any | null | undefined = null;
while(exclusiveStartKey !== undefined) {
const scanParams = new ScanCommand({
TableName: tableName,
ExclusiveStartKey: exclusiveStartKey ?? undefined
});
try {
const result = await dynamoDBClient.send(scanParams);
const marshallResult = result.Items?.map((item) => unmarshall(item));
marshallResult?.map((item) => resultData.push(item));
exclusiveStartKey = result.LastEvaluatedKey;
} catch (err) {
console.error(err);
};
};
console.log(`取得データ数: ${resultData.length}件`);
return resultData;
};
並列スキャン記述
const getAllDataBySegmentScan = async (
dynamoDBClient: DynamoDBClient,
tableName: string,
totalSegments: number,
): Promise<unknown[]> => {
let resultData: unknown[] = [];
const scanPromises = Array.from({ length: totalSegments }, (_, segment) => {
const scanParams = new ScanCommand({
TableName: tableName,
TotalSegments: totalSegments,
Segment: segment,
ReturnConsumedCapacity: "TOTAL"
});
const result = dynamoDBClient.send(scanParams)
.then(result => result.Items?.map(item => unmarshall(item)) || []);
console.log(result);
return result;
});
try {
const scanResults = await Promise.all(scanPromises);
resultData = scanResults.flat();
} catch (err) {
console.error(err);
};
console.log(`取得データ数: ${resultData.length}件`);
return resultData;
};
並列スキャンの解説
-
totalSegments
: テーブルデータの分割数を指定し、一つ一つにsegment番号が付与される。(totalSegment=4
なら4つのデータに分割され、0,1,2,3の番号がそれぞれのsegmentに付与される) -
segment
: totalSegmentによって分割されたsegment番号を指定し、処理を実行させるためのもの。 -
unmarshall(item)
: 得た結果をそれぞれDynamoDBで出力される型から、通常のオブジェクト型に変換
実行時間検証
まずは通常スキャンから。
// 通常スキャン
const run = async (): Promise<void> => {
try {
const startTime = performance.now();
await getAllDataByNormalScan(
ddbClient,
"sample-table"
);
const endTime = performance.now();
console.log(`実行時間: ${~~(endTime - startTime) / 1000}秒`);
console.log("Done!");
} catch (err) {
console.error(err);
};
}();
結果
取得データ数: 10011件
実行時間: 2.556秒
Done!
次に並列スキャン。
// 通常スキャン
const run = async (): Promise<void> => {
try {
const startTime = performance.now();
await getAllDataBySegmentScan(
ddbClient,
"sample-table"
11 // 私の投入データの場合、1segmentあたり979件が最大取得件数となります。
);
const endTime = performance.now();
console.log(`実行時間: ${~~(endTime - startTime) / 1000}秒`);
console.log("Done!");
} catch (err) {
console.error(err);
};
}();
結果
取得データ数: 10011件
実行時間: 0.858秒
Done!
最終結果
1.698秒 並列スキャンの方が取得が早い(単純に2倍早い)
意識するべき点
- 適切なセグメント数の選択
- セグメント数はデータセットとテーブルのスループットによって決まるため、1セグメントが該当テーブルのデータを何件取得できるのかを把握しておく必要があります。そうすることで、セグメントを有効活用でき、効率的なスキャンが可能となります。
- 本当に並列スキャンが必要か?
- 取得時間を気にする必要がない場合は、1MBを使い切って順次データを取得してくれる通常のスキャンを選択することもありかと思います。なぜなら、並列スキャンはセグメント数の選択を誤ると、1MBを使いきれず、コスト効率を考えるといまいちになってしまう可能性があるためです。
まとめ
並列スキャンはメリットばかり目が移りがちですが、デメリットもしっかりと理解することで、
コードのリファクタリングや、要件に合致する最適解かどうかを検討できる良い機会になったと感じてます。
私の場合は、「制限時間のある中で数十万件のデータを一括で取得&更新する」というもので、
「更新処理の時間を最大限確保するため、データ取得時間の短縮」を検討する必要がありました。
また、最終結果を見るに、検証用データの数が現在は1万件と少ないですが、
これが増えるにつれて、処理時間も比例して増加するため、より効果を発揮しそうだと感じました。
今回の記事が誰かのお役に立てれば幸いです。
NCDC株式会社( ncdc.co.jp/ )のエンジニアチームです。 募集中のポジションや、採用している技術スタックの紹介などはこちらをご覧ください! github.com/ncdcdev/recruitment
Discussion