👋
【Go/GCP】Cloud SQLとBigQueryへのCSVファイルインポート
概要
非機能試験用に大量データをCloudSQLとBigQueryに投入するJobを構築したときのことを共有。
CloudSQLとBigQueryは、共にCSVファイルデータのインポートをサポートしています。
CloudSQL: https://cloud.google.com/sql/docs/mysql/import-export/import-export-csv?hl=ja#import_data_to
BigQuery: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv?hl=ja
CloudSQLへのインポート
各言語向けのSDKが用意されているみたいです。
package repository
import (
"context"
"fmt"
"time"
"google.golang.org/api/googleapi"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
)
type (
// SQL is abstract sql io.
SQL interface {
ImportCSVTransactionsToDB(ctx context.Context, objectName string) error
}
// CloudSQL Model of SQL.
CloudSQL struct {
projectID string
instanceID string
storageBucketName string
dbName string
transactionTableName string
sqlClient *sqladmin.Service
operationStatusCheckDuration time.Duration
}
// CloudSQLConfig is cloud sql config.
CloudSQLConfig struct {
ProjectID string
InstanceID string
StorageBucketName string
DBName string
TransactionTableName string
OperationStatusCheckDuration time.Duration
}
)
const (
gsURIFmt = "gs://%s/%s/%s.csv"
csvFileType = "CSV"
conflictErrorRetryDuration = 10 * time.Second
)
// NewCloudSQL creates new instance of cloud sql.
func NewCloudSQL(config CloudSQLConfig) (SQL, error) {
sqladminService, err := sqladmin.NewService(context.Background())
if err != nil {
return nil, err
}
return &CloudSQL{
projectID: config.ProjectID,
instanceID: config.InstanceID,
storageBucketName: config.StorageBucketName,
sqlClient: sqladminService,
dbName: config.DBName,
transactionTableName: config.TransactionTableName,
operationStatusCheckDuration: config.OperationStatusCheckDuration,
}, nil
}
// ImportCSVTransactionsToDB import csv transactions into db.
func (r *CloudSQL) ImportCSVTransactionsToDB(ctx context.Context, objectName string) error {
rb := &sqladmin.InstancesImportRequest{
ImportContext: &sqladmin.ImportContext{
FileType: csvFileType,
Uri: fmt.Sprintf(gsURIFmt, r.storageBucketName, TransactionSQLFilePath, objectName),
Database: r.dbName,
CsvImportOptions: &sqladmin.ImportContextCsvImportOptions{
Table: r.transactionTableName,
},
},
}
if err := r.importCSVFileToDB(ctx, rb); err != nil {
return fmt.Errorf("failed to import csv transactions to db: %v", err)
}
return nil
}
func (r *CloudSQL) importCSVFileToDB(ctx context.Context, rb *sqladmin.InstancesImportRequest) error {
var operationID string
for {
importRes, err := r.sqlClient.Instances.Import(r.projectID, r.instanceID, rb).Context(ctx).Do()
if err != nil {
// 前のOperationがDONEになっていても、直後だと409が返ってくることがあるため
googleErr, ok := err.(*googleapi.Error)
if ok && googleErr.Code == 409 {
fmt.Println("Conflict error occurred.")
time.Sleep(conflictErrorRetryDuration)
continue
}
fmt.Printf("failed to import csv to db: %v", err)
return err
}
operationID = importRes.Name
break
}
// 前のOperationが実行中だと、Conflictエラーが返ってくるため
for {
resp, err := r.sqlClient.Operations.Get(r.projectID, operationID).Context(ctx).Do()
if err != nil {
fmt.Printf("failed to get operation, OperationID: %s", operationID)
return err
}
if resp.Status == "PENDING" || resp.Status == "RUNNING" {
time.Sleep(r.operationStatusCheckDuration)
continue
} else if resp.Status != "DONE" {
return fmt.Errorf("sql import status not DONE: %s", resp.Status)
}
break
}
return nil
}
個人的な反省になりますが、ガイドに軽く目を通した段階で各言語のSDKは提供されていないと決めつけてしまいました。早とちりってやつです。
httpパッケージを使ってAPIにリクエスト投げるしかなさそうだと、gke-metadata-serverからアクセストークンを取得して、それをヘッダーに付けてimportのリクエストを投げて......みたいなことを愚直にして動作検証まで終えたところでSDKが提供されていることを発見しました。早とちりってやつです。
BigQueryへのインポート
CSVファイルをインポートする場合、スキーマ自動検出をオンにするか、最初の行をどう扱うかなどをよく考えずに脳死でインポートすると期待通りにマッピングされないことがあります。トライアンドエラーで修正していくのも全然ありだと思いますが、以下のガイドに一度目を通しておくと良さそうです。
package repository
import (
"context"
"fmt"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
const (
bigqueryConnectRetry = 10
bigqueryConnectRetryWait = 3 * time.Second
)
type (
// Store is abstract store io
Store interface {
ImportCSVTransactionsToBq(ctx context.Context, objectName string) error
Close()
}
// BqStore Model of Store.
BqStore struct {
bqClient *bigquery.Client
transactionTable *bigquery.Table
storageBucketName string
}
// BqStoreConfig is cloud store config
BqStoreConfig struct {
ProjectID string
Dataset string
TransactionTable string
StorageBucketName string
}
)
// NewBqStore is create new instance of bq based Store
func NewBqStore(config BqStoreConfig) (Store, error) {
opts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithBlock()),
}
bqClient, err := connectBigquery(bigqueryConnectRetry, config.ProjectID, opts...)
if err != nil {
return nil, err
}
return &BqStore{
bqClient: bqClient,
transactionTable: bqClient.Dataset(config.Dataset).Table(config.TransactionTable),
storageBucketName: config.StorageBucketName,
}, nil
}
// ImportCSVTransactionsToBq loads CSV data from Cloud Storage into a BigQuery
// table and providing an explicit schema for the data.
func (t *BqStore) ImportCSVTransactionsToBq(ctx context.Context, objectName string) error {
fmt.Println("ImportCSVTransactionsToBq start")
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s/%s.csv", t.storageBucketName, TransactionBqFilePath, objectName))
gcsRef.SkipLeadingRows = 1
gcsRef.AllowJaggedRows = true
loader := t.transactionTable.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
if err != nil {
return fmt.Errorf("failed loader run: %v", err)
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if status.Err() != nil {
return fmt.Errorf("job completed with error: %v", status.Err())
}
fmt.Println("ImportCSVTransactionsToBq end")
return nil
}
// Close is implement Store
func (t *BqStore) Close() {
_ = t.bqClient.Close()
}
func connectBigquery(retry int, projectID string, opts ...option.ClientOption) (*bigquery.Client, error) {
if retry <= 0 {
return nil, ErrRetryOver
}
bq, err := func() (*bigquery.Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return bigquery.NewClient(ctx, projectID, opts...)
}()
if err == nil {
return bq, nil
}
time.Sleep(bigqueryConnectRetryWait)
return connectBigquery(retry-1, projectID)
}
終わりに
ではまた!
Discussion