📈

入門!Python × OpenTelemetry × Google Cloud Observability

2024/05/14に公開

ずっと憧れていた OpenTelemetry にやっと入門できました。Google Cloud Next '24 でリブランドが発表された Google Cloud Observability でどのようにテレメトリを連携できるかを中心にまとめていきたいと思います。

Overview

  • Python による OpenTelemetry と Google Cloud Observability の使い方についてフォーカスした記事となっています。OpenTelemtry がどういったものなのかは多くの記事で紹介されているためここでは割愛しています。(参考にさせていただいた記事をご参照ください。)
  • 今回はどういったコードが計装に必要なのかを知りたかったので手動計装しています。トレース・ログ・メトリクスとそれぞれについて、基本的な使い方に関するコードと可視化結果を載せています。
  • 実際に手を動かしながら把握したい方向けに一部計装済みのサンプルアプリケーションを用意しています。Cloud Shell で簡単に動かせるようにしているため、興味ある方はこちらもご参照ください。

トレース - Cloud Trace

トレースに関する最低限の計装します。

instrumentaion.py
instrumentation.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes

# トレース関連
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    SimpleSpanProcessor
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def instrument(app):
    # Semantic Convestions を指定
    resource = Resource.create({
        ResourceAttributes.SERVICE_NAME: "book-service",
        ResourceAttributes.SERVICE_INSTANCE_ID: "book-service",
    })

    # Trace の設定
    trace.set_tracer_provider(TracerProvider(resource=resource))
    trace.get_tracer_provider().add_span_processor(
        SimpleSpanProcessor(OTLPSpanExporter())
    )

    # FastAPI の計装
    FastAPIInstrumentor.instrument_app(app)

また、Otel Collector の設定もトレースのみ下記のように設定します。

otel-collector.yml
otel-collector.yml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  batch: {}
  resourcedetection:
    detectors: [env, gcp]
    timeout: 40s
    override: false

exporters:
  logging:
  googlecloud:
    project: "[project_id]"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [logging, googlecloud]

さっそく Swagger で Get Books を叩いてみると、下図のようなトレース情報が Cloud Trace によって可視化できました。(今回 FastAPI でサンプルアプリを実装していて、コード変更後の動作確認はデフォルトで提供されている Swagger を使っています。)

instrumentaion.pyFastAPIInstrumentor.instrument_app(app) の部分が FastAPI アプリケーションのリクエスト処理やルーティングなどの情報をトレースする計装[2] となります。トレース情報を思ったより簡単に可視化することができたのは嬉しかったものの、ルートスパンが表示されたのみで細かいスパンは表示されませんでした。

イメージではもう少し細かいもの表示されるかと思ったのですが、そうではなかったので計装を進めていきたいと思います!

SQLAlchemy の計装の追加

DB として SQLite3 を利用しているため関連するライブラリを追加してみます。

instrumentaion.py
instrumentation.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlite3 import SQLite3Instrumentor
...

def instrument(app):
    ...
    # FastAPI の計装
    FastAPIInstrumentor.instrument_app(app)
    # SQLite3 の計装
    SQLite3Instrumentor().instrument()

instrumentaion.pySQLite3Instrumentor().instrument() でアプリケーション内の SQLite3 の操作をトレースが可能となります。しかし、Swagger で Get Books を叩いても Cloud Trace で可視化されるトレースには変化がありませんでした。

実装を見ると DB として SQLite3 は使用していますが、実際に DB の操作している部分では SQLAlchemy という ORM を利用していました。なので、SQLite3 ではなく SQLAlchemy を使用した DB 操作をトレースする計装[3] に変更してみました。

instrumentaion.py
instrumentation.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
...

def instrument(app):
    ...
    # FastAPI の計装
    FastAPIInstrumentor.instrument_app(app)
    # SQLAlchemy の計装
    SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={})

結果として、Swagger で Get Books を叩いてみると Connect というスパンが表示されました。SQLAlchemy の動作を追ってスパンが自動で生成されているようです。

カスタムスパンの追加

もう少しリクエストの流れを可視化したいと思ったのでカスタムスパンを追加してみます。routers.pyget_books 関数を下記のように変更します。

routers.py
routers.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
    """
    変更前
    books = await functions.get_books(db)
    return list(map(Book.model_validate, books))
    """
    with tracer.start_as_current_span(__name__) as span:
        books = await functions.get_books(db)
        return list(map(Book.model_validate, books))
...

trace.get_tracer_provider().get_tracer("book-service") で現在のトレースプロバイダーを取得して、book-service というトレーサーを取得しています。

with tracer.start_as_current_span(__name__) as span で新しいスパンを作成して現在のコンテキストが設定されます。

変更後に Swagger で Get Books を叩くと、src.routers (start_as_current_span に渡した引数の __name__ の結果) という名前でスパンが表示されるようになりました。

加えて、get_books 関数で呼び出している DB から情報を抽出している get_books 関数についてもカスタムスパンを作成してみます。functions.pyget_books 関数を下記のように変更します。

functions.py
functions.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
    """
    変更前
    return await db.scalars(select(Book))
    """
    with tracer.start_as_current_span(__name__) as span:
        return await db.scalars(select(Book))
...

同様に変更後に Swagger で Get Books を叩くと、src.functions という名前でもスパンが表示されるようになりました。

このようにフレームワークや DB の計装をするだけでもスパンを作成できて、Cloud Trace で可視化することができますし、必要に応じてカスタムスパンも数行で追加できることがわかりました。

イベントの追加

また、スパンにイベントという追加情報を付与することができます。イベントとはトレースデータに情報に追加することができるもので、動作をより理解するために役立つようです。

ユースケースとしては、下記がありそうです。

  • 状態の変化の記録:処理の開始/終了の追加やオブジェクトの状態変化の記録
  • デバッグ情報の記録:エラーや例外が発生した際のデバッグに役立つ情報の追加
  • パフォーマンス分析:リソースの使用状況やキャッシュに関する情報の追加

実際にイベントを追加するために、routers.pyget_books 関数を下記のように変更します。

routers.py
routers.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
    with tracer.start_as_current_span(__name__) as span:
        span.add_event(name="get_books")
        return await db.scalars(select(Book))
...

span.add_event(name="get_books") でスパンにイベントを追加することができます。name が必須の引数となっています。

変更後に Swagger で Get Books を叩くと、src.routers のスパンに get_books という名前のイベントが追加されました。該当のスパンをクリックして、「ログとイベント」タブに情報が表示されます。

また、span.add_event(name="get_books") にはオプション引数として timestampattributes を追加できます。functions.pyget_books 関数を下記のように変更します。

functions.py
functions.py
from opentelemetry import trace
import time
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
    with tracer.start_as_current_span(__name__) as span:
        span.add_event(
            name="select all books",
            timestamp=int(time.time()),
            attributes={
                "sql": "select * from book"
            }
        )
        return await db.scalars(select(Book))
...

変更後に Swagger で Get Books を叩くと、src.functions のスパンに select all books という名前のイベントが追加され、タブの下部のキーバリュー表に追加した情報が表示されるようになりました。

各スパンでポイントとなりそうな情報をイベントとして仕込んでおくと色々な使い道がありそうだと感じました!「ログとイベント」タブということはログも見れそうな気がしますね

Batch と Simple の違い

instrumentation.py であえて SpanProcessorBatchSimple の 2 種類を import しています。色々な記事を見ているとどちらも使われていて違いが気になったので簡単にまとめてみます。

  • SimpleSpanProcessor
    スパンが終了するたびに、設定された SpanExporter に直接スパンが渡されようです。メリットとしては、スパンごとにエクスポーターを呼び出すため遅延は少ないようですが、頻度が多くなることでオーバーヘッドが生じやすいです。

  • BatchSpanProcessor
    スパンはキューにバッファリングされて、設定された条件(キューの最大サイズなど)に基づいてバッチで SpanExpoter に渡されるようです。エクスポーターの呼び出し回数を減らすことでオーバーヘッドを削減できます。

オーバーヘッドを減らしてパフォーマンス性を重視するなら BatchSpanProcessor 、遅延なくリアルタイム性を重視するなら SimpleSpanProcessorといったところでしょうか。

ログ - Cloud Logging

ログに関する計装をします。

instrumentaion.py
instrumentation.py
...
# ログ関連
from opentelemetry import _logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    SimpleLogRecordProcessor
)
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
...

def instrument(app):
    ...
    logger_provider = LoggerProvider(resource=resource)
    _logs.set_logger_provider(
        logger_provider.add_log_record_processor(
            BatchLogRecordProcessor(OTLPLogExporter(insecure=True)
        )
    ))

    handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel("INFO") 
    ...

instrumentation.py と同時に Otel Collector の Yaml ファイルも変更します。Service.pipelines 項目に logstraces 同様に設定します。

otel-collector.yml
otel-collector.yml
...
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [googlecloud]
    logs:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [googlecloud]

変更後に Swagger で Get Books を叩くと、src.functionsスパンにイベントとは別の白丸が表示されました。ログとイベントを「展開して表示」に切り替えると下図のようになります。この白丸は SQLAlchemy ライブラリ内に実装されているログ出力が表示されています。

ログの確認

instrumentation.py を見てわかる通り、LogProviderLoggingHandler にセットされています。この状態での logging.getLogger().info などのログ出力は Otel Collector を経由して Cloud Logging に送信されます。さらにログの中身を見てみると、ログレコード内に tracespanId が追加されていることがわかります。

ログの追加

言わずもがなですが、トレースに追加でログを紐づける場合には logging.getLogger().info などでログを出力すれば OK です。routers.pyget_books 関数を下記のように変更します。

routers.py
routers.py
...
import logging

logger = logging.getLogger()
...
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
    logger.info("out of src.routers span")
    with tracer.start_as_current_span(__name__) as span:
        logger.info("in src.routers span start")
        span.add_event(name="get_books")
        books = await functions.get_books(db)
        res = list(map(Book.model_validate, books))
        logger.info("in src.routers span end")
        return res
...

追加した 3 つのログ出力は想定通りの場所でトレースに紐づいていました。カスタムスパンを開始する前に差し込んだログはルートスパンに表示されました。

シンプルな計装をするだけでもトレースとログの紐付けは簡単に実現することができることから、デバッグ作業時には何がどこで起こっているかをより把握しやすくなると感じました!

メトリクス - Cloud Monitoring

メトリクスに関する計装をします。

instrumentation.py
instrumentaion.py
...
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
...
def instrument(app):
    ...
    # メトリクス関連の設定
    metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
    metrics.set_meter_provider(
        MeterProvider(resource=resource, metric_readers=[metric_reader])
    )
    ...

instrumentation.py と同時に Otel Collector の Yaml ファイルも変更します。Service.pipelines 項目に metricstraces 同様に設定します。

otel-collector.yml
otel-collector.yml
...
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [logging, googlecloud]
    logs:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [logging, googlecloud]
    metrics:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [logging, googlecloud]

メトリクスに関する計装を行うことで、デフォルトで送信されるメトリクスは下記の 3 つです。

  • Http.server.active_requests
  • Http.server.duration
  • Http.server.response.size

サンプルアプリケーションの適当なメソッドを複数回実行した後に、Cloud Monitoring で確認します。Cloud Monitoring の Metrics Explore で例えば VM Instance - http.server.duration を選択すると、複数回の実行に応じたヒートマップが表示されます。

カスタムメトリクス - Counter -

アプリケーション内でカスタムメトリクスを作成します。関数の呼び出しをカウントする routers_counter というカスタムメトリクスを作成して、attributerouters.type を加えて関数を識別できるようにします。routers.pyget_authorsget_books 関数を下記のように変更します。

metrics.get_meter(__name__).create_counter には、そのカスタムメトリクスの名称とカウンターの単位を渡しています。これを特定の関数の中で routers_counter.add を呼び出し、加算する値や属性を付与します。

routers.py
routers.py
...
meter = metrics.get_meter(__name__)

routers_counter = meter.create_counter(
    "routers.count",
    unit="1"
)
...
@router.get("/authors", tags=["/authors"])
async def get_authors(db: AsyncSession = Depends(get_db)) -> list[Author]:
    routers_counter.add(1, {
        "routers.type": "GET_AUTHORS"
    })
    with tracer.start_as_current_span(__name__) as span:
        span.add_event(name="get_authors")
        authors = await functions.get_authors(db)
        return list(map(Author.model_validate, authors))


@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
    routers_counter.add(1, {
        "routers.type": "GET_BOOKS"
    })
    logger.info("out of src.routers span")
    with tracer.start_as_current_span(__name__) as span:
        logger.info("in src.routers span start")
        span.add_event(name="get_books")
        books = await functions.get_books(db)
        res = list(map(Book.model_validate, books))
        logger.info("in src.routers span end")
        return res

Cloud Monitoring の Metrics Explore で VM Instance - routers.count を選択すると、呼び出し回数をチャートで表示することができました。

カスタムメトリクス - Histogram -

今度はヒストグラムのカスタムメトリクスを作成します。同様に routers.pyget_authorsget_books 関数を下記のように変更します。

metrics.get_meter(__name__).create_histogram には、同様にそのカスタムメトリクスの名称とヒストグラムの単位を渡しています。これを特定の関数の中で必要な計算を行い routers_duration_histogram.record を呼び出し、値や付与する属性を記録します。

routers.py
routers.py
...
import time
...
meter = metrics.get_meter(__name__)
...
routers_duration_histogram = meter.create_histogram(
    "routers.duration",
    unit="ms"
)
...
@router.get("/authors", tags=["/authors"])
async def get_authors(db: AsyncSession = Depends(get_db)) -> list[Author]:
    routers_counter.add(1, {
        "routers.type": "GET_AUTHORS"
    })
    with tracer.start_as_current_span(__name__) as span:
        start_time = time.monotonic()

        span.add_event(name="get_authors")
        authors = await functions.get_authors(db)
        res = list(map(Author.model_validate, authors))

        end_time = time.monotonic()

        duration_ms = (end_time - start_time) * 1000
        routers_duration_histogram.record(
            duration_ms,
            attributes={
                "routers.type": "GET_AUTHORS"
            }
        )
        return res


@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
        routers_counter.add(1, {
            "routers.type": "GET_BOOKS"
        })
        logger.info("out of src.routers span")
        with tracer.start_as_current_span(__name__) as span:
            start_time = time.monotonic()

            logger.info("in src.routers span start")
            span.add_event(name="get_books")
            books = await functions.get_books(db)
            res = list(map(Book.model_validate, books))

            end_time = time.monotonic()

            duration_ms = (end_time - start_time) * 1000
            routers_duration_histogram.record(
                duration_ms,
                attributes={
                    "routers.type": "GET_BOOKS"
                }
            )
            logger.info("in src.routers span end")
        return res

同様に Cloud Monitoring の Metrics Explore で VM Instance - routers.duration を選択すると、関数の処理時間をヒートマップで表示することができました。

カスタムメトリクスを簡単に送信できるとなると、メトリクスとトレースの紐付けをしたくなります。こちらについては OpenTelemetry だとトレースエグザンプラという機能によって提供しているようですが、提供している言語は限られているようです。

OpenTelemetry for Go だと下記の記事がとてもわかりやすかったです。
https://zenn.dev/google_cloud_jp/articles/20240305-trace-exemplar#はじめに

また、Google Managed Service for Prometheus (GMP) を用いたメトリクスとトレースの紐付けを実践されている下記の記事が勉強になりました。
https://zenn.dev/k6s4i53rx/articles/2023-advent-calendar-google-cloud

OpenTelemetry for Python でのトレースエグザンプラの機能提供は現時点ではなさそうです。(あったらご教授いただけたら幸いです。。)

OpenTelemetry によるテレメトリーデータの 3 本柱に関する基本的な使い方と Google Cloud Obserbability での可視化結果は以上になります。

参考

OpenTelemetry 関連

FastAPI 関連

さいごに

入門ということで OpenTelemetry と Google Cloud Observability を駆使した基本的なテレメトリーデータの可視化を実践してみました。どのように計装が行われているのか & 体系的にまとめたかったので、手動計装で徐々にコードを追加していくことで多少なりとも理解が深まりました。

https://twitter.com/pHaya72

以降は、上記で説明したサンプルコードを動かす説明となります。関数に関しては一部しか計装していないので、興味ある方は手元でコードを追加して遊んでいただければと思います。

サンプルアプリ

環境とコード

Google Cloud Console から起動できる Cloud Shell で動かすことを想定しています。

サンプルコードは Fast API で構成されたアプリケーションに計装する形にしています。
https://github.com/hayashit6239/fastapi-otel-cloudobservability-sample

サンプルコードのベースは、SaitoTsutomu さんのコードを引用させていただいています。
https://github.com/SaitoTsutomu/fastapi-book-sample

こちらを利用して本編の計装を徐々に追加してどのように可視化できるかを見ています。

アプリ と Otel Collector の起動

下記の手順で Cloud Shell 上に FastAPI を用いた Web アプリケーションと Otel Collector を起動します。

※ Cloud Shell 上で起動する都合上、Otel Collector で ADC を無理やり使っている形になっているので気の進まない方は Cloud Run などにデプロイしてください。

terminal
# リポジトリのクローン
git clone git@github.com:hayashit6239/fastapi-otel-cloudobservability-sample.git

# ADC を使った認証
gcloud auth application-default login

# 保存ディレクトリを書き換えて実行
cd fastapi-otel-cloudobservability-sample && \
cp /tmp/[保存ディレクトリ]/application_default_credentials.json ./containers/otel-collector/application_default_credentials.json && \
chmod 644 ./containers/otel-collector/application_default_credentials.json

Otel Collector の設定ファイルである otel-collector.yml の Google Cloud Exporter のプロジェクト ID を修正します。

./containers/otel-collector/otel-collector.yml
./containers/otel-collector/otel-collector.yml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  batch: {}
  resourcedetection:
    detectors: [env, gcp]
    timeout: 20s
    override: false

exporters:
  debug:
    verbosity: detailed
  googlecloud:
    log:
      default_log_name: opentelemetry.io/collector-exported-log
    project: [project id]

service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [debug, googlecloud]
    logs:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [debug, googlecloud]
    traces:
      receivers: [otlp]
      processors: [batch, resourcedetection]
      exporters: [debug, googlecloud]

最後にコンテナのビルドと立ち上げコマンドを実行します。

terminal
docker compose -f docker-compose.yml build
docker compose -f docker-compose.yml up

FastAPI の挙動を Swagger で確認

Cloud Shell のウェブでプレビュー機能を利用します、ポートを 8000 に変更して「ポート 8000 でプレビュー」をクリックします。別ブラウザが立ち上がるので、https://8000-**.cloudshell.dev/ となっている URL を https://8000-**.cloudshell.dev/docs で Swagger を起動します。

Swagger が起動できれば、あとは計装済みである Get Authors や Get Books を叩いて Google Cloud Observability で可視化したり、別の関数に追加の実装して遊んでみてください。

脚注
  1. A language-specific implementation of OpenTelemetry in Python ↩︎ ↩︎ ↩︎

  2. OpenTelemetry FastAPI Instrumentation ↩︎

  3. OpenTelemetry SQLAlchemy Instrumentation ↩︎

Discussion