🚣

DynamoDBの中身をS3にCSVでNode.jsでストリームでアップロードする方法

2024/06/19に公開

Lambdaなどでメモリをなるべく消費しないように処理したい時ってありますよね
そんな時にNode.jsでストリームを扱う例としてDynamoDBをS3にCSVでアップロードする例を作ってみました

import {
  DynamoDBClient,
  ExecuteStatementCommand,
} from "@aws-sdk/client-dynamodb";
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { stringify } from "csv-stringify";

const region = process.env.REGION;
const tableName = process.env.TABLE_NAME;
const bucketName = process.env.BUCKET_NAME;
const s3Key = process.env.S3_KEY;

const dynamodb = new DynamoDBClient({ region });
const s3 = new S3Client({ region });

// アップロード用のストリームを生成
const stream = stringify({ header: true });
const upload = new Upload({
  client: s3,
  params: {
    Bucket: bucketName,
    Key: s3Key,
    Body: stream,
  },
});

// DynamoDBからデータを取りつつストリームにデータを流す
let nextToken;
do {
  const response = await dynamodb.send(
    new ExecuteStatementCommand({
      Statement: [`SELECT *`, `FROM "${tableName}"`].join(" "),
      NextToken: nextToken,
    })
  );
  for (const item of response.Items) {
    stream.write(unmarshall(item));
  }
  nextToken = response.NextToken;
} while (nextToken);

// ストリームを閉じてアップロードの完了を待つ
stream.end();
await upload.done();

ストリームの作成

const stream = stringify({ header: true });
const upload = new Upload({
  client: s3,
  params: {
    Bucket: bucketName,
    Key: s3Key,
    Body: stream,
  },
});

今回はcsv-stringifyというライブラリを使用しています
関数stringifyはstream.Transformを継承したStringifierクラスのインスタンスを返します
これをストリームに対応したS3のアップローダーに渡します
今回はライブラリにストリームへの書き込み部分を任せていますが、アップローダーに渡したストリームに書き込みさえすればなんでもストリームでアップできます

ストリームにデータを流す

いじりやすいようPartiQLで書いてみました
覚えたらPartiQLのほうが楽に感じるようになりました
このライブラリはObjectでもArrayでも関数writeに渡せばCSV形式にしてストリームに書き込んでくれます

アップロード完了を待つ

ストリームを閉じることでアップローダーはアップロード処理を完了してくれます
関数doneが返すPromiseを待てばアップロードは完了です
ただストリームで処理されているので関数doneを待つのはほんの一瞬ですね

株式会社find | 落とし物クラウド

Discussion