😊

Cloud SQLへ、Storage上の複数のCSVファイルをインポートするなら、Google API経由が良さげ

2024/09/16に公開

結論

  • Cloud SQL に、複数の CSV ファイルをインポートするなら、Google API 経由の方がすんなり行った!
  • 最初は Cloud Run x NodeJS で頑張ろうとしたが、複数ファイルを読み込もうとすると何故か処理が止まってしまう

実現したいこと

  • Cloud Storage 上の特定のバケット・ディレクトリにある、複数の CSV ファイルを Cloud SQL にまとめてインポートしたい
  • (自分の場合)5.5MB くらいの CSV ファイルが 150 個(全体で一億行)ある

Cloud Run で頑張ろうとした痕跡

  • 以下が CSV インポートに使ったコード
import { parse } from "csv-parse";
import admin from "firebase-admin";
import pg from "pg";

const connectionConfig: pg.PoolConfig = {
  host: process.env.PG_HOST,
  user: process.env.PG_USER,
  password: process.env.PG_PASSWORD,
  database: process.env.PG_DATABASE,
  port: 5432,
};

// Eventarcから受け取ったCSV情報を表示したい
const bucketName = process.argv[2];

const main = async () => {
  if (!bucketName) {
    console.error("Invalid arguments");
    process.exit(1);
  }

  console.time("Cloud SQLへのインポート");

  const bucket = admin.storage().bucket(bucketName);

  // バケット内の CSV ファイル一覧を取得
  const [csvFiles] = await bucket.getFiles({ matchGlob: `*.csv` });

  const pool = new pg.Pool(connectionConfig);

  const csvParser = parse({
    columns: true,
    skip_empty_lines: true,
    delimiter: [","],
  });

  const failedFiles: string[] = [];
  let totalRowCount = 0;

  /**
   * 複数のCSVファイルを読み込む
   */
  const filePromises = csvFiles.map(async (csvFile) => {
    try {
      await new Promise<void>((resolve, reject) => {
        csvFile
          .createReadStream()
          .pipe(csvParser)
          .on("readable", async () => {
            const client = await pool.connect();

            client.on("error", (err) => {
              console.error("Error connecting to Cloud SQL:!!", err);
              reject(err);
            });

            try {
              await client.query("BEGIN");
              let record;
              let rowCount = 0;
              let bulkInsertValues = [];
              const chunkSize = 100000;

              while ((record = csvParser.read())) {
                bulkInsertValues.push([
                  record.id,
                  record.userId,
                  record.createdAt,
                  record.point,
                ]);

                rowCount++;
                totalRowCount++;

                if (bulkInsertValues.length >= chunkSize) {
                  const query = {
                    text: `INSERT INTO "ImportTest" ("id", "userId", "createdAt", "point") 
                           VALUES ${bulkInsertValues
                             .map(
                               (_, index) =>
                                 `($${index * 4 + 1}, $${index * 7 + 2}, $${
                                   index * 4 + 3
                                 }, $${index * 4 + 4})`
                             )
                             .join(", ")}`,
                    values: bulkInsertValues.flat(),
                  };
                  await client.query(query);
                  bulkInsertValues = [];
                }
              }

              if (bulkInsertValues.length > 0) {
                const query = {
                  text: `INSERT INTO "ImportTest" ("id", "userId", "createdAt", "point") 
                         VALUES ${bulkInsertValues
                           .map(
                             (_, index) =>
                               `($${index * 4 + 1}, $${index * 4 + 2}, $${
                                 index * 4 + 3
                               }, $${index * 4 + 4})`
                           )
                           .join(", ")}`,
                  values: bulkInsertValues.flat(),
                };
                await client.query(query);
              }

              await client.query("COMMIT");

              console.log(
                `${rowCount}行のデータをCloud SQLにインポートしました。`
              );
            } catch (err) {
              await client.query("ROLLBACK");
              console.error("Error connecting to Cloud SQL:", err);
              reject(err);
            } finally {
              client.release(); // 接続をプールに返却
            }
          })
          .on("error", (err) => {
            console.error("CSVパースエラー:", err);
            reject(err);
          })
          .on("end", async () => {
            resolve();
          });
      });
    } catch (err) {
      failedFiles.push(csvFile.name);
    }
  });

  // 並列でファイルを読み込む
  await Promise.all(filePromises);

  await pool.end(); // 接続プールを終了
  console.log("Pool.end called!");

  if (failedFiles.length > 0) {
    console.error("インポートに失敗したファイル: ", failedFiles);
  } else {
    console.log(
      `${csvFiles.length}個のCSVファイルのインポートが完了しました。`
    );
  }

  console.timeEnd(`${competitionRoundId}から、Cloud SQLへのインポート`);
};

main();
  • 最初は CSV ファイルが大きすぎて、メモリ的な制約に引っかかってしまったのかもしれないと思ったが。
  • ただファイル数を2つに減らしても、それでも処理が止まってしまったので、いまいち釈然としない
  • そこで、Google API を使ってみることにした

Google API を使ってみる

  • こちらの記事を参考に、Cloud Workflow の中で、Google API を使って CSV を読み込んだ

https://book.st-hakky.com/hakky/from-big-query-to-cloud-sql/

  • 🚨注意1️ これで記事を書こうとも思ったくらいだが、API経由で CSV を Cloud SQL にインポートする際には、APIを実行するサービスアカウントだけではなく、Cloud SQL のサービスアカウントにも、バケットに対して適切な権限を与える必要があるソース
    • 自分はここに詰まって、だいぶん時間を溶かしてしまった...
  • 🚨注意2️ API経由でCSVインポートする場合、CSVにヘッダーは含めないようにすること
    • API経由だと、ヘッダーを無視するといったオプションがない
    • なので、出力時のスキーマを別で管理しておいて、インポート時にそのスキーマを指定するようにした
  • 以下は、Cloud Workflow のワークフローのコード
main:
  params: [event]
  steps:
    - init:
        assign:
          - projectId: 'project-id'
          - instance: 'instance-id'
          - datasetId: 'dataset-id'
          - tableId: 'table-id'
          - bucket: ${event.bucket}
    - log_event:
        call: sys.log
        args:
          data: ${event}
    - list_csv_files_to_delete:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
        result: files_to_delete
    - delete_csv_files_if_exist:
        for:
          value: file
          in: ${files_to_delete.items}
          steps:
            - delete_file:
                call: googleapis.storage.v1.objects.delete
                args:
                  bucket: ${bucket}
                  # オブジェクトがディレクトリにある場合、URLエンコードしないと404エラーになってしまうことに注意
                  # https://cloud.google.com/workflows/docs/reference/googleapis/storage/v1/objects/delete
                  object: ${text.url_encode(file.name)}
    - assign_match_glob:
        assign:
          - csvMatchGlob: "*.csv"
    - export_csv_from_bq:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${projectId}
          body:
            configuration:
              extract:
                sourceTable:
                  projectId: ${projectId}
                  datasetId: ${datasetId}
                  tableId: ${tableId}
                destinationUri: ${"gs://" + bucket + "/" + csvMatchGlob}
                # ヘッダーは含めない
                printHeader: false
    - count_storage_files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          matchGlob: ${csvMatchGlob}
        result: listResult
    - check_if_files_exist:
        switch:
          - condition: ${"items" in listResult}
            next: import_csv_to_sql
        next: no_export
    - import_csv_to_sql:
        call: list_file_to_import
        args:
            bucket: ${bucket}
            matchGlob: ${csvMatchGlob}
            projectId: ${projectId}
            instance: ${instance}
            databaseSchema: "postgres"
            importTable: "\"ImportTest\""
        next: finish
    - no_export:
        return: 'No CSV Export'
    - finish:
        return: ${listResult}

# 複数のCSVファイルをインポートする
list_file_to_import:
  params:
    - bucket
    - matchGlob
    - projectId
    - instance
    - databaseSchema
    - importTable
  steps:
    - list-files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          matchGlob: ${matchGlob}
        result: list_result
    - process-files:
        for:
          value: file
          in: ${list_result.items}
          steps:
            - wait-import:
                call: import_file
                args:
                  projectId: ${projectId}
                  instance: ${instance}
                  databaseSchema: ${databaseSchema}
                  importTable: ${importTable}
                  file: ${"gs://" + bucket + "/" + file.name}
    - return-step:
        return: ${list_result}

# 1つのCSVファイルをインポートする
import_file:
  params:
    - projectId
    - instance
    - databaseSchema
    - importTable
    - file
  steps:
    - callImport:
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectId + "/instances/" + instance + "/import"}
          auth:
            type: OAuth2
          body:
            importContext:
              uri: ${file}
              database: ${databaseSchema}
              fileType: CSV
              # どのカラムの順番でインポートするかを指定する
              csvImportOptions:
                table: ${importTable}
                columns:
                    - "id"
                    - "userId"
                    - "createdAt"
                    - "point"
        result: operation
    - chekoperation:
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 1
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: chekoperation
  • このやり方だと、時間はかかっても、最後まで Cloud SQLにインポートすることができた
  • Cloud Workflow ならタイムアウトもないので、仮にデータがスケールしても、このやり方なら安定したインポート基盤になりそう
  • 以下は今回の計測結果
    Cloud SQLのインスタンス構成 インポート時間
    2 vCPU・8GBメモリ 約1時間30分
    4 vCPU・16GBメモリ 約1時間

おわりに

  • 大量のデータを Cloud SQL にインポートする記事が少なかったので苦労したので、この記事が誰かの助けになれば嬉しいです!
GitHubで編集を提案

Discussion