📕
goでBigQueryにデータをアップロードする方法の比較
BigQueryにデータをアップロードする方法はいくつもありますが、基本的になアップロード方法である読み込みジョブとストリーミング挿入について調べたのでまとめました。
アップロードする方法は大きく2種類
- 読み込みジョブ
- ストリーミング挿入
特徴
読み込みジョブ
- 無料で使える
- Cloud Storage に置いたデータ (Avro, ORC,JSON, CSV, Parquet など)をスループット最適化のバッチジョブで読み込め、大きなファイルがたくさんある場合に向いている
- データの洗い替え(WRITE_TRUNCATE)、データの追加(WRITE_APPEND)などのオプションを指定できる
ストリーミング挿入
- 料金がかかる
- 一度に 1 レコードずつ、または小さいバッチ単位でデータをストリーミング挿入する
- データをリアルタイムでBigQueryに挿入するので、大量のデータをリアルタイムに収集するアプリがある場合に有効
- 挿入できるパーティションは過去 31 日以内のパーティションと現在の日付から 16 日後まで
- 行にinsertIDを付与するとなるべく重複しないようにしてくれる機能がある(ベストエフォート型の重複排除)
- ストリーミング バッファの都合で、アップロード直後にデータが参照できない場合があったり、アップロードしてから最大90分間はデータの更新などできなかったりする
サンプルコード
読み込みジョブ
ローカルのcsvファイルをアップロードするサンプルコードです。
様々なファイル形式に対応していますし、CloudStorageにあるファイルをロードするこもできます。
package main
import (
"context"
"fmt"
"os"
"cloud.google.com/go/bigquery"
)
func importCSVFromFile(projectID, datasetID, tableID, filename string) error {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()
f, err := os.Open(filename)
if err != nil {
return err
}
source := bigquery.NewReaderSource(f)
source.AutoDetect = true
source.SkipLeadingRows = 1
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
// WriteTruncate(洗い替え)で書き込みする
loader.LoadConfig.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
return err
}
return nil
}
func main() {
projectID := "stock-data-dev"
datasetID := "testing"
tableID := "hoge"
fileName := "path/to/your/file"
if err := importCSVFromFile(projectID, datasetID, tableID, fileName); err != nil {
fmt.Println(err)
}
}
ストリーミング挿入
package main
import (
"context"
"fmt"
"cloud.google.com/go/bigquery"
)
type Item struct {
ID string
Name string
Age int
}
// ValueSaver interfaceを実装することで細かい設定が可能
// 実装しなくてもよくて、その場合はBigQueryがよしなにやってくる
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
return map[string]bigquery.Value{
"id": i.ID,
"full_name": i.Name,
"age": i.Age,
}, i.ID, nil
}
func insertItems(projectID, datasetID, tableID string, items []*Item) error {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()
inserter := client.Dataset(datasetID).Table(tableID).Inserter()
if err := inserter.Put(ctx, items); err != nil {
return err
}
return nil
}
func main() {
projectID := "your-project-id"
datasetID := "your-dataset-id"
tableID := "your-table-name"
items := []*Item{
{Name: "Phred Phlyntstone", Age: 32},
{Name: "Wylma Phlyntstone", Age: 29},
}
if err := insertItems(projectID, datasetID, tableID, items); err != nil {
fmt.Println(err)
}
}
参照
- データの読み込みの概要
- ストリーミング挿入
- Cloud Storage からの JSON データの読み込み
- 料金
Discussion