入門!Python × OpenTelemetry × Google Cloud Observability
ずっと憧れていた OpenTelemetry にやっと入門できました。Google Cloud Next '24 でリブランドが発表された Google Cloud Observability でどのようにテレメトリを連携できるかを中心にまとめていきたいと思います。
Overview
- Python による OpenTelemetry と Google Cloud Observability の使い方についてフォーカスした記事となっています。OpenTelemtry がどういったものなのかは多くの記事で紹介されているためここでは割愛しています。(参考にさせていただいた記事をご参照ください。)
- 今回はどういったコードが計装に必要なのかを知りたかったので手動計装しています。トレース・ログ・メトリクスとそれぞれについて、基本的な使い方に関するコードと可視化結果を載せています。
- 実際に手を動かしながら把握したい方向けに一部計装済みのサンプルアプリケーションを用意しています。Cloud Shell で簡単に動かせるようにしているため、興味ある方はこちらもご参照ください。
トレース - Cloud Trace
トレースに関する最低限の計装します。
instrumentaion.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
# トレース関連
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SimpleSpanProcessor
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def instrument(app):
# Semantic Convestions を指定
resource = Resource.create({
ResourceAttributes.SERVICE_NAME: "book-service",
ResourceAttributes.SERVICE_INSTANCE_ID: "book-service",
})
# Trace の設定
trace.set_tracer_provider(TracerProvider(resource=resource))
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(OTLPSpanExporter())
)
# FastAPI の計装
FastAPIInstrumentor.instrument_app(app)
また、Otel Collector の設定もトレースのみ下記のように設定します。
otel-collector.yml
receivers:
otlp:
protocols:
grpc:
processors:
batch: {}
resourcedetection:
detectors: [env, gcp]
timeout: 40s
override: false
exporters:
logging:
googlecloud:
project: "[project_id]"
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [logging, googlecloud]
さっそく Swagger で Get Books を叩いてみると、下図のようなトレース情報が Cloud Trace によって可視化できました。(今回 FastAPI でサンプルアプリを実装していて、コード変更後の動作確認はデフォルトで提供されている Swagger を使っています。)
instrumentaion.py
の FastAPIInstrumentor.instrument_app(app)
の部分が FastAPI アプリケーションのリクエスト処理やルーティングなどの情報をトレースする計装[2] となります。トレース情報を思ったより簡単に可視化することができたのは嬉しかったものの、ルートスパンが表示されたのみで細かいスパンは表示されませんでした。
イメージではもう少し細かいもの表示されるかと思ったのですが、そうではなかったので計装を進めていきたいと思います!
SQLAlchemy の計装の追加
DB として SQLite3 を利用しているため関連するライブラリを追加してみます。
instrumentaion.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlite3 import SQLite3Instrumentor
...
def instrument(app):
...
# FastAPI の計装
FastAPIInstrumentor.instrument_app(app)
# SQLite3 の計装
SQLite3Instrumentor().instrument()
instrumentaion.py
の SQLite3Instrumentor().instrument()
でアプリケーション内の SQLite3 の操作をトレースが可能となります。しかし、Swagger で Get Books を叩いても Cloud Trace で可視化されるトレースには変化がありませんでした。
実装を見ると DB として SQLite3 は使用していますが、実際に DB の操作している部分では SQLAlchemy という ORM を利用していました。なので、SQLite3 ではなく SQLAlchemy を使用した DB 操作をトレースする計装[3] に変更してみました。
instrumentaion.py
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
...
def instrument(app):
...
# FastAPI の計装
FastAPIInstrumentor.instrument_app(app)
# SQLAlchemy の計装
SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={})
結果として、Swagger で Get Books を叩いてみると Connect
というスパンが表示されました。SQLAlchemy の動作を追ってスパンが自動で生成されているようです。
カスタムスパンの追加
もう少しリクエストの流れを可視化したいと思ったのでカスタムスパンを追加してみます。routers.py
の get_books
関数を下記のように変更します。
routers.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
"""
変更前
books = await functions.get_books(db)
return list(map(Book.model_validate, books))
"""
with tracer.start_as_current_span(__name__) as span:
books = await functions.get_books(db)
return list(map(Book.model_validate, books))
...
trace.get_tracer_provider().get_tracer("book-service")
で現在のトレースプロバイダーを取得して、book-service というトレーサーを取得しています。
with tracer.start_as_current_span(__name__) as span
で新しいスパンを作成して現在のコンテキストが設定されます。
変更後に Swagger で Get Books を叩くと、src.routers (start_as_current_span に渡した引数の __name__ の結果)
という名前でスパンが表示されるようになりました。
加えて、get_books
関数で呼び出している DB から情報を抽出している get_books
関数についてもカスタムスパンを作成してみます。functions.py
の get_books
関数を下記のように変更します。
functions.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
"""
変更前
return await db.scalars(select(Book))
"""
with tracer.start_as_current_span(__name__) as span:
return await db.scalars(select(Book))
...
同様に変更後に Swagger で Get Books を叩くと、src.functions
という名前でもスパンが表示されるようになりました。
このようにフレームワークや DB の計装をするだけでもスパンを作成できて、Cloud Trace で可視化することができますし、必要に応じてカスタムスパンも数行で追加できることがわかりました。
イベントの追加
また、スパンにイベントという追加情報を付与することができます。イベントとはトレースデータに情報に追加することができるもので、動作をより理解するために役立つようです。
ユースケースとしては、下記がありそうです。
- 状態の変化の記録:処理の開始/終了の追加やオブジェクトの状態変化の記録
- デバッグ情報の記録:エラーや例外が発生した際のデバッグに役立つ情報の追加
- パフォーマンス分析:リソースの使用状況やキャッシュに関する情報の追加
実際にイベントを追加するために、routers.py
の get_books
関数を下記のように変更します。
routers.py
from opentelemetry import trace
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
with tracer.start_as_current_span(__name__) as span:
span.add_event(name="get_books")
return await db.scalars(select(Book))
...
span.add_event(name="get_books")
でスパンにイベントを追加することができます。name
が必須の引数となっています。
変更後に Swagger で Get Books を叩くと、src.routers
のスパンに get_books
という名前のイベントが追加されました。該当のスパンをクリックして、「ログとイベント」タブに情報が表示されます。
また、span.add_event(name="get_books")
にはオプション引数として timestamp
や attributes
を追加できます。functions.py
の get_books
関数を下記のように変更します。
functions.py
from opentelemetry import trace
import time
...
tracer = trace.get_tracer_provider().get_tracer("book-service")
...
async def get_books(db: AsyncSession):
with tracer.start_as_current_span(__name__) as span:
span.add_event(
name="select all books",
timestamp=int(time.time()),
attributes={
"sql": "select * from book"
}
)
return await db.scalars(select(Book))
...
変更後に Swagger で Get Books を叩くと、src.functions
のスパンに select all books
という名前のイベントが追加され、タブの下部のキーバリュー表に追加した情報が表示されるようになりました。
各スパンでポイントとなりそうな情報をイベントとして仕込んでおくと色々な使い道がありそうだと感じました!「ログとイベント」タブということはログも見れそうな気がしますね。
Batch と Simple の違い
instrumentation.py
であえて SpanProcessor
を Batch
と Simple
の 2 種類を import しています。色々な記事を見ているとどちらも使われていて違いが気になったので簡単にまとめてみます。
-
SimpleSpanProcessor
スパンが終了するたびに、設定された SpanExporter に直接スパンが渡されようです。メリットとしては、スパンごとにエクスポーターを呼び出すため遅延は少ないようですが、頻度が多くなることでオーバーヘッドが生じやすいです。 -
BatchSpanProcessor
スパンはキューにバッファリングされて、設定された条件(キューの最大サイズなど)に基づいてバッチで SpanExpoter に渡されるようです。エクスポーターの呼び出し回数を減らすことでオーバーヘッドを削減できます。
オーバーヘッドを減らしてパフォーマンス性を重視するなら BatchSpanProcessor
、遅延なくリアルタイム性を重視するなら SimpleSpanProcessor
といったところでしょうか。
ログ - Cloud Logging
ログに関する計装をします。
instrumentaion.py
...
# ログ関連
from opentelemetry import _logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
BatchLogRecordProcessor,
SimpleLogRecordProcessor
)
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
...
def instrument(app):
...
logger_provider = LoggerProvider(resource=resource)
_logs.set_logger_provider(
logger_provider.add_log_record_processor(
BatchLogRecordProcessor(OTLPLogExporter(insecure=True)
)
))
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel("INFO")
...
instrumentation.py
と同時に Otel Collector の Yaml ファイルも変更します。Service.pipelines
項目に logs
を traces
同様に設定します。
otel-collector.yml
...
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [googlecloud]
logs:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [googlecloud]
変更後に Swagger で Get Books を叩くと、src.functions
のスパンにイベントとは別の白丸が表示されました。ログとイベントを「展開して表示」に切り替えると下図のようになります。この白丸は SQLAlchemy ライブラリ内に実装されているログ出力が表示されています。
ログの確認
instrumentation.py
を見てわかる通り、LogProvider
は LoggingHandler
にセットされています。この状態での logging.getLogger().info
などのログ出力は Otel Collector を経由して Cloud Logging に送信されます。さらにログの中身を見てみると、ログレコード内に trace
と spanId
が追加されていることがわかります。
ログの追加
言わずもがなですが、トレースに追加でログを紐づける場合には logging.getLogger().info
などでログを出力すれば OK です。routers.py
の get_books
関数を下記のように変更します。
routers.py
...
import logging
logger = logging.getLogger()
...
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
logger.info("out of src.routers span")
with tracer.start_as_current_span(__name__) as span:
logger.info("in src.routers span start")
span.add_event(name="get_books")
books = await functions.get_books(db)
res = list(map(Book.model_validate, books))
logger.info("in src.routers span end")
return res
...
追加した 3 つのログ出力は想定通りの場所でトレースに紐づいていました。カスタムスパンを開始する前に差し込んだログはルートスパンに表示されました。
シンプルな計装をするだけでもトレースとログの紐付けは簡単に実現することができることから、デバッグ作業時には何がどこで起こっているかをより把握しやすくなると感じました!
メトリクス - Cloud Monitoring
メトリクスに関する計装をします。
instrumentation.py
...
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
...
def instrument(app):
...
# メトリクス関連の設定
metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
metrics.set_meter_provider(
MeterProvider(resource=resource, metric_readers=[metric_reader])
)
...
instrumentation.py
と同時に Otel Collector の Yaml ファイルも変更します。Service.pipelines
項目に metrics
を traces
同様に設定します。
otel-collector.yml
...
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [logging, googlecloud]
logs:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [logging, googlecloud]
metrics:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [logging, googlecloud]
メトリクスに関する計装を行うことで、デフォルトで送信されるメトリクスは下記の 3 つです。
- Http.server.active_requests
- Http.server.duration
- Http.server.response.size
サンプルアプリケーションの適当なメソッドを複数回実行した後に、Cloud Monitoring で確認します。Cloud Monitoring の Metrics Explore で例えば VM Instance - http.server.duration
を選択すると、複数回の実行に応じたヒートマップが表示されます。
カスタムメトリクス - Counter -
アプリケーション内でカスタムメトリクスを作成します。関数の呼び出しをカウントする routers_counter
というカスタムメトリクスを作成して、attribute
に routers.type
を加えて関数を識別できるようにします。routers.py
の get_authors
と get_books
関数を下記のように変更します。
metrics.get_meter(__name__).create_counter
には、そのカスタムメトリクスの名称とカウンターの単位を渡しています。これを特定の関数の中で routers_counter.add
を呼び出し、加算する値や属性を付与します。
routers.py
...
meter = metrics.get_meter(__name__)
routers_counter = meter.create_counter(
"routers.count",
unit="1"
)
...
@router.get("/authors", tags=["/authors"])
async def get_authors(db: AsyncSession = Depends(get_db)) -> list[Author]:
routers_counter.add(1, {
"routers.type": "GET_AUTHORS"
})
with tracer.start_as_current_span(__name__) as span:
span.add_event(name="get_authors")
authors = await functions.get_authors(db)
return list(map(Author.model_validate, authors))
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
routers_counter.add(1, {
"routers.type": "GET_BOOKS"
})
logger.info("out of src.routers span")
with tracer.start_as_current_span(__name__) as span:
logger.info("in src.routers span start")
span.add_event(name="get_books")
books = await functions.get_books(db)
res = list(map(Book.model_validate, books))
logger.info("in src.routers span end")
return res
Cloud Monitoring の Metrics Explore で VM Instance - routers.count
を選択すると、呼び出し回数をチャートで表示することができました。
カスタムメトリクス - Histogram -
今度はヒストグラムのカスタムメトリクスを作成します。同様に routers.py
の get_authors
と get_books
関数を下記のように変更します。
metrics.get_meter(__name__).create_histogram
には、同様にそのカスタムメトリクスの名称とヒストグラムの単位を渡しています。これを特定の関数の中で必要な計算を行い routers_duration_histogram.record
を呼び出し、値や付与する属性を記録します。
routers.py
...
import time
...
meter = metrics.get_meter(__name__)
...
routers_duration_histogram = meter.create_histogram(
"routers.duration",
unit="ms"
)
...
@router.get("/authors", tags=["/authors"])
async def get_authors(db: AsyncSession = Depends(get_db)) -> list[Author]:
routers_counter.add(1, {
"routers.type": "GET_AUTHORS"
})
with tracer.start_as_current_span(__name__) as span:
start_time = time.monotonic()
span.add_event(name="get_authors")
authors = await functions.get_authors(db)
res = list(map(Author.model_validate, authors))
end_time = time.monotonic()
duration_ms = (end_time - start_time) * 1000
routers_duration_histogram.record(
duration_ms,
attributes={
"routers.type": "GET_AUTHORS"
}
)
return res
@router.get("/books", tags=["/books"])
async def get_books(db: AsyncSession = Depends(get_db)) -> list[Book]:
routers_counter.add(1, {
"routers.type": "GET_BOOKS"
})
logger.info("out of src.routers span")
with tracer.start_as_current_span(__name__) as span:
start_time = time.monotonic()
logger.info("in src.routers span start")
span.add_event(name="get_books")
books = await functions.get_books(db)
res = list(map(Book.model_validate, books))
end_time = time.monotonic()
duration_ms = (end_time - start_time) * 1000
routers_duration_histogram.record(
duration_ms,
attributes={
"routers.type": "GET_BOOKS"
}
)
logger.info("in src.routers span end")
return res
同様に Cloud Monitoring の Metrics Explore で VM Instance - routers.duration
を選択すると、関数の処理時間をヒートマップで表示することができました。
カスタムメトリクスを簡単に送信できるとなると、メトリクスとトレースの紐付けをしたくなります。こちらについては OpenTelemetry だとトレースエグザンプラという機能によって提供しているようですが、提供している言語は限られているようです。
OpenTelemetry for Go だと下記の記事がとてもわかりやすかったです。
また、Google Managed Service for Prometheus (GMP) を用いたメトリクスとトレースの紐付けを実践されている下記の記事が勉強になりました。
OpenTelemetry for Python でのトレースエグザンプラの機能提供は現時点ではなさそうです。(あったらご教授いただけたら幸いです。。)
OpenTelemetry によるテレメトリーデータの 3 本柱に関する基本的な使い方と Google Cloud Obserbability での可視化結果は以上になります。
参考
OpenTelemetry 関連
- OpenTelemetryが気になってるけど実際に始めて「なるほどね」ってなるにはどうしたらいいかについて15分でまとめて喋ります
- 仕様と実装から理解するOpenTelemetryの全体像
- OpenTelemetry for Go + Cloud MonitoringでTrace Exemplarを使う
- OpenTelemetry を使ったトレースエグザンプラーの活用 / otel-trace-exemplar
- C# ではじめる OpenTelemetry
- OpenTelemetry CollectorでGKEのリソース情報を自動で埋め込む
- Google Cloud 公式ドキュメント - OTLP トレースを収集する
FastAPI 関連
さいごに
入門ということで OpenTelemetry と Google Cloud Observability を駆使した基本的なテレメトリーデータの可視化を実践してみました。どのように計装が行われているのか & 体系的にまとめたかったので、手動計装で徐々にコードを追加していくことで多少なりとも理解が深まりました。
以降は、上記で説明したサンプルコードを動かす説明となります。関数に関しては一部しか計装していないので、興味ある方は手元でコードを追加して遊んでいただければと思います。
サンプルアプリ
環境とコード
Google Cloud Console から起動できる Cloud Shell で動かすことを想定しています。
サンプルコードは Fast API で構成されたアプリケーションに計装する形にしています。
サンプルコードのベースは、SaitoTsutomu さんのコードを引用させていただいています。
こちらを利用して本編の計装を徐々に追加してどのように可視化できるかを見ています。
アプリ と Otel Collector の起動
下記の手順で Cloud Shell 上に FastAPI を用いた Web アプリケーションと Otel Collector を起動します。
※ Cloud Shell 上で起動する都合上、Otel Collector で ADC を無理やり使っている形になっているので気の進まない方は Cloud Run などにデプロイしてください。
# リポジトリのクローン
git clone git@github.com:hayashit6239/fastapi-otel-cloudobservability-sample.git
# ADC を使った認証
gcloud auth application-default login
# 保存ディレクトリを書き換えて実行
cd fastapi-otel-cloudobservability-sample && \
cp /tmp/[保存ディレクトリ]/application_default_credentials.json ./containers/otel-collector/application_default_credentials.json && \
chmod 644 ./containers/otel-collector/application_default_credentials.json
Otel Collector の設定ファイルである otel-collector.yml の Google Cloud Exporter のプロジェクト ID を修正します。
./containers/otel-collector/otel-collector.yml
receivers:
otlp:
protocols:
grpc:
processors:
batch: {}
resourcedetection:
detectors: [env, gcp]
timeout: 20s
override: false
exporters:
debug:
verbosity: detailed
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
project: [project id]
service:
pipelines:
metrics:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [debug, googlecloud]
logs:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [debug, googlecloud]
traces:
receivers: [otlp]
processors: [batch, resourcedetection]
exporters: [debug, googlecloud]
最後にコンテナのビルドと立ち上げコマンドを実行します。
docker compose -f docker-compose.yml build
docker compose -f docker-compose.yml up
FastAPI の挙動を Swagger で確認
Cloud Shell のウェブでプレビュー機能を利用します、ポートを 8000 に変更して「ポート 8000 でプレビュー」をクリックします。別ブラウザが立ち上がるので、https://8000-**.cloudshell.dev/
となっている URL を https://8000-**.cloudshell.dev/docs
で Swagger を起動します。
Swagger が起動できれば、あとは計装済みである Get Authors や Get Books を叩いて Google Cloud Observability で可視化したり、別の関数に追加の実装して遊んでみてください。
Discussion