👋

【Go/GCP】Cloud SQLとBigQueryへのCSVファイルインポート

2023/06/18に公開

概要

非機能試験用に大量データをCloudSQLとBigQueryに投入するJobを構築したときのことを共有。

CloudSQLとBigQueryは、共にCSVファイルデータのインポートをサポートしています。

CloudSQL: https://cloud.google.com/sql/docs/mysql/import-export/import-export-csv?hl=ja#import_data_to
BigQuery: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv?hl=ja

CloudSQLへのインポート

各言語向けのSDKが用意されているみたいです。

package repository

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/api/googleapi"
	sqladmin "google.golang.org/api/sqladmin/v1beta4"
)

type (
	// SQL is abstract sql io.
	SQL interface {
		ImportCSVTransactionsToDB(ctx context.Context, objectName string) error
	}

	// CloudSQL Model of SQL.
	CloudSQL struct {
		projectID                    string
		instanceID                   string
		storageBucketName            string
		dbName                       string
		transactionTableName         string
		sqlClient                    *sqladmin.Service
		operationStatusCheckDuration time.Duration
	}

	// CloudSQLConfig is cloud sql config.
	CloudSQLConfig struct {
		ProjectID                    string
		InstanceID                   string
		StorageBucketName            string
		DBName                       string
		TransactionTableName         string
		OperationStatusCheckDuration time.Duration
	}
)

const (
	gsURIFmt                   = "gs://%s/%s/%s.csv"
	csvFileType                = "CSV"
	conflictErrorRetryDuration = 10 * time.Second
)

// NewCloudSQL creates new instance of cloud sql.
func NewCloudSQL(config CloudSQLConfig) (SQL, error) {
	sqladminService, err := sqladmin.NewService(context.Background())
	if err != nil {
		return nil, err
	}

	return &CloudSQL{
		projectID:                    config.ProjectID,
		instanceID:                   config.InstanceID,
		storageBucketName:            config.StorageBucketName,
		sqlClient:                    sqladminService,
		dbName:                       config.DBName,
		transactionTableName:         config.TransactionTableName,
		operationStatusCheckDuration: config.OperationStatusCheckDuration,
	}, nil
}

// ImportCSVTransactionsToDB import csv transactions into db.
func (r *CloudSQL) ImportCSVTransactionsToDB(ctx context.Context, objectName string) error {
	rb := &sqladmin.InstancesImportRequest{
		ImportContext: &sqladmin.ImportContext{
			FileType: csvFileType,
			Uri:      fmt.Sprintf(gsURIFmt, r.storageBucketName, TransactionSQLFilePath, objectName),
			Database: r.dbName,
			CsvImportOptions: &sqladmin.ImportContextCsvImportOptions{
				Table: r.transactionTableName,
			},
		},
	}

	if err := r.importCSVFileToDB(ctx, rb); err != nil {
		return fmt.Errorf("failed to import csv transactions to db: %v", err)
	}

	return nil
}

func (r *CloudSQL) importCSVFileToDB(ctx context.Context, rb *sqladmin.InstancesImportRequest) error {
	var operationID string
	for {
		importRes, err := r.sqlClient.Instances.Import(r.projectID, r.instanceID, rb).Context(ctx).Do()
		if err != nil {
			// 前のOperationがDONEになっていても、直後だと409が返ってくることがあるため
			googleErr, ok := err.(*googleapi.Error)
			if ok && googleErr.Code == 409 {
				fmt.Println("Conflict error occurred.")
				time.Sleep(conflictErrorRetryDuration)
				continue
			}

			fmt.Printf("failed to import csv to db: %v", err)
			return err
		}
		operationID = importRes.Name
		break
	}

	// 前のOperationが実行中だと、Conflictエラーが返ってくるため
	for {
		resp, err := r.sqlClient.Operations.Get(r.projectID, operationID).Context(ctx).Do()
		if err != nil {
			fmt.Printf("failed to get operation, OperationID: %s", operationID)
			return err
		}
		if resp.Status == "PENDING" || resp.Status == "RUNNING" {
			time.Sleep(r.operationStatusCheckDuration)
			continue
		} else if resp.Status != "DONE" {
			return fmt.Errorf("sql import status not DONE: %s", resp.Status)
		}
		break
	}
	return nil
}

個人的な反省になりますが、ガイドに軽く目を通した段階で各言語のSDKは提供されていないと決めつけてしまいました。早とちりってやつです。
httpパッケージを使ってAPIにリクエスト投げるしかなさそうだと、gke-metadata-serverからアクセストークンを取得して、それをヘッダーに付けてimportのリクエストを投げて......みたいなことを愚直にして動作検証まで終えたところでSDKが提供されていることを発見しました。早とちりってやつです。

BigQueryへのインポート

CSVファイルをインポートする場合、スキーマ自動検出をオンにするか、最初の行をどう扱うかなどをよく考えずに脳死でインポートすると期待通りにマッピングされないことがあります。トライアンドエラーで修正していくのも全然ありだと思いますが、以下のガイドに一度目を通しておくと良さそうです。

https://cloud.google.com/bigquery/docs/schema-detect?hl=ja

package repository

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
)

const (
	bigqueryConnectRetry     = 10
	bigqueryConnectRetryWait = 3 * time.Second
)

type (
	// Store is abstract store io
	Store interface {
		ImportCSVTransactionsToBq(ctx context.Context, objectName string) error
		Close()
	}

	// BqStore Model of Store.
	BqStore struct {
		bqClient          *bigquery.Client
		transactionTable  *bigquery.Table
		storageBucketName string
	}

	// BqStoreConfig is cloud store config
	BqStoreConfig struct {
		ProjectID         string
		Dataset           string
		TransactionTable  string
		StorageBucketName string
	}
)

// NewBqStore is create new instance of bq based Store
func NewBqStore(config BqStoreConfig) (Store, error) {
	opts := []option.ClientOption{
		option.WithGRPCDialOption(grpc.WithBlock()),
	}
	bqClient, err := connectBigquery(bigqueryConnectRetry, config.ProjectID, opts...)
	if err != nil {
		return nil, err
	}

	return &BqStore{
		bqClient:          bqClient,
		transactionTable:  bqClient.Dataset(config.Dataset).Table(config.TransactionTable),
		storageBucketName: config.StorageBucketName,
	}, nil
}

// ImportCSVTransactionsToBq loads CSV data from Cloud Storage into a BigQuery
// table and providing an explicit schema for the data.
func (t *BqStore) ImportCSVTransactionsToBq(ctx context.Context, objectName string) error {
	fmt.Println("ImportCSVTransactionsToBq start")
	gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s/%s.csv", t.storageBucketName, TransactionBqFilePath, objectName))
	gcsRef.SkipLeadingRows = 1
	gcsRef.AllowJaggedRows = true

	loader := t.transactionTable.LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return fmt.Errorf("failed loader run: %v", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	fmt.Println("ImportCSVTransactionsToBq end")
	return nil
}

// Close is implement Store
func (t *BqStore) Close() {
	_ = t.bqClient.Close()
}

func connectBigquery(retry int, projectID string, opts ...option.ClientOption) (*bigquery.Client, error) {
	if retry <= 0 {
		return nil, ErrRetryOver
	}
	bq, err := func() (*bigquery.Client, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return bigquery.NewClient(ctx, projectID, opts...)
	}()
	if err == nil {
		return bq, nil
	}
	time.Sleep(bigqueryConnectRetryWait)
	return connectBigquery(retry-1, projectID)
}

終わりに

ではまた!

Discussion