📝

Go言語でGCPのpubsub→BigQueryへデータ送信する(トピックスキーマなし)

2023/06/12に公開

本記事の内容

今回実装する構成↓

ローカル環境からGo言語を使って

  • pubsubのメッセージをパブリッシュ
  • pubsubサブスクリプションを使ってBigQueryへデータ送信
    を行います

今回はトピックスキーマを用いません
※トピックスキーマを用いない場合BigQueryのテーブルには
「data」という列名、BYTES型でデータが格納されます
 トピックスキーマを使ったpubsub→BigQueryへのデータ格納はいずれ記事にしたいと思います

BigQueryテーブルの準備

{yoour_project_id}.test_dataset.pubsubTestというテーブルを作成します
スキーマ構成は列名:data、BYTES型 の1列のみです


テーブル作成画面

これで完了です

pubsubの準備

pubsubなどのメッセージングサービスは
・トピック
・サブスクリプション
というものを用意します

メッセージング配信においては
トピック:受信側
サブスクリプション:送信側
という認識でOKです

詳しくは他の記事もしくは公式ドキュメントを確認ください

トピックの作成

「test_pubsub」というトピックを作成します

サブスクリプションの作成

サブスクリプション側で「BigQueryへの書き込み」という項目があるのでそれを選択します

サブスクリプション作成画面

そうすると

サービス アカウント ********@gcp-sa-pubsub.iam.gserviceaccount.com に、次の BigQuery テーブルへの書き込みに必要な権限がありません: bigquery.tables.get、bigquery.tables.updateData。

というメッセージが表示されます。
後で設定が必要になりますが一旦保存を押しておきましょう

pubsubのサービスアカウントにBigQueryへ書き込み権限を付与

IAM画面に移動します。
アクセス権を付与する画面にてさきほどの
********@gcp-sa-pubsub.iam.gserviceaccount.com
にBigQueryの権限を付与します

今回は少し過剰気味ですが、「BigQueryデータ編集者」を与えておきます
pubsubサービスアカウントにBigQueryへのアクセス権を付与

※本番環境なら上記bigquery.tables.get、bigquery.tables.updateData権限のみ付与するロールを作成しましょう

クライアント側のメッセージ送信をGoで実装する

必須なことは下記2つ

  • アクセス権を持っていること
  • GoのAPIをダウンロードしていること

ローカルマシンへアクセス権の付与

ローカル環境からメッセージをパブリッシュするためには

まずはIAMの「サービスアカウント」画面にて「CREATE SERVICE ACCOUNT」を押します
サービスアカウントトップ画面

新しいサービスアカウント名を決めて、「Pub/Sub パブリッシャー」ロールを付与します

サービスアカウントロール選択画面

これによってこのサービスアカウントがpubsubへメッセージをパブリッシュすることが出来ます

あとはこのサービスアカウント(私の場合はPubsubPublisher)からjsonキーを発行します

jsonキー発行

このjsonキーをローカルにダウンロードしておきます。

Go言語での実装

まずはpubsubモジュールインストールしておきましょう

go get cloud.google.com/go/pubsub

早速公式サイトを見ていきましょう

下記のような記述になっています(2023/6/11時点)

import (
       "context"
       "fmt"
       "io"
       "strconv"
       "sync"
       "sync/atomic"

       "cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
       // projectID := "my-project-id"
       // topicID := "my-topic"
       ctx := context.Background()
       client, err := pubsub.NewClient(ctx, projectID)
       if err != nil {
               return fmt.Errorf("pubsub.NewClient: %w", err)
       }
       defer client.Close()

       var wg sync.WaitGroup
       var totalErrors uint64
       t := client.Topic(topicID)

       for i := 0; i < n; i++ {
               result := t.Publish(ctx, &pubsub.Message{
                       Data: []byte("Message " + strconv.Itoa(i)),
               })

               wg.Add(1)
               go func(i int, res *pubsub.PublishResult) {
                       defer wg.Done()
                       // The Get method blocks until a server-generated ID or
                       // an error is returned for the published message.
                       id, err := res.Get(ctx)
                       if err != nil {
                               // Error handling code can be added here.
                               fmt.Fprintf(w, "Failed to publish: %v", err)
                               atomic.AddUint64(&totalErrors, 1)
                               return
                       }
                       fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
               }(i, result)
       }

       wg.Wait()

       if totalErrors > 0 {
               return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
       }
       return nil
}

実はこの関数を使って送信しようとすると、エラーが発生して送信できません
GCPへの認証が出来ていないからです
あとメッセージ総数からエラーメッセージ数を算出するように丁寧に書かれています
が、Hello worldには少し過剰な気がします

GCP認証を追加し全文をシンプルに書き直します

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

const (
	projectId  = "********************"      // GCPのプロジェクトID
	topicId    = "test_pubsub"                // pubsubのトピックID
	pubsubKey  = "keys/PubsubPublishKey.json" // GCPのjsonキー保存先。先ほどGCPからダウンロードしたjsonキーの相対パスを記入
	bufferSize = 1000
	n          = 10 //メッセージ送信数
)

func publish_topic(
	ctx context.Context, //バックグランドで実行するためのcontext
	pubsubTopic *pubsub.Topic, //pubsubトピックのポインタ
	msg string, // pubsubへ送信するメッセージ内容
) {
	// 文字列をbytes変換する
	msgB := []byte(msg)
	// Pubsubにパブリッシュする
	msgP := &pubsub.Message{Data: msgB}
	_, err := pubsubTopic.Publish(ctx, msgP).Get(ctx)
	if err != nil {
		fmt.Println("Failed to publish response message: ", err)
		//他のエラーハンドリング
		return
	}
	fmt.Printf("パブリッシュされました%s", msg)
}

func main() {
	// pubsubでクライアント初期化
	ctx := context.Background()
	pubsub_client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(pubsubKey))
	if err != nil {
		fmt.Printf("Failed to make pubsub client:%s ", err)
		return
	}
	defer pubsub_client.Close()

	pubsubTopic := pubsub_client.Topic(topicId)
	//10Kbytes毎に送信するバッチ設定
	//この処理を行っておくと高速にメッセージ送信する場合でもサイズ毎にバルク送信してくれます
	pubsubTopic.PublishSettings.ByteThreshold = bufferSize

	var wg sync.WaitGroup
	// メッセージを作って送信する部分を並行処理で記述
	func() {
		for i := 0; i < n+1; i++ {
			wg.Add(1)
			msg := fmt.Sprintf(`
			"time": %s,
			"message": "Hello World %v"
            `, time.Now().Format("2006-01-02 15:04:05.123456"), i)

			go func() {
				defer wg.Done()
				publish_topic(ctx, pubsubTopic, msg)
			}()
		}
	}()
	wg.Wait()
}

こんな感じに書くとメッセージが送信されることがローカルで確認できます

            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 1"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 5"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 2"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 4"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 7"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 0"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 6"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 10"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 3"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 9"
            パブリッシュされました
                        "time": 2023-06-12 01:02:16.61212166,
                        "message": "Hello World 8"

BigQueryへの格納状態の確認

BigQueryにはdata列にbyte形式でデータが格納されていました
BigQuery画面

これを文字列表示にしたい場合には下記で確認できます

SELECT data, CAST(data AS STRING) AS data_string
FROM `******.test_dataset.pubsubTest` LIMIT 1000

*****はGCPのプロジェクトIDです

これを実行するとしっかり送信した文字列が格納されていることを確認できました

まとめ

最後にいくつか躓きポイントをまとめておきます

  • GCP権限関係
    • pubsubサービスアカウントにBigQueryへの書き込み権限を付与する
    • ローカルマシンにpubsubパブリッシャー権限を持たせる(jsonキーなど)
  • BigQuery側の設定
    • data列という列名にする必要がある
    • data列はbyte型
  • クライアント側(Go言語)
    • (jsonキーでの認証の場合)client作成時にoption.WithCredentialsFile()を付与する
    • メッセージはbytes形式にエンコードしてpubsubメッセージ初期化関数に渡す

以上、参考になれば幸いです

GitHubで編集を提案

Discussion