GKEでEnvoy、OPAなどAP以外のプロダクトもトレース計装したいですよね
はじめに
こんにちは。クラウドエース株式会社で SRE をしている間瀬です。
この記事は OpenTelemetry Advent Calendar 2023 12日目の記事です。
マイクロサービスアーキテクチャ等の普及に伴いシステムのアーキテクチャが複雑になるにつれて、自前のアプリケーションだけで実装するのではなく、幾つかのプロダクトを組み合わせて実現するケースが増えてきていると思います。
このような構成においてパフォーマンスをモニタリングする上では自前のアプリケーションだけでなく、これらのプロダクトも含めて計装できることが理想と言えるでしょう。
今回は、幾つかのプロダクトを例に自前のアプリケーションに加えて一気通貫で計装してみたいと思います。
尚、今回記事で計装対象とするのはトレースのみを対象とさせていただきます。
(メトリクスやログも対象にしたかったのですが、時間が足りず。。。次の機会にさせてください。)
紹介するアーキテクチャ
今回、計装するアーキテクチャの概要は以下の通りです。
Google Cloud の Google Kubernetes Engine(以下、GKE) 上にそれぞれプロダクトやアプリケーションをデプロイして Cloud Trace へ連携することで計装していきたいと思います。
GKE は Google Cloud のマネージドな Kubernetes サービスで Cloud Trace も同じく Google Cloud のサービスで OpenTelemetry の出力先としてサポートされています。
プロダクトの役割は以下の通りです。
- Keycloak: ユーザーに対する認証
- Open Policy Agent (以下、OPA): ユーザーの認可
- Envoy: クライアントからのリクエストのプロキシ及び、認証/認可。認可は OPA へ委譲する。
- app1, app2: サンプルアプリケーション
トレースを計装するための構成は以下の通りです。
今回は OpenTelemetry Collector にて各プロダクトやアプリケーションにて取得するトレースを集約して Cloud Trace へ連携する構成としています。
OpenTelemetry Collector を利用することでトレースを取得するアプリケーションやプロダクトから Cloud Trace などの出力先に依存することなく実装することが可能です。
また、今回は言及しませんがトレースだけでなくメトリクスやログも集約して各種出力先へ連携することが可能です。
目指すところ
クライアントからのリクエストを Envoy で認証/認可をしてサンプルアプリケーションにプロキシされるまでの処理を全てトレースできるようにすることをゴールにしたいと思います。
全ての処理をトレースすると以下のようになります。
Envoy の中で行っている認証/認可の処理に加えて連携先である Keycloak や OPA での処理も計装できることが分かります。
ちなみにアプリケーションのみ計装した状態は以下の通りです。
もちろんこの状態でもダメではないのですが、Envoy、Keycloak、OPAでの処理内容がブラックボックスになってしまっています。
一気通貫にトレースをモニタリングできるようにすることで例えば認証/認可の処理でエラーが発生してアプリケーションまでリクエストが到達しないケースにおいてもトレースを取得することができるようになります。
例えば以下では、OPAでの認可状況(正常、異常)は確か debug モードにしないとログ等に出力してくれないので分かりづらいのですが、トレースをモニタリングすることで追跡することが可能です。
やってみよう
以降では各所でのトレース取得方法について紹介したいと思います。
記事では重要なポイントだけ掲載させていただきます。検証に使用したリポジトリは以下になりますので参考にしてください。
アプリケーション
今回のアプリケーションは Golang で http によるリクエストに対して自動でトレース情報を生成してくれる otelhttp を使っています。
ソースコードではトレース以外にもメトリクスを収集、公開するための実装も含まれているのでご了承ください。また、ログにおいても Google Cloud の Cloud logging へ構造化ロギングとして連携して Cloud Trace のトレース情報と紐づけられるように実装しています。
func main() {
loadEnvFile()
ctx := context.Background()
port := os.Getenv("APP_PORT")
appName := os.Getenv("APP_NAME")
oltpAddr := os.Getenv("OTEL_AGENT_ENDPOINT")
log.SetFlags(0)
shutdown := initTraceProvider(ctx, oltpAddr, appName)
defer shutdown()
meter := otel.Meter("server-meter")
commonLabels = []attribute.KeyValue{
attribute.String("project_id", os.Getenv("PROJECT_ID")),
}
reveiveCount, _ = meter.Int64Counter(
"api_server/receive_counts",
metric.WithDescription("The number of receive processed"),
)
requestLatency, _ = meter.Float64Histogram(
"api_server/request_latency",
metric.WithDescription("The latency of requests processed"),
)
requestCount, _ = meter.Int64Counter(
"api_server/request_counts",
metric.WithDescription("The number of requests processed"),
)
h := newHandler()
mux := http.NewServeMux()
mux.Handle("/api/v1/sleep", http.HandlerFunc(h.sleep))
mux.Handle("/api/v1/chain", http.HandlerFunc(h.sleepAndCall))
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"status": "OK"}`))
}))
log.Println(Entry{
Severity: "INFO",
Message: "Starting Http Server...",
Component: appName,
})
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), otelhttp.NewHandler(mux, "server",
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents),
)))
}
//リクエスト/api/v1/sleep に対する処理内容
func (h *handler) sleep(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
_, span := tracer.Start(ctx, "sleep")
defer span.End()
l := []attribute.KeyValue{
attribute.String("trace_id", span.SpanContext().TraceID().String()),
attribute.String("span_id", span.SpanContext().SpanID().String()),
}
labels := append(commonLabels, l...)
reveiveCount.Add(ctx, 1, metric.WithAttributes(labels...))
log.Println(Entry{
Severity: "INFO",
Message: "Handling request",
Component: os.Getenv("APP_NAME"),
Trace: makeTraceIdFmt(span.SpanContext().TraceID().String()),
SpanId: span.SpanContext().SpanID().String(),
})
time.Sleep(2 * time.Second)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf(`{"message": "Good Morning. 'I'm wake up.", "version": %s}`, os.Getenv("APP_VERSION"))))
}
//リクエスト/api/v1/chain に対する処理内容
func (h *handler) sleepAndCall(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
child, span := tracer.Start(ctx, "sleepAndCall")
defer span.End()
l := []attribute.KeyValue{
attribute.String("trace_id", span.SpanContext().TraceID().String()),
attribute.String("span_id", span.SpanContext().SpanID().String()),
}
labels := append(commonLabels, l...)
reveiveCount.Add(child, 1, metric.WithAttributes(labels...))
log.Println(Entry{
Severity: "INFO",
Message: "Handle request. This function will call other service",
Component: os.Getenv("APP_NAME"),
Trace: makeTraceIdFmt(span.SpanContext().TraceID().String()),
SpanId: span.SpanContext().SpanID().String(),
})
time.Sleep(2 * time.Second)
hreq, err := http.NewRequestWithContext(child, "GET", os.Getenv("ENDPOINT"), nil)
if err != nil {
log.Println(Entry{
Severity: "ERROR",
Message: fmt.Sprintf("Failed create http request: %v", err),
Component: os.Getenv("APP_NAME"),
Trace: makeTraceIdFmt(span.SpanContext().TraceID().String()),
SpanId: span.SpanContext().SpanID().String(),
})
w.WriteHeader(http.StatusInternalServerError)
return
}
startTime := time.Now()
resp, err := h.cli.Do(hreq)
if err != nil {
log.Println(Entry{
Severity: "ERROR",
Message: fmt.Sprintf("Failed call request: %v", err),
Component: os.Getenv("APP_NAME"),
Trace: makeTraceIdFmt(span.SpanContext().TraceID().String()),
SpanId: span.SpanContext().SpanID().String(),
})
w.WriteHeader(http.StatusInternalServerError)
return
}
latencyMs := float64(time.Since(startTime))
resp.Body.Close()
requestLatency.Record(child, latencyMs, metric.WithAttributes(labels...))
requestCount.Add(child, 1, metric.WithAttributes(labels...))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"message": "Good Morning. 'I'm wake up and call other service."}`))
}
// OtelTelemetry Collectorへ連携するためのProvider初期化
func initTraceProvider(ctx context.Context, otelAgentAddr string, serviceName string) func() {
res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithHost(),
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
),
)
if err != nil {
return nil
}
metricExp, err := otlpmetricgrpc.New(
ctx,
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithEndpoint(otelAgentAddr))
if err != nil {
return nil
}
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(res),
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
metricExp,
sdkmetric.WithInterval(2*time.Second),
),
),
)
otel.SetMeterProvider(meterProvider)
traceClient := otlptracegrpc.NewClient(
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(otelAgentAddr),
otlptracegrpc.WithDialOption(grpc.WithBlock()))
exporter, err := otlptrace.New(ctx, traceClient)
if err != nil {
return nil
}
bsp := sdktrace.NewBatchSpanProcessor(exporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
otel.SetTracerProvider(tracerProvider)
return func() {
cxt, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exporter.Shutdown(cxt); err != nil {
otel.Handle(err)
}
if err := meterProvider.Shutdown(cxt); err != nil {
otel.Handle(err)
}
}
}
Envoy
デフォルトでは有効化されていないので、filter_chains に OpenTelemetryConfig を追加して clusters にトレースの出力先を設定して有効化します。
公式 doc よりサンプルが提供されているので参考にしました。
envoy.yaml
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 51051
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
access_log:
- name: envoy.access_loggers.stdout
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
generate_request_id: true
# OpenTelemetryConfigの設定
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 5.0s
service_name: front-envoy
# ここまで
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: service_envoyproxy_io
# 一部記載を割愛
clusters:
# トレース出力先の設定
- name: opentelemetry_collector
type: STRICT_DNS
lb_policy: ROUND_ROBIN
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: opentelemetry_collector
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: otel-collector
port_value: 4317
OPA
OPA もデフォルトではトレースが有効化されていないため、以下のように OPA 起動時に必要となるファイルへトレースの出力先を追加します。
こちらも詳細は公式 doc を参考にしてください。
opa.yaml
plugins:
envoy_ext_authz_grpc:
addr: ":9191"
enable-reflection: true
decision_logs:
console: true
# 以下を追加
distributed_tracing:
type: grpc
address: "otel-collector:4317"
Keycloak
Keycloak においてもデフォルトではトレースが有効化されておらず、opentelemetry-javaagent.jar というライブラリをインストールした上で Java の起動オブションにて javaagent として指定してあげる必要があります。
Keycloak には複数のディストリビーションが存在していますが、今回は慣れの問題で Widfly 版にて検証を行っています。この場合、自前でライブラリをインストールしてあげる必要がありそうだったので、コンテナイメージのビルド時にインストールするようにしています。
(ちゃんと調べていないですが、Quarkus 版のが新しいので本来はそちらを使った方がよさそうです。)
Dockerfile-keycloak
FROM jboss/keycloak:latest
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar /tmp/opentelemetry-javaagent.jar
上記にてビルドしたイメージを使って Pod をデプロイする際に環境変数にて必要なパラメータを指定するようにします。
今回は検証につき Deployment としてデプロイしていますが、本来 Keycloak はステートフルなワークロードなので StatefulSet でのデプロイや永続化先としてデータベースを用いるのが一般的かと思います。
(また、下記のような設定で起動できるところは確認できていますが、KEYCLOAK_OTEL_SAMPLING_PERCENTAGEの妥当性は検証できていないのでご注意ください。)
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: keycloak
name: keycloak
spec:
replicas: 1
selector:
matchLabels:
app: keycloak
strategy: {}
template:
metadata:
labels:
app: keycloak
spec:
containers:
- image: keycloak:v0.0.1
name: keycloak
resources: {}
env:
- name: KEYCLOAK_USER
value: admin
- name: KEYCLOAK_PASSWORD
value: admin
- name: KEYCLOAK_OTEL_SAMPLING_PERCENTAGE
value: "1.0"
- name: OTEL_SERVICE_NAME
value: keycloak
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: http://otel-collector:4317
- name: JAVA_OPTS_APPEND
value: "-javaagent:/tmp/opentelemetry-javaagent.jar"
OpenTelemetry Collector
トレースの集約先となる OpenTelemetry Collector では以下のような設定を行い Cloud Trace へトレースを連携できるようにしています。
おまけではありますが、 proccessors では resourcedetection を使用して Google Cloud の情報(ゾーン、プラットフォーム情報等)を付与するようにしています。
resourcedetection については以下を参考してください。
receivers:
# Trace情報の受け口
otlp:
protocols:
grpc:
processors:
batch: {}
# Google Cloud 情報の付与
resourcedetection:
detectors: [gcp]
override: false
exporters:
debug:
# Cloud Traceへ連携するための設定
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch,resourcedetection]
exporters: [debug,googlecloud]
長々と記載しましたが、これらの設定を行うことで最初に紹介したような一気通貫でのトレース情報の取得が可能になり、モニタリングが可能になります。
お気づきかとは思いますが、プロダクトが OpenTelemetry に対応していないと情報を取得することはできないので開発業務では各自対応有無を確認することになるかと思います。
まとめ
今回は認証/認可を自前のアプリケーション以外で実現する構成を例に一気通貫でトレース情報を取得する方法を紹介させていただきました。
アプリケーション以外のプロダクトについても可能な限り計装して解析性の高いサービスを開発していきましょう。
今回時間の関係で紹介できなかったメトリクスやログの収集についても別途検証を進めて改めて情報発信したいと思います。
記事を閲覧いただきありがとうございました。
Discussion