📕

goでBigQueryにデータをアップロードする方法の比較

2021/08/25に公開

BigQueryにデータをアップロードする方法はいくつもありますが、基本的になアップロード方法である読み込みジョブとストリーミング挿入について調べたのでまとめました。

アップロードする方法は大きく2種類

  • 読み込みジョブ
  • ストリーミング挿入

特徴

読み込みジョブ

  • 無料で使える
  • Cloud Storage に置いたデータ (Avro, ORC,JSON, CSV, Parquet など)をスループット最適化のバッチジョブで読み込め、大きなファイルがたくさんある場合に向いている
  • データの洗い替え(WRITE_TRUNCATE)、データの追加(WRITE_APPEND)などのオプションを指定できる

ストリーミング挿入

  • 料金がかかる
  • 一度に 1 レコードずつ、または小さいバッチ単位でデータをストリーミング挿入する
  • データをリアルタイムでBigQueryに挿入するので、大量のデータをリアルタイムに収集するアプリがある場合に有効
  • 挿入できるパーティションは過去 31 日以内のパーティションと現在の日付から 16 日後まで
  • 行にinsertIDを付与するとなるべく重複しないようにしてくれる機能がある(ベストエフォート型の重複排除)
  • ストリーミング バッファの都合で、アップロード直後にデータが参照できない場合があったり、アップロードしてから最大90分間はデータの更新などできなかったりする

サンプルコード

読み込みジョブ

ローカルのcsvファイルをアップロードするサンプルコードです。
様々なファイル形式に対応していますし、CloudStorageにあるファイルをロードするこもできます。

package main

import (
	"context"
	"fmt"
	"os"

	"cloud.google.com/go/bigquery"
)

func importCSVFromFile(projectID, datasetID, tableID, filename string) error {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	f, err := os.Open(filename)
	if err != nil {
		return err
	}
	source := bigquery.NewReaderSource(f)
	source.AutoDetect = true
	source.SkipLeadingRows = 1

	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
	// WriteTruncate(洗い替え)で書き込みする
	loader.LoadConfig.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if err := status.Err(); err != nil {
		return err
	}
	return nil
}

func main() {
	projectID := "stock-data-dev"
	datasetID := "testing"
	tableID := "hoge"
	fileName := "path/to/your/file"

	if err := importCSVFromFile(projectID, datasetID, tableID, fileName); err != nil {
		fmt.Println(err)
	}
}

ストリーミング挿入

package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

type Item struct {
	ID   string
	Name string
	Age  int
}

// ValueSaver interfaceを実装することで細かい設定が可能
// 実装しなくてもよくて、その場合はBigQueryがよしなにやってくる
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"id":        i.ID,
		"full_name": i.Name,
		"age":       i.Age,
	}, i.ID, nil
}

func insertItems(projectID, datasetID, tableID string, items []*Item) error {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

func main() {
	projectID := "your-project-id"
	datasetID := "your-dataset-id"
	tableID := "your-table-name"

	items := []*Item{
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}

	if err := insertItems(projectID, datasetID, tableID, items); err != nil {
		fmt.Println(err)
	}
}

参照

Discussion