🐷
goのsliceを読み込みジョブでBigQueryにアップロードする方法
背景
機能的には読み込みジョブを利用したかったのですが、いちいちローカルでtmpファイルを作成したり、ファイルをGCSにアップロードしたりしてからBigQueryにアップロードするのは嫌だったので、goのアプリケーション内でsliceとして保持してるデータをそのまま読み込みジョブに突っ込んでみました。
BQにアップロードする方法の種類と特徴についてはこちらに詳しく書きました
方法
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"reflect"
"cloud.google.com/go/bigquery"
)
type Item struct {
ID string
Name string
Age int
}
func jobInsert(projectID, datasetID, tableID string, rows interface{}) error {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()
pr, pw := io.Pipe()
var errs []error
go func() {
jsonEncoder := json.NewEncoder(pw)
s := reflect.ValueOf(rows)
for i := 0; i < s.Len(); i++ {
err := jsonEncoder.Encode(s.Index(i).Interface())
if err != nil {
errs = append(errs, err)
}
}
pw.Close()
}()
if len(errs) != 0 {
return fmt.Errorf("%v", errs)
}
source := bigquery.NewReaderSource(pr)
source.AutoDetect = true
source.SkipLeadingRows = 1
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
loader.LoadConfig.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
return err
}
return nil
}
func main() {
projectID := "stock-data-dev"
datasetID := "testing"
tableID := "hoge"
items := []*Item{
{Name: "Phred Phlyntstone", Age: 32},
{Name: "Wylma Phlyntstone", Age: 29},
}
if err := jobInsert(projectID, datasetID, tableID, items); err != nil {
fmt.Println(err)
}
}
解説
引数で渡したstructのスライスをio.Pipe
とjson.NewEncoder
を利用してio.Reader
に変換します。
io.Reader
に変換してしまえば、bigquery.NewReaderSource
の引数として利用できるので、読み込みジョブとしてBigQueryにアップロードすることができます。
メリット
goでsliceからアップロードする場合はストリーミング挿入を利用するのが一般的な気がしますが、あえて読み込みジョブを利用するメリットとして以下のようなものがあります。
- データの洗い替え(WRITE_TRUNCATE)、データの追加(WRITE_APPEND)のオプションを利用できる
- ストリーミング挿入と比べてインサートに時間はかかってしまうが、インサート完了した時には確実にデータが挿入されている
- ストリーミング挿入だとインサートが完了した直後にクエリするとデータをまだクエリできない場合がある
- ストリーミング挿入ではインサート後にしばらくデータの更新や削除が制限されるが、読み込みジョブだとそれがない
- 挿入できるパーティションの制限がない。ストリーミング挿入では過去 31 日以内のパーティションと現在の日付から 16 日後まで
Discussion