🔭

OpenTelemetry 小ネタ - Pub/Sub 連携 & SpanProcessor カスタム-

に公開

2025 年 5 月 14 日と 16 日に行われた下記のイベントにて登壇させていただきました。

両イベントともオブザーバビリティがテーマだったため、直近で触れた OpenTelemetry の小ネタについて話をしました。

オブザーバビリティ分科会 Meetup では、非同期処理で Google Cloud の Pub/Sub を利用した際の OpenTelemetry オプションについてです。

https://speakerdeck.com/phaya72/sub

3-shake SRE Tech Talk では、OpenTelemetry の SpanProcessor を自身でカスタマイズできることについてです。

https://speakerdeck.com/phaya72/opentelemetry-no-spanprocessor-wo-lets-kasutamaizu

本記事では、これらネタについてコードに関わるところについて深掘りや自分なりの解説をまとめていこうと思います。背景や課題感は、登壇資料内にありますので興味があればぜひお読みください。

Overview

  • OpenTelemetry と Pub/Sub の連携では、実装する際に利用する Pub/Sub Client の enable_open_telemetry_tracing を有効化することで Pub/Sub にコンテキストが自動で注入されて分散トレーシングが可能となります
  • OpenTelemetry の SpanProcessor カスタムでは、Trace Provider をセットアップする際に必要な SpanProcessor クラスを継承したクラスを用意してスパン生成開始と終了のメソッドをオーバーライドすることで処理を追加することが可能となります

OpenTelemetry × Pub/Sub

簡単にどのような流れで Pub/Sub を介した非同期処理で分散トレーシングが可能となるかを説明します。当然ですが、パブリッシャーでもサブスクライバーでも OpenTelemetry で計装されている前提になります。

Client で enable_open_telemetry_tracing オプションを有効化するとパブリッシャーから Pub/Sub には下記のようなメッセージデータが送られるようになります。

ポイントは、attributes キーに W3C Trace Context に従ったコンテキスト情報が挿入されるところです。

{
  "data": {
    "message": {
      "attributes": {
        "googclient_traceparent": "00-f1e2a3629f4812ae11bb445e6131837d-9f6b28b7e461e9e6-01"
      },
      "data": "eyJ0cmFpbmluZ0p...",
      "messageId": "14855342720724240",
      "publishTime": "2025-05-16T07:43:03.807Z"
    },
    "subscription": "projects/xxx/subscriptions/yyy"
  },
  ...
}

サブスクライバーでも Client でオプションを有効化すると、上記のメッセージからコンテキスト情報を受け取ってコンテキストが伝播することで分散トレーシングが可能となります。

これらの挙動がどのように実装されているかをコードを追って確認します。

パブリッシャーの Client セットアップ

PublisherClient をインスタンス化する際に PublisherOptions を渡すことができ、PublisherOptions には enable_open_telemetry_tracing を設定することができます。

publisher = PublisherClient(
    publisher_options=PublisherOptions(
        enable_open_telemetry_tracing=True,
    ),
)

Pub/Sub ライブラリでは下記のようにオプションの値は self._open_telemetry_enabled という変数に格納されます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/publisher/client.py#L164-L166

この値によってパブリッシュする際にスパンの生成やメッセージへのコンテキスト情報の挿入が制御されます。

パブリッシャーの Publish でのトレーシング

enable_open_telemetry_tracing が有効化されていると、pubslish メソッドstart_create_span メソッドstart_publisher_flow_control_span メソッド などが実行されます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/publisher/client.py#L399-L422

名前的に start_create_span メソッドが重要そうなので、深掘っていきます。

パブリッシャーでのスパン生成

get_tracer メソッドでトレーサーを取得して、スパンを開始しています。ここでアプリケーションにセットアップしたトレーサーとリンクさせていますね。また、スパンに対して attributes 属性としてさまざまな情報を追加しているのがわかります。

イベントが追加された後に TraceContextTextMapPropagator クラスinject メソッドによって Pub/Sub にパブリッシュされるメッセージへのコンテキスト伝播が行われていそうです。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py#L61-L91

オブザーバビリティツールでは実際にこのように見えました。

メッセージへのコンテキスト伝播①

inject メソッドでは、現在のスパンからコンテキスト情報を取得してメッセージに注入される文字列を生成しています。

引数として渡された setter オブジェクトの set メソッド によって生成された文字列を処理しているようです。

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/trace/propagation/tracecontext.py#L25-L109

これだけでは何をやっているかわからないので TraceContextTextMapPropagator クラスが継承している TextMapPropagator クラスを見てみます。

メッセージへのコンテキスト伝播②

抽象クラスである TextMapPropagator クラスは、下記のような説明がされています。

このクラスは、HTTP リクエストのヘッダーにコンテキストを抽出および注入することを可能にするインターフェースを提供します。HTTP フレームワークやクライアントは、ヘッダーを含むオブジェクト、そして値の抽出と注入のための getter 関数と setter 関数をそれぞれ提供することで、TextMapPropagator と統合することができます。

HTTP で通信する際には、このクラスがコンテキスト伝播のためのインターフェースになるようです。そのため、抽出するための extract メソッドと注入するための inject メソッドが用意されているわけですね。

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/propagators/textmap.py#L124-L132

また、inject メソッドでは下記のように説明されています。

inject は、HTTP クライアントや HTTP リクエストを実行する他のオブジェクトに値を伝播させることを可能にします。実装は、キャリア(伝達媒体)に値を設定するために、Setter の set メソッドを使用すべきです。

ここではキャリアという伝達媒体に対して Setter として渡されたクラスの set メソッドを使って値を設定しろとのことですね。

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/propagators/textmap.py#L162-L184

ちなみにキャリアについては、OpenTelemetry の公式ドキュメントにも記載がありました。

Carrier
A carrier is the medium used by Propagators to read values from and write values to. Each specific Propagator type defines its expected carrier type, such as a string map or a byte array.

Carriers used at Inject are expected to be mutable.
OpenTelemetry 公式ドキュメント - Propagator Types

メッセージへのコンテキスト伝播③

Setter として渡されたクラスは Pub/Sub ライブラリで定義されています。ここでパブリッシュされるメッセージの attributes キーに対して、googleclient_ というプレフィックスがついた情報を設定しています。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/open_telemetry/context_propagation.py#L22-L41

まとめ

HTTP 通信でのインターフェースとなる TextMapPropagator クラスで文字列マップようなキャリアを利用するのは、HTTP ヘッダーのようなテキストベースのキーバリュー形式でコンテキスト伝播させるためのようです。この伝播のためにコンテキスト情報を注入しているのが inject メソッドということですね。

また、今回は Pub/Sub にパブリッシュされるメッセージのメタデータattributes キー)もテキストベースのキーバリュー形式であることから、このクラスを使ってコンテキスト伝播が可能になっているようです。

Pub/Sub Client のセットアップから実際に Pub/Sub にパブリッシュされるメッセージにコンテキスト情報が注入されるところまでをしっかり追うことができました!

OpenTelemetry SpanProcessor のカスタム

スパンのフィルタリングなどは OpenTelemetry Collector で処理可能なことがすぐ思いつくのですが、今回は Collector を用意しない判断をしました。(理由は LT 資料に記載があります)

アプリケーションのことを考えても計装の方で複雑なことをやろうとすると確かに負荷が上がりそうな気はするので、Collector に任せたいところではありますが・・・。

ここではスパン名に特定のサフィックスがつくものをオブザーバビリティツールに送らないような実装をしました。

こちらでも少し OpenTelemetry-Python のコードを追ってみます。

スパン名に特定のサフィックスがつくスパンの処理

今回は TraceProvider を設定する際に渡している SpanProcessor をカスタムしています。正確には BatchSpanProcessor クラスを継承した CustomSpanProcessor クラスを作成して BatchSpanProcessor の代わりに渡しています。

trace.set_tracer_provider(TracerProvider(resource=resource))
trace.get_tracer_provider().add_span_processor(
    CustomSpanProcessor(OTLPSpanExporter())
)

class CustomSpanProcessor(BatchSpanProcessor):

    EXCLUDED_SPAN_NAMES = [r"http receive", r"http send"]

    def on_end(self, span: ReadableSpan) -> None:
        for regex in self.EXCLUDED_SPAN_NAMES:
            if re.search(regex, span.name):
                return

継承元クラスの確認:BatchSpanProcessor クラス

CustomSpanProcessor クラスが継承した BatchSpanProcessor クラスは下記のようになっていて、SpanProcessor クラス を継承しています。

説明には SpanProcessor インターフェースの具体的な実装と書かれていて、環境変数で定められた粒度でスパンをバッチ処理してエクスポーターに渡す旨が書かれています。

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py#L141-L154

基底クラスの確認:SpanProcessor クラス

SpanProcessor クラスは基底クラスとなっていて、下記のように説明されています。

SDKの Span 開始メソッドおよび終了メソッドの呼び出しに対してフックを許可するインターフェースです。

スパンプロセッサーは、TracerProvider.add_span_processor を使って直接登録することができ、登録されたのと同じ順序で呼び出されます。

つまり、SDK 内部でスパンが開始されるタイミングと終了されるタイミングで独自の処理を割り込ませる仕組みを提供しているとのことです。

開始されるタイミングを on_start メソッド、終了されるタイミングを on_end メソッドを実装することで実現できそうです。

on_start メソッドでは span 型の引数を受け取ってるのに対して、on_end メソッドは読み取り専用の span 型の引数となっていますね。

https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py#L92-L124

SpanProcessor のカスタム:CustomSpanProcessor クラス

SpanProcessor をカスタムするには、基底クラスである SpanProcessor クラスを継承するか SpanProcessor クラスを継承している BatchSpanProcessor クラスを継承すれば良いとわかりました。

今回は BatchSpanProcessor クラスをベースにしたかったので下記のような実装としました。(再掲)

処理としては、スパン終了時に任意の文字列と合致するスパン名であれば何もせず、それ以外であれば親クラスの on_end メソッドを呼び出す形にしています。

もしかしたら、スパン生成の度に正規表現による検索を行うのスパン量によっては CPU に大きな負荷がかかるので、パフォーマンス要件が厳しい場合には除外の条件付けは再考の余地があるかもしれません。

class CustomSpanProcessor(BatchSpanProcessor):

    EXCLUDED_SPAN_NAMES = [r"http receive", r"http send"]

    def on_end(self, span: ReadableSpan) -> None:
        for regex in self.EXCLUDED_SPAN_NAMES:
            if re.search(regex, span.name):
                return
        super().on_end(span)

まとめ

アプリケーションの負荷を増加させない観点から OpenTelemetry Collector に処理を寄せたいケースもありますが、チーム状況を考慮してアプリケーションからオブザーバビリティツールへの直送という形を選択しました。

その中で意図しない大量のスパンが生成されたことに対応として、スパンの開始時と終了時に任意の処理を差し込むためのインターフェースである SpanProcessor クラスを継承した BatchSpanProcessor クラスをさらに継承させたクラスを作成しました。

特定のサフィックスがつくスパン名のスパンの除外を on_end メソッドにて実装することで実現しました。

さいごに

ソフトウェアエンジニアとして、オブザーバビリティの重要性に気づきプロダクトに導入することで自然と OpenTelemetry に触れる機会も増えました。

まだまだにわかではありますが、実践の中での Tips を LT やブログで共有していけたらと思います。また、Tips に加えてなぜそういったことが可能になるかもコードを追って理解を深められればと思います。

お読みいただき、ありがとうございました!

https://twitter.com/pHaya72

Discussion