📫

Pub/Subエミュレーターでスキーマの作成を試す with Go

に公開

はじめに

こんにちは、サーバーサイドエンジニアの工藤です。

先日、業務の中で Google Cloud Pub/Sub のスキーマ作成や更新をCIに組み込むようにしました。
こういったクラウド環境を利用する作業の中では、初期はクラウド環境へのデプロイやIAM権限の設定といった準備に手間を感じることがあります。
迅速に動作確認を進めたい状況では、これらの準備が一時的なボトルネックになることも少なくありません。

そのような場合に役立つのが、ローカル環境でPub/Subの動作をシミュレートできる「Pub/Subエミュレーター」です。

この記事では、Pub/Subエミュレーターを利用し、Go言語を用いてスキーマの作成と更新を行う具体的な手順と、その過程で得られた知見を紹介します。

Pub/Sub スキーマとは

Pub/Subスキーマは、トピックに送受信されるメッセージの構造をAvroやProtocol Buffersで定義する仕組みです。スキーマを使うことで、Pub/Subはメッセージを自動で検証し、予期しない構造のデータを排除できます。
https://cloud.google.com/pubsub/docs/schemas?hl=ja

今回はAvroスキーマを定義、更新する流れを紹介します。
スキーマが紐づけられたトピックへのPubish、Subscribeについては確認しないのでご了承ください!

Pub/Sub エミュレーターとは

Google Cloud が提供しているローカル環境で Pub/Sub の動きを再現してくれる便利なツールです。これを使えば、ローカルでの開発やテストを素早く行うことができます。

ただし注意点が2点あります。

  1. 全ての機能をサポートしているわけではない
  2. gcloud pubsub コマンドでは操作できない

2番については、結構不便を感じますね😢

セットアップ方法

エミュレーターのセットアップ方法を公式ドキュメントでは2つ紹介しています。

この記事ではローカルで起動する方法で書いていますが、CI/CDパイプラインに組み込んだり、テスト環境で使ったりするなら、docker-compose でアプリケーションと一緒にコンテナとして動かすのも良いのではないかと思います。

実際に動かす

上述の通り、今回はローカルで起動したPub/Subエミュレーターに対して、Avro形式のスキーマを作成/更新する方法を紹介します。
(ちなみに記事執筆時点では Protocol Buffer形式のスキーマはサポートされていません)

今回の実装は全て以下のリポジトリにプッシュしています。
https://github.com/hrkd1316/pubsub-emulator-example

開発環境

Go 1.24
cloud.google.com/go/pubsub v1.49.0

エミュレーターの起動

なにはともあれ、まずはインストール&起動してみましょう。

gcloud components install pubsub-emulator
gcloud components update

gcloud beta emulators pubsub start --project=example-project

以下のようなメッセージと共に起動されます。

Executing: /opt/homebrew/share/google-cloud-sdk/platform/pubsub-emulator/bin/cloud-pubsub-emulator --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] 5月 05, 2025 5:07:55 午後 com.google.cloud.pubsub.testing.v1.Main main
[pubsub] 情報: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation
[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[pubsub] 5月 05, 2025 5:07:55 午後 com.google.cloud.pubsub.testing.v1.Main main
[pubsub] 情報: Server started, listening on 8085

環境変数の設定

クライアントを作る際に、接続先のエンドポイントを上書きするために使うだけなので、正直環境変数にする必要はないです。
しかし、クライアントの中には自動でこのセットされた環境変数を読み取ってくれるものもあるので、とりあえず叩いておいて良いんじゃないかと思います。

$(gcloud beta emulators pubsub env-init)

ローカル用のクライアントの作成

ここからは上記のエミュレーターに対する操作を行うGoのコードを書いていきます。
まずは、スキーマの操作を行うための SchemaClient を作成します。

ポイントとしては以下の2点です。

  • WithoutAuthentication() を使用して認証のスキップが必要

    • エミュレーターは認証プロセスを持っていないため
  • insecure.NewCredentials() を使用して、insecureなgRPC接続の許可が必要

    • エミュレーターは暗号化なしの接続を期待し、デフォルトの SchemaClient は暗号化接続を試みるため
    • ちなみに pubsub.NewClient() (topicなどを取り扱うクライアント)の方は、環境変数がセットされていると自動でinsecureな通信でクライアントを作るようになってます 実装
	// エミュレータ接続時は任意のプロジェクトIDを指定できる
	projectID := "example-project"

	emulatorHost := os.Getenv("PUBSUB_EMULATOR_HOST")
	if emulatorHost == "" {
		log.Fatal("PUBSUB_EMULATOR_HOST environment variable not set.")
	}

	// エミュレータ接続用のクライアントオプションを作成する
	opts := []option.ClientOption{
		option.WithEndpoint(emulatorHost),
		option.WithoutAuthentication(), // 認証を無効化
		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
	}

	// Pub/Subスキーマクライアントを作成する
	client, err := pubsub.NewSchemaClient(ctx, projectID, opts...)
	if err != nil {
		log.Fatalf("Failed to create Pub/Sub schema client: %v", err)
	}
	defer client.Close()

スキーマの作成

こちらについては特に解説もなく、リファレンス通りに実装するのみです。

func createSchema(ctx context.Context, client *pubsub.SchemaClient, schemaID string) (*pubsub.SchemaConfig, error) {
	config := pubsub.SchemaConfig{
		Type: pubsub.SchemaAvro,
		Definition: `{
			"type": "record",
			"name": "MyRecord",
			"fields": [
				{
					"name": "id",
					"type": "string"
				},
				{
					"name": "name",
					"type": "string"
				}
			]
		}`,
	}

	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return nil, err
	}
	return s, nil
}

実行結果

2025/05/05 18:38:10 Successfully connected to Pub/Sub emulator (SchemaClient) at localhost:8085 with project ID example-project
2025/05/05 18:38:11 Schema created: &{projects/example-project/schemas/example 2 {
			"type": "record",
			"name": "MyRecord",
			"fields": [
				{
					"name": "id",
					"type": "string"
				},
				{
					"name": "name",
					"type": "string"
				}
			]
		} 70976f45 1970-01-08 18:14:48.247518333 +0000 UTC}

スキーマのリビジョンの更新

更新については注意すべきポイントが1つあります。

  • 作成には Name フィールドは不要だが、更新には必要
    • 作成と更新のpubsub.SchemaConfig のフィールドを比べてみるとわかるのですが、更新の際には Name フィールドに、projects/ から始まるパスが必要です
func updateSchema(ctx context.Context, client *pubsub.SchemaClient, projcetID, schemaID string) (*pubsub.SchemaConfig, error) {
	config := pubsub.SchemaConfig{
		Name: fmt.Sprintf("projects/%s/schemas/%s", projcetID, schemaID),
		Type: pubsub.SchemaAvro,
		Definition: `{
			"type": "record",
			"name": "MyRecord",
			"fields": [
				{
					"name": "address",
					"type": "string"
				},
				{
					"name": "email",
					"type": "string"
				}
			]
		}`,
	}

	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return nil, err
	}
	return s, nil
}

実行結果

2025/05/05 18:39:06 Successfully connected to Pub/Sub emulator (SchemaClient) at localhost:8085 with project ID example-project
2025/05/05 18:39:06 Schema updated: &{projects/example-project/schemas/example 2 {
			"type": "record",
			"name": "MyRecord",
			"fields": [
				{
					"name": "id",
					"type": "int"
				},
				{
					"name": "name",
					"type": "string"
				},
				{
					"name": "age",
					"type": "int"
				}
			]
		} db3b2f55 1970-01-08 18:15:43.500200833 +0000 UTC}

更新時の互換性チェックについて

本物のPub/Subでは互換性のチェックをやってくれるのですが、エミュレーターにはその機能はない(らしい)です。

  • 公式の記述を見つけられなかったので(らしい)とつけてます
  • 以下の実行結果のようにまるっきり異なるスキーマを渡してもOKだったので、そのように推測しました。

互換性についてはクラウド環境にデプロイするか、ローカルで自前で確認する必要があるようです。

2025/05/05 18:41:13 Schema updated: &{projects/example-project/schemas/example 2 {
			"type": "record",
			"name": "MyRecord",
			"fields": [
				{
					"name": "address",
					"type": "string"
				},
				{
					"name": "email",
					"type": "string"
				}
			]
		} c1202438 1970-01-08 18:17:50.447724458 +0000 UTC}

まとめ

Pub/Subエミュレーターを使用することで、ローカル環境でPub/Subの機能を試すことができます。本記事では特にスキーマ関連の機能に焦点を当てました。

機能が完全に再現されているわけではない点、機能がないことを明言されていないものもある点からくる手探り感はすこし不便に感じました。
一方で、認証のような準備部分をスキップして、機能についてのコーディングに集中できるのはありがたい点でした。

これからも積極的に活用していきたいと思います。

参考サイト

We are hiring!

ブルーモでは次世代の金融システムを作る仲間を募集しています!
エンジニア、デザイナー、PdM、事業開発などさまざまポジションで募集をしているので興味がある方は下記の採用ページを覗いてみてください!

https://careers.bloomo.co.jp/

ブルーモ証券 プロダクトチームブログ

Discussion