🫧

GCP Cloud Pub/Subの受信フロー制御を検証する

2024/05/23に公開

はじめに

GCPのPub/Subは送信(Publish)・受信(Subscribe)共にフロー制御の機能があり、これにより何らかの障害や予期しない振る舞いによる高負荷を回避できます。最近Subscribe側の制御を検討する機会があり細かい振る舞い・オプション機能を調べたので備忘として残したいと思います。

動作検証

環境構築

以下の環境変数をセットして下さい。
手動でセットしてもいいですがdirenvを使うのがオススメです。

.envrc
export PUBSUB_PROJECT_ID=<your project id>
export PUBSUB_TOPIC_ID=sample-topic
export PUBSUB_SUBSCRIPTION_ID=sample-subscription

以下の内容でMakefileを用意して下さい。
その後 make all を叩くとTopicとSubscriptionが作成されます。

Makefile
all:create-topic create-subscription

create-topic:
	gcloud --project $$PUBSUB_PROJECT_ID pubsub topics create $$PUBSUB_TOPIC_ID

create-subscription:
	gcloud --project $$PUBSUB_PROJECT_ID pubsub subscriptions create $$PUBSUB_SUBSCRIPTION_ID \
		--topic $$PUBSUB_TOPIC_ID \
		--topic-project $$PUBSUB_PROJECT_ID \
		--ack-deadline 60 \
		--message-retention-duration 7d \
		--max-retry-delay 3s \
		--min-retry-delay 3s

publish-msg:
	gcloud --project $$PUBSUB_PROJECT_ID pubsub topics publish $$PUBSUB_TOPIC_ID --message "Hello, Pub/Sub!"

publish-msg-10:
	for i in `seq 1 10`; do gcloud --project $$PUBSUB_PROJECT_ID pubsub topics publish $$PUBSUB_TOPIC_ID --message "Hello, Pub/Sub! ($$i)"; done

clean:
	-gcloud --project $$PUBSUB_PROJECT_ID pubsub subscriptions delete $$PUBSUB_SUBSCRIPTION_ID
	-gcloud --project $$PUBSUB_PROJECT_ID pubsub topics delete $$PUBSUB_TOPIC_ID

検証コード

Go 1.22で動作確認していますが多分最近のバージョンなら大体大丈夫だと思います。
適当なディレクトリで go mod init しておきましょう。

この内容を `main.go` として保存
main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cli, err := pubsub.NewClient(ctx, os.Getenv("PUBSUB_PROJECT_ID"))
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		if err := subscribe(ctx, cli); err != nil {
			fmt.Println(err.Error())
		}
	}()

	wg.Wait()

	return nil
}

func subscribe(ctx context.Context, cli *pubsub.Client) error {
	sub := cli.Subscription(os.Getenv("PUBSUB_SUBSCRIPTION_ID"))
	log.Printf("subscribing: %s\n", sub.ID())

	for {
		select {
		case <-ctx.Done():
			log.Println(ctx.Err())
			return nil
		default:
			err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
				time.Sleep(1 * time.Second) // フロー制御の様子をわかりやすくするため1秒待つ
				log.Printf("%s - msg received: %s\n", sub.ID(), string(msg.Data))
				msg.Ack()
			})
			if err != nil {
				return err
			}
		}
	}
}

その後 go mod downloadgo run main.go を叩いてみてSubscribe中のログがでればOKです。

$ go run main.go
2024/04/25 01:20:01 subscribing: sample-subscription

デフォルトの動き

下準備として make publish-msg-10 でTopicにメッセージを10個放り込みます。

$ make publish-msg-10
for i in `seq 1 10`; do gcloud --project $PUBSUB_PROJECT_ID pubsub topics publish $PUBSUB_TOPIC_ID --message "Hello, Pub/Sub! ($i)"; done
messageIds:
- '11037052970312501'
messageIds:
- '11037180407981531'
messageIds:
- '11036861119038720'
messageIds:
- '11036875891334784'
messageIds:
- '11036828894285646'
messageIds:
- '11036818322383007'
messageIds:
- '10947455877609178'
messageIds:
- '11037023039362599'
messageIds:
- '11036925065451370'
messageIds:
- '9480570970935854'
$

その後デフォルト設定のままSubscribeしてみると先ほどPublishした10個のメッセージがほぼ同時に受信されるはずです。これはデフォルトでは10並行でSubscriptionからStearming Pullしつつ且つ1,000個まで or 1GiBまでメッセージを同時処理する設定となっているためです。

$ go run main.go
2024/04/25 01:28:03 subscribing: sample-subscription
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (2)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (9)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (10)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (7)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (8)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (1)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (3)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (4)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (5)
2024/04/25 01:28:07 sample-subscription - msg received: Hello, Pub/Sub! (6)

デフォルト設定箇所: https://github.com/googleapis/google-cloud-go/blob/4067f4eabbba9e5e3265754339709fb273b73e4c/pubsub/subscription.go#L956

なおPub/Subはデフォルトでは配信順序が保証されないのでバラバラになっています。
別のオプションでこれを整えられるのですが、本記事の趣旨からは逸れるので後日改めて試してみようと思います。

メッセージの同時処理制限

ではメッセージの同時処理数を1つに制限してみましょう。
検証コードに以下の変更を加え再度 make publish-msg-10 を叩き準備します。

main.go
func subscribe(ctx context.Context, cli *pubsub.Client) error {
	sub := cli.Subscription(os.Getenv("PUBSUB_SUBSCRIPTION_ID"))
+	sub.ReceiveSettings.MaxOutstandingMessages = 1
	log.Printf("subscribing: %s\n", sub.ID())

同時処理数が1に制御されている且つ受信後1秒待ってからログを出力するので

  • (ほぼ)1秒毎にログが表示される

が期待結果となります。
では確認してみましょう。

$ go run main.go
2024/04/25 02:01:11 subscribing: sample-subscription
2024/04/25 02:01:14 sample-subscription - msg received: Hello, Pub/Sub! (5)
2024/04/25 02:01:15 sample-subscription - msg received: Hello, Pub/Sub! (3)
2024/04/25 02:01:16 sample-subscription - msg received: Hello, Pub/Sub! (1)
2024/04/25 02:01:17 sample-subscription - msg received: Hello, Pub/Sub! (8)
2024/04/25 02:01:18 sample-subscription - msg received: Hello, Pub/Sub! (9)
2024/04/25 02:01:19 sample-subscription - msg received: Hello, Pub/Sub! (10)
2024/04/25 02:01:20 sample-subscription - msg received: Hello, Pub/Sub! (2)
2024/04/25 02:01:21 sample-subscription - msg received: Hello, Pub/Sub! (7)
2024/04/25 02:01:22 sample-subscription - msg received: Hello, Pub/Sub! (4)
2024/04/25 02:01:23 sample-subscription - msg received: Hello, Pub/Sub! (6)

無事想定通りの動きになっていますね。
例えば MaxOutstandingMessages = 2 にすると(ほぼ)1秒毎に2行ずつログが出るはずなので、そちらも是非確認してみて下さい(長くなるのでこの記事では省略)。

ちなみにマイナス値をセットすると無制限と解釈されます。無制限設定にすると状況によってはCPUやメモリを食い潰す可能性があるので特殊な事情が無い限り何らかの値をセットしておく方が無難でしょう。

MaxOutstandingBytesについて

名前から類推できますがこちらは未処理のメッセージ容量を基準に保持するメッセージを制限します。こちらもマイナス値で無制限設定と解釈されます。

Stearming Pullの並行数制御

今度は以下の変更を加え再度実行してみましょう。

main.go
func subscribe(ctx context.Context, cli *pubsub.Client) error {
	sub := cli.Subscription(os.Getenv("PUBSUB_SUBSCRIPTION_ID"))
-	sub.ReceiveSettings.MaxOutstandingMessages = 1
+	sub.ReceiveSettings.NumGoroutines = 1
	log.Printf("subscribing: %s\n", sub.ID())

表面的な結果は「デフォルトの動き」と変わらないと思います。 NumGoroutines という名前で紛らわしいのですが、これはSubscriptionからメッセージを取得するSteaming Pull Connectionの数を設定するためのものなのでメッセージ処理とは別の話です。

1 Streaming Pullあたり最大10MB/sのスループットを得られ、デフォルトで10がセットされるので100MB/sとなりますが、これはPub/SubクライアントのgRPC Connectionもセットで検討する必要があります。1 gRPC Connectionに付き100 Streaming Pullまでサポートされており、これのデフォルト設定が4を上限とするCPUコア数です。

つまり設定を変更せず十分なリソースがある場合は 4 gRPC Connections -> 400 Streaming Pulls -> 4GB/s までサポートされます。

main.go
+	opts := []option.ClientOption{
+		option.WithGRPCConnectionPool(6),
+	}
+	cli, err := pubsub.NewClient(ctx, os.Getenv("PUBSUB_PROJECT_ID"), opts...)
-	cli, err := pubsub.NewClient(ctx, os.Getenv("PUBSUB_PROJECT_ID"))

この関連する2つの値は特段バリデーションされておらず option.WithGRPCConnectionPool(1)NumGoroutines = 2000 のような明らかにサポート範囲外の設定を入れても特にエラーは発生せず10件のメッセージを流すテストも普通に動きました。実際の高負荷環境でないと問題は起きないと思われるので、将来の地雷を埋め込まないよう注意しましょう。

考察

MaxOutstandingMessages|Bytes

この2つのオプションを使うことにより大量のメッセージがPublishされた場合でもリソースがサチらないよう制御できます。オートスケールと組み合わせれば増強までの時間稼ぎ、処理能力を固定する場合は完了までの時間と引き換えに平準化という振る舞いになります。

また1メッセージあたりの処理時間が長い割にCPU負荷が低い場合(外部コンポーネントのレスポンス待ちが長い、等)では設定値を通常より大きめに増やすことでリソース効率を高められる可能性があります。

NumGoroutines, WithGRPCConnectionPool

MaxOutstandingMessages|Bytesを設定した上でスループットのボトルネックになっていそうな場合は拡張を検討する必要があるでしょう。但しドキュメントによるとNumGoroutinesを必要以上に大きくするとI/Oバウンドのボトルネックとなることが警告されているので、あまり大胆な値にしない方が良さそうです。

まとめ

安全を確保しつつ最大限リソース利用効率を高めるためには以下の観点で設定値を検討する。

  • CPUがサチらない範囲で可能な限りMaxOutstandingMessagesを大きく
  • OOMしない範囲で可能な限りMaxOutstandingBytesを大きく
  • I/Oバウンドの問題を引き起こさない程度にNumGoroutinesを大きく
  • NumGoroutinesをカバーできるだけのWithGRPCConnectionPoolをセット

更にバーストをどう扱うかについて

  • コスト増を許容しオートスケールで動的なスループット調整
    or
  • 処理待ち時間の増加を許容してスループット固定

を判断する。

参考文献

Discussion