Go言語でGCPのpubsub→BigQueryへデータ送信する(トピックスキーマなし)
本記事の内容
今回実装する構成↓
ローカル環境からGo言語を使って
- pubsubのメッセージをパブリッシュ
- pubsubサブスクリプションを使ってBigQueryへデータ送信
を行います
今回はトピックスキーマを用いません
※トピックスキーマを用いない場合BigQueryのテーブルには
「data」という列名、BYTES型でデータが格納されます
トピックスキーマを使ったpubsub→BigQueryへのデータ格納はいずれ記事にしたいと思います
BigQueryテーブルの準備
{yoour_project_id}.test_dataset.pubsubTestというテーブルを作成します
スキーマ構成は列名:data、BYTES型 の1列のみです
これで完了です
pubsubの準備
pubsubなどのメッセージングサービスは
・トピック
・サブスクリプション
というものを用意します
メッセージング配信においては
トピック:受信側
サブスクリプション:送信側
という認識でOKです
詳しくは他の記事もしくは公式ドキュメントを確認ください
トピックの作成
「test_pubsub」というトピックを作成します
サブスクリプションの作成
サブスクリプション側で「BigQueryへの書き込み」という項目があるのでそれを選択します
そうすると
サービス アカウント ********@gcp-sa-pubsub.iam.gserviceaccount.com に、次の BigQuery テーブルへの書き込みに必要な権限がありません: bigquery.tables.get、bigquery.tables.updateData。
というメッセージが表示されます。
後で設定が必要になりますが一旦保存を押しておきましょう
pubsubのサービスアカウントにBigQueryへ書き込み権限を付与
IAM画面に移動します。
アクセス権を付与する画面にてさきほどの
********@gcp-sa-pubsub.iam.gserviceaccount.com
にBigQueryの権限を付与します
今回は少し過剰気味ですが、「BigQueryデータ編集者」を与えておきます
※本番環境なら上記bigquery.tables.get、bigquery.tables.updateData権限のみ付与するロールを作成しましょう
クライアント側のメッセージ送信をGoで実装する
必須なことは下記2つ
- アクセス権を持っていること
- GoのAPIをダウンロードしていること
ローカルマシンへアクセス権の付与
ローカル環境からメッセージをパブリッシュするためには
まずはIAMの「サービスアカウント」画面にて「CREATE SERVICE ACCOUNT」を押します
新しいサービスアカウント名を決めて、「Pub/Sub パブリッシャー」ロールを付与します
これによってこのサービスアカウントがpubsubへメッセージをパブリッシュすることが出来ます
あとはこのサービスアカウント(私の場合はPubsubPublisher)からjsonキーを発行します
このjsonキーをローカルにダウンロードしておきます。
Go言語での実装
まずはpubsubモジュールインストールしておきましょう
go get cloud.google.com/go/pubsub
早速公式サイトを見ていきましょう
下記のような記述になっています(2023/6/11時点)
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topicID)
for i := 0; i < n; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
wg.Add(1)
go func(i int, res *pubsub.PublishResult) {
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
id, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Fprintf(w, "Failed to publish: %v", err)
atomic.AddUint64(&totalErrors, 1)
return
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}(i, result)
}
wg.Wait()
if totalErrors > 0 {
return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
}
return nil
}
実はこの関数を使って送信しようとすると、エラーが発生して送信できません
GCPへの認証が出来ていないからです
あとメッセージ総数からエラーメッセージ数を算出するように丁寧に書かれています
が、Hello worldには少し過剰な気がします
GCP認証を追加し全文をシンプルに書き直します
package main
import (
"context"
"fmt"
"sync"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
const (
projectId = "********************" // GCPのプロジェクトID
topicId = "test_pubsub" // pubsubのトピックID
pubsubKey = "keys/PubsubPublishKey.json" // GCPのjsonキー保存先。先ほどGCPからダウンロードしたjsonキーの相対パスを記入
bufferSize = 1000
n = 10 //メッセージ送信数
)
func publish_topic(
ctx context.Context, //バックグランドで実行するためのcontext
pubsubTopic *pubsub.Topic, //pubsubトピックのポインタ
msg string, // pubsubへ送信するメッセージ内容
) {
// 文字列をbytes変換する
msgB := []byte(msg)
// Pubsubにパブリッシュする
msgP := &pubsub.Message{Data: msgB}
_, err := pubsubTopic.Publish(ctx, msgP).Get(ctx)
if err != nil {
fmt.Println("Failed to publish response message: ", err)
//他のエラーハンドリング
return
}
fmt.Printf("パブリッシュされました%s", msg)
}
func main() {
// pubsubでクライアント初期化
ctx := context.Background()
pubsub_client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(pubsubKey))
if err != nil {
fmt.Printf("Failed to make pubsub client:%s ", err)
return
}
defer pubsub_client.Close()
pubsubTopic := pubsub_client.Topic(topicId)
//10Kbytes毎に送信するバッチ設定
//この処理を行っておくと高速にメッセージ送信する場合でもサイズ毎にバルク送信してくれます
pubsubTopic.PublishSettings.ByteThreshold = bufferSize
var wg sync.WaitGroup
// メッセージを作って送信する部分を並行処理で記述
func() {
for i := 0; i < n+1; i++ {
wg.Add(1)
msg := fmt.Sprintf(`
"time": %s,
"message": "Hello World %v"
`, time.Now().Format("2006-01-02 15:04:05.123456"), i)
go func() {
defer wg.Done()
publish_topic(ctx, pubsubTopic, msg)
}()
}
}()
wg.Wait()
}
こんな感じに書くとメッセージが送信されることがローカルで確認できます
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 1"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 5"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 2"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 4"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 7"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 0"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 6"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 10"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 3"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 9"
パブリッシュされました
"time": 2023-06-12 01:02:16.61212166,
"message": "Hello World 8"
BigQueryへの格納状態の確認
BigQueryにはdata列にbyte形式でデータが格納されていました
これを文字列表示にしたい場合には下記で確認できます
SELECT data, CAST(data AS STRING) AS data_string
FROM `******.test_dataset.pubsubTest` LIMIT 1000
*****はGCPのプロジェクトIDです
これを実行するとしっかり送信した文字列が格納されていることを確認できました
まとめ
最後にいくつか躓きポイントをまとめておきます
- GCP権限関係
- pubsubサービスアカウントにBigQueryへの書き込み権限を付与する
- ローカルマシンにpubsubパブリッシャー権限を持たせる(jsonキーなど)
- BigQuery側の設定
- data列という列名にする必要がある
- data列はbyte型
- クライアント側(Go言語)
- (jsonキーでの認証の場合)client作成時にoption.WithCredentialsFile()を付与する
- メッセージはbytes形式にエンコードしてpubsubメッセージ初期化関数に渡す
以上、参考になれば幸いです
Discussion