💡

大量のデータは少しずつ処理しよう

2024/08/21に公開

やりたいこと

S3に置かれたサイズが大きめなCSVデータをLambdaで処理して再度S3にアップロードしたい。
しかし、一度にファイルを取得してしまうと、メモリ不足になって処理ができません。
そういう時は、S3からデータをストリームとして読み込み、都度処理しながら、別のS3バケットにストリームとしてアップロードするようにしましょう。

ストリーム形式で随時処理する

(元コードから抜粋しています)

const uploadStream = () => {
  const pass = new PassThrough();
  return {
    writeStream: pass,
    promise: new Upload({
      client: s3Client,
      params: {
        Bucket: BUCKET_NAME_A,
        Key: "sample.txt",
        Body: pass,
      },
    }).done(),
  };
};
const { writeStream, promise } = uploadStream();


const getObjectResponse = await s3Client.send(
  new GetObjectCommand({
    Bucket: BUCKET_NAME_B,
    Key: objectName,
  })
);
const stream = getObjectResponse.Body as Readable;
stream.pipe(csvParser()).on("data", (data) => {
  const convertedData = convertData(data); //データ処理
  writeStream.write(convertedData + "\r\n");
});
stream.on("end", () => {
  writeStream.end();
});
await promise;

前半と後半に分けて説明します。

S3にデータをアップロードするためのストリームを作成

const uploadStream = () => {
  const pass = new PassThrough();
  return {
    writeStream: pass,
    promise: new Upload({
      client: s3Client,
      params: {
        Bucket: BUCKET_NAME_A,  // アップロード先のS3バケット
        Key: "sample.txt", // アップロード先のファイル名
        Body: pass, // データをアップロードするためのストリーム
      },
    }).done(),
  };
};
const { writeStream, promise } = uploadStream();
  • uploadStream関数は、S3にデータをアップロードするための「ストリーム」(データの流れ)を準備しています。
  • PassThroughは、データをそのまま通過させるストリームです。
  • writeStreamは、データを書き込むためのストリームです。
  • promiseは、アップロードが完了したら結果を返すPromiseです。これにより、処理が非同期的に行われます。

S3からデータを取得し、データを読み込みながら処理し、結果をアップロード

const getObjectResponse = await s3Client.send(
  new GetObjectCommand({
    Bucket: BUCKET_NAME_B,
    Key: objectName,
  })
);
const stream = getObjectResponse.Body as Readable;
stream.pipe(csvParser()).on("data", (data) => {
  const convertedData = convertData(data); // データ加工処理
  writeStream.write(convertedData + "\r\n"); // 変換後のデータをアップロード用ストリームに書き込む。今回は改行を入れたかった
});
stream.on("end", () => {
  writeStream.end(); // 全てのデータを書き終わったらストリームを閉じる
});
await promise; // アップロードの完了を待つ
  • GetObjectCommandを使って、指定したS3バケットからファイルを取得します。
  • getObjectResponse.Bodyに取得したファイルの内容が入っています。これは、ストリームとして処理できます。
  • stream.pipe(csvParser())で、S3から取得したファイルをCSVとして解析し、その内容を1行ずつ処理します。
  • on("data", ...)の部分で、各行のデータを処理して、convertDataという関数で変換を行っています。
  • writeStream.write(convertedData + "\r\n")で、変換したデータを先ほど準備したS3アップロード用のストリームに書き込みます。
  • 全てのデータが読み込まれたら、writeStream.end()で書き込みを終了し、await promiseでアップロードが完了するのを待ちます。

さいごに

普段大きめのデータを扱うことがないのでメモリを気にしたことがありませんでしたが、良い勉強になりました。
それでも15分を越える処理の場合は処理を見直すか、Lambdaを諦めるか、元データの量を減らしてもらいましょう。。。。

参考

https://dev.classmethod.jp/articles/s3-csv-nodejs-streaming-s3-json/#toc-8

NCDCエンジニアブログ

Discussion