🐷

goのsliceを読み込みジョブでBigQueryにアップロードする方法

2021/08/25に公開

背景

機能的には読み込みジョブを利用したかったのですが、いちいちローカルでtmpファイルを作成したり、ファイルをGCSにアップロードしたりしてからBigQueryにアップロードするのは嫌だったので、goのアプリケーション内でsliceとして保持してるデータをそのまま読み込みジョブに突っ込んでみました。

BQにアップロードする方法の種類と特徴についてはこちらに詳しく書きました
https://zenn.dev/nekoshita/articles/6d5cb3aa7e4305

方法

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"reflect"

	"cloud.google.com/go/bigquery"
)

type Item struct {
	ID   string
	Name string
	Age  int
}

func jobInsert(projectID, datasetID, tableID string, rows interface{}) error {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	pr, pw := io.Pipe()
	var errs []error
	go func() {
		jsonEncoder := json.NewEncoder(pw)
		s := reflect.ValueOf(rows)
		for i := 0; i < s.Len(); i++ {
			err := jsonEncoder.Encode(s.Index(i).Interface())
			if err != nil {
				errs = append(errs, err)
			}
		}
		pw.Close()
	}()
	if len(errs) != 0 {
		return fmt.Errorf("%v", errs)
	}

	source := bigquery.NewReaderSource(pr)
	source.AutoDetect = true
	source.SkipLeadingRows = 1

	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
	loader.LoadConfig.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if err := status.Err(); err != nil {
		return err
	}
	return nil
}

func main() {
	projectID := "stock-data-dev"
	datasetID := "testing"
	tableID := "hoge"
	
	items := []*Item{
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}

	if err := jobInsert(projectID, datasetID, tableID, items); err != nil {
		fmt.Println(err)
	}
}

解説

引数で渡したstructのスライスをio.Pipejson.NewEncoderを利用してio.Readerに変換します。
io.Readerに変換してしまえば、bigquery.NewReaderSourceの引数として利用できるので、読み込みジョブとしてBigQueryにアップロードすることができます。

メリット

goでsliceからアップロードする場合はストリーミング挿入を利用するのが一般的な気がしますが、あえて読み込みジョブを利用するメリットとして以下のようなものがあります。

  • データの洗い替え(WRITE_TRUNCATE)、データの追加(WRITE_APPEND)のオプションを利用できる
  • ストリーミング挿入と比べてインサートに時間はかかってしまうが、インサート完了した時には確実にデータが挿入されている
    • ストリーミング挿入だとインサートが完了した直後にクエリするとデータをまだクエリできない場合がある
  • ストリーミング挿入ではインサート後にしばらくデータの更新や削除が制限されるが、読み込みジョブだとそれがない
  • 挿入できるパーティションの制限がない。ストリーミング挿入では過去 31 日以内のパーティションと現在の日付から 16 日後まで

Discussion