💬

Go言語でBigQueryにレコードを投入する

2022/05/04に公開約5,500字

Google Cloud Client Libraries for Goを用いてBigQueryにデータを投入する方法。

データ読み込み方針

Go言語用ライブラリ以外も含めたBigQueryへのデータ投入方法は データの読み込みの概要に記載されている。Go言語用ライブラリに限定すると、以下の方式がある。

バッチ読み込み - 読み込みジョブ

読み込みジョブはGoogle Cloud Storageに、またはローカルに置いたファイルを指定してバッチ読み込みする。読み込むファイルフォーマットはCSVなどが選択できる。

GCS上のファイルを読み込むサンプルコードは以下の通り。
特別なことは何もしておらず、GCS上のファイル参照を作成し、ローダ→ジョブを生成してジョブが完了するまで待っている。これによりGCS上のファイルをBigQueryに読み込むことができる。当然ながらBigQueryに読み込み後もGCS上にはファイルは残るのでBigQueryにデータを投入するためだけにGCSを利用する場合は、ファイルの配置と削除の操作が必要になる。

func loading(projectID string, bqDataset string, bqTable string, bucket string, objectkey string) int {
	ctx := context.Background()
	bqCli, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Printf("failed to create bigquery client: %v", err)
		return 2
	}
	dataset := bqCli.Dataset(bqDataset)
	table := dataset.Table(bqTable)

	gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", bucket, objectkey))
	gcsRef.AllowJaggedRows = true // treat missing row as nil
	loader := table.LoaderFrom(gcsRef)
	loader.CreateDisposition = bigquery.CreateNever // treat error if table not exist
	job, err := loader.Run(ctx)

	for {
		status, err := job.Wait(ctx)
		if err != nil {
			log.Printf("failed to poll job status: %v", err)
			return 2
		}
		if status.Done() {
			if status.Err() != nil {
				log.Printf("Job failed: %v", status.Err())
				return 2
			}
			break
		}
	}

	log.Print("Sucessfully load data into BigQuery")

	return 0
}

ローカルファイルのファイルを読み込むサンプルコードは以下の通り。
処理の流れはGCSから読み込むときとほぼ同じで、loaderを生成するときに引数にgcs refを指定するかローカルのファイルを指定するかの違いのみ。

func loadingLocal(projectID string, bqDataset string, bqTable string, filepath string) int {
	ctx := context.Background()
	bqCli, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Printf("failed to create bigquery client: %v", err)
		return 2
	}
	dataset := bqCli.Dataset(bqDataset)
	table := dataset.Table(bqTable)

	log.Printf("load rows from local file %s", filepath)

	f, err := os.Open(filepath)
	if err != nil {
		log.Printf("failed to open file %s: %v", filepath, err)
		return 2
	}
	source := bigquery.NewReaderSource(f)
	// source.AutoDetect = true
	source.SkipLeadingRows = 1
	loader := table.LoaderFrom(source)
	job, err := loader.Run(ctx)

	for {
		status, err := job.Wait(ctx)
		if err != nil {
			log.Printf("failed to poll job status: %v", err)
			return 2
		}
		if status.Done() {
			if status.Err() != nil {
				log.Printf("Job failed: %v", status.Err())
				return 2
			}
			break
		}
	}

	log.Print("Sucessfully load data into BigQuery")

	return 0
}

ストリーミング - ストリーミングAPI

ストリーミングAPIは既に非推奨となった方法。現状もサポートされているがStorage Write APIの利用を推奨している。ただし、Go言語のSDKではStorage Write APIがプレビュー扱い(2022-05-01現在)になっており、まだストリーミングAPIを活用する場面はありそう。

ストリーミングAPIを用いたサンプルコードは以下の通り。
いわゆるinsertAllメソッドを用いた方法(Google側のAPIがinsertAllなことに由来、SDK上はinsertAllではなくPut)で、構造体のスライスを渡している。また、構造体に対するメソッドとしてSave(ValueSaverインタフェース)を定義することでベストエフォート型の重複排除が利用可能になる。Saveは実装しなくてもよい。

type Item struct {
	Name  string
	Size  float64
	Count int
}

func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"name": i.Name,
		"size": i.Size,
		"count": i.Count,
	}, i.Name, nil
}

func uploading(projectID string, bqDataset string, bqTable string) int {
	ctx := context.Background()
	bqCli, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Printf("failed to create bigquery client: %v", err)
		return 2
	}
	dataset := bqCli.Dataset(bqDataset)
	table := dataset.Table(bqTable)

	log.Printf("load rows from stream")

	inserter := table.Inserter()
	items := []*Item{
		{Name: "s1", Size: 32.6, Count: 7},
		{Name: "s2", Size: 4, Count: 2},
		{Name: "s3", Size: 101.5, Count: 1},
	}
	if err := inserter.Put(ctx, items); err != nil {
		log.Printf("failed to put items: %v", err)
		return 2
	}

	return 0
}

Storage Write API

Storage Write APIはバッチ読み込みにもストリーミングにも対応できる新しいAPI。ただし、Go言語では2022-05-01現在はプレビュー扱いになっており、今後どのように利用されるか不明。

Read APIはgolang-samplesにサンプルコードがある。しかし、Write APIのサンプルコードは断片のみで整備されておらずどのように実装すればよいか不明。StackOverflowに同様の質問が挙げられており(2021-12)、しかも Protocol Buffer の定義が必要だったりと、やりたい事に対して大分too muchな感じがする。他言語のドキュメント・サンプルを見てもProtocol Bufferは利用しているのでProtocol Bufferは必須かもしれない。

その他

その他の方法としてSQL(DML)を用いる方法などもあるが省略。

参考

Discussion

ログインするとコメントできます