😊
Cloud SQLへ、Storage上の複数のCSVファイルをインポートするなら、Google API経由が良さげ
結論
- Cloud SQL に、複数の CSV ファイルをインポートするなら、Google API 経由の方がすんなり行った!
- 最初は Cloud Run x NodeJS で頑張ろうとしたが、複数ファイルを読み込もうとすると何故か処理が止まってしまう
実現したいこと
- Cloud Storage 上の特定のバケット・ディレクトリにある、複数の CSV ファイルを Cloud SQL にまとめてインポートしたい
- (自分の場合)5.5MB くらいの CSV ファイルが 150 個(全体で一億行)ある
Cloud Run で頑張ろうとした痕跡
- 以下が CSV インポートに使ったコード
import { parse } from "csv-parse";
import admin from "firebase-admin";
import pg from "pg";
const connectionConfig: pg.PoolConfig = {
host: process.env.PG_HOST,
user: process.env.PG_USER,
password: process.env.PG_PASSWORD,
database: process.env.PG_DATABASE,
port: 5432,
};
// Eventarcから受け取ったCSV情報を表示したい
const bucketName = process.argv[2];
const main = async () => {
if (!bucketName) {
console.error("Invalid arguments");
process.exit(1);
}
console.time("Cloud SQLへのインポート");
const bucket = admin.storage().bucket(bucketName);
// バケット内の CSV ファイル一覧を取得
const [csvFiles] = await bucket.getFiles({ matchGlob: `*.csv` });
const pool = new pg.Pool(connectionConfig);
const csvParser = parse({
columns: true,
skip_empty_lines: true,
delimiter: [","],
});
const failedFiles: string[] = [];
let totalRowCount = 0;
/**
* 複数のCSVファイルを読み込む
*/
const filePromises = csvFiles.map(async (csvFile) => {
try {
await new Promise<void>((resolve, reject) => {
csvFile
.createReadStream()
.pipe(csvParser)
.on("readable", async () => {
const client = await pool.connect();
client.on("error", (err) => {
console.error("Error connecting to Cloud SQL:!!", err);
reject(err);
});
try {
await client.query("BEGIN");
let record;
let rowCount = 0;
let bulkInsertValues = [];
const chunkSize = 100000;
while ((record = csvParser.read())) {
bulkInsertValues.push([
record.id,
record.userId,
record.createdAt,
record.point,
]);
rowCount++;
totalRowCount++;
if (bulkInsertValues.length >= chunkSize) {
const query = {
text: `INSERT INTO "ImportTest" ("id", "userId", "createdAt", "point")
VALUES ${bulkInsertValues
.map(
(_, index) =>
`($${index * 4 + 1}, $${index * 7 + 2}, $${
index * 4 + 3
}, $${index * 4 + 4})`
)
.join(", ")}`,
values: bulkInsertValues.flat(),
};
await client.query(query);
bulkInsertValues = [];
}
}
if (bulkInsertValues.length > 0) {
const query = {
text: `INSERT INTO "ImportTest" ("id", "userId", "createdAt", "point")
VALUES ${bulkInsertValues
.map(
(_, index) =>
`($${index * 4 + 1}, $${index * 4 + 2}, $${
index * 4 + 3
}, $${index * 4 + 4})`
)
.join(", ")}`,
values: bulkInsertValues.flat(),
};
await client.query(query);
}
await client.query("COMMIT");
console.log(
`${rowCount}行のデータをCloud SQLにインポートしました。`
);
} catch (err) {
await client.query("ROLLBACK");
console.error("Error connecting to Cloud SQL:", err);
reject(err);
} finally {
client.release(); // 接続をプールに返却
}
})
.on("error", (err) => {
console.error("CSVパースエラー:", err);
reject(err);
})
.on("end", async () => {
resolve();
});
});
} catch (err) {
failedFiles.push(csvFile.name);
}
});
// 並列でファイルを読み込む
await Promise.all(filePromises);
await pool.end(); // 接続プールを終了
console.log("Pool.end called!");
if (failedFiles.length > 0) {
console.error("インポートに失敗したファイル: ", failedFiles);
} else {
console.log(
`${csvFiles.length}個のCSVファイルのインポートが完了しました。`
);
}
console.timeEnd(`${competitionRoundId}から、Cloud SQLへのインポート`);
};
main();
- 最初は CSV ファイルが大きすぎて、メモリ的な制約に引っかかってしまったのかもしれないと思ったが。
- ただファイル数を2つに減らしても、それでも処理が止まってしまったので、いまいち釈然としない
- そこで、Google API を使ってみることにした
Google API を使ってみる
- こちらの記事を参考に、Cloud Workflow の中で、Google API を使って CSV を読み込んだ
-
🚨注意1️ これで記事を書こうとも思ったくらいだが、API経由で CSV を Cloud SQL にインポートする際には、APIを実行するサービスアカウントだけではなく、Cloud SQL のサービスアカウントにも、バケットに対して適切な権限を与える必要がある(ソース)
- 自分はここに詰まって、だいぶん時間を溶かしてしまった...
-
🚨注意2️ API経由でCSVインポートする場合、CSVにヘッダーは含めないようにすること
- API経由だと、ヘッダーを無視するといったオプションがない
- なので、出力時のスキーマを別で管理しておいて、インポート時にそのスキーマを指定するようにした
- 以下は、Cloud Workflow のワークフローのコード
main:
params: [event]
steps:
- init:
assign:
- projectId: 'project-id'
- instance: 'instance-id'
- datasetId: 'dataset-id'
- tableId: 'table-id'
- bucket: ${event.bucket}
- log_event:
call: sys.log
args:
data: ${event}
- list_csv_files_to_delete:
call: googleapis.storage.v1.objects.list
args:
bucket: ${bucket}
result: files_to_delete
- delete_csv_files_if_exist:
for:
value: file
in: ${files_to_delete.items}
steps:
- delete_file:
call: googleapis.storage.v1.objects.delete
args:
bucket: ${bucket}
# オブジェクトがディレクトリにある場合、URLエンコードしないと404エラーになってしまうことに注意
# https://cloud.google.com/workflows/docs/reference/googleapis/storage/v1/objects/delete
object: ${text.url_encode(file.name)}
- assign_match_glob:
assign:
- csvMatchGlob: "*.csv"
- export_csv_from_bq:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${projectId}
body:
configuration:
extract:
sourceTable:
projectId: ${projectId}
datasetId: ${datasetId}
tableId: ${tableId}
destinationUri: ${"gs://" + bucket + "/" + csvMatchGlob}
# ヘッダーは含めない
printHeader: false
- count_storage_files:
call: googleapis.storage.v1.objects.list
args:
bucket: ${bucket}
matchGlob: ${csvMatchGlob}
result: listResult
- check_if_files_exist:
switch:
- condition: ${"items" in listResult}
next: import_csv_to_sql
next: no_export
- import_csv_to_sql:
call: list_file_to_import
args:
bucket: ${bucket}
matchGlob: ${csvMatchGlob}
projectId: ${projectId}
instance: ${instance}
databaseSchema: "postgres"
importTable: "\"ImportTest\""
next: finish
- no_export:
return: 'No CSV Export'
- finish:
return: ${listResult}
# 複数のCSVファイルをインポートする
list_file_to_import:
params:
- bucket
- matchGlob
- projectId
- instance
- databaseSchema
- importTable
steps:
- list-files:
call: googleapis.storage.v1.objects.list
args:
bucket: ${bucket}
matchGlob: ${matchGlob}
result: list_result
- process-files:
for:
value: file
in: ${list_result.items}
steps:
- wait-import:
call: import_file
args:
projectId: ${projectId}
instance: ${instance}
databaseSchema: ${databaseSchema}
importTable: ${importTable}
file: ${"gs://" + bucket + "/" + file.name}
- return-step:
return: ${list_result}
# 1つのCSVファイルをインポートする
import_file:
params:
- projectId
- instance
- databaseSchema
- importTable
- file
steps:
- callImport:
call: http.post
args:
url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectId + "/instances/" + instance + "/import"}
auth:
type: OAuth2
body:
importContext:
uri: ${file}
database: ${databaseSchema}
fileType: CSV
# どのカラムの順番でインポートするかを指定する
csvImportOptions:
table: ${importTable}
columns:
- "id"
- "userId"
- "createdAt"
- "point"
result: operation
- chekoperation:
switch:
- condition: ${operation.body.status != "DONE"}
next: wait
next: completed
- completed:
return: "done"
- wait:
call: sys.sleep
args:
seconds: 1
next: getoperation
- getoperation:
call: http.get
args:
url: ${operation.body.selfLink}
auth:
type: OAuth2
result: operation
next: chekoperation
- このやり方だと、時間はかかっても、最後まで Cloud SQLにインポートすることができた
- Cloud Workflow ならタイムアウトもないので、仮にデータがスケールしても、このやり方なら安定したインポート基盤になりそう
- 以下は今回の計測結果
Cloud SQLのインスタンス構成 インポート時間 2 vCPU・8GBメモリ 約1時間30分 4 vCPU・16GBメモリ 約1時間
おわりに
- 大量のデータを Cloud SQL にインポートする記事が少なかったので苦労したので、この記事が誰かの助けになれば嬉しいです!
Discussion