🎄

GKEでEnvoy、OPAなどAP以外のプロダクトもトレース計装したいですよね

2023/12/12に公開

はじめに

こんにちは。クラウドエース株式会社で SRE をしている間瀬です。
この記事は OpenTelemetry Advent Calendar 2023 12日目の記事です。

https://qiita.com/advent-calendar/2023/otel

マイクロサービスアーキテクチャ等の普及に伴いシステムのアーキテクチャが複雑になるにつれて、自前のアプリケーションだけで実装するのではなく、幾つかのプロダクトを組み合わせて実現するケースが増えてきていると思います。
このような構成においてパフォーマンスをモニタリングする上では自前のアプリケーションだけでなく、これらのプロダクトも含めて計装できることが理想と言えるでしょう。
今回は、幾つかのプロダクトを例に自前のアプリケーションに加えて一気通貫で計装してみたいと思います。

尚、今回記事で計装対象とするのはトレースのみを対象とさせていただきます。
(メトリクスやログも対象にしたかったのですが、時間が足りず。。。次の機会にさせてください。)

紹介するアーキテクチャ

今回、計装するアーキテクチャの概要は以下の通りです。
Google Cloud の Google Kubernetes Engine(以下、GKE) 上にそれぞれプロダクトやアプリケーションをデプロイして Cloud Trace へ連携することで計装していきたいと思います。
GKE は Google Cloud のマネージドな Kubernetes サービスで Cloud Trace も同じく Google Cloud のサービスで OpenTelemetry の出力先としてサポートされています。
プロダクトの役割は以下の通りです。

  • Keycloak: ユーザーに対する認証
  • Open Policy Agent (以下、OPA): ユーザーの認可
  • Envoy: クライアントからのリクエストのプロキシ及び、認証/認可。認可は OPA へ委譲する。
  • app1, app2: サンプルアプリケーション

archi

トレースを計装するための構成は以下の通りです。
今回は OpenTelemetry Collector にて各プロダクトやアプリケーションにて取得するトレースを集約して Cloud Trace へ連携する構成としています。

OpenTelemetry Collector を利用することでトレースを取得するアプリケーションやプロダクトから Cloud Trace などの出力先に依存することなく実装することが可能です。
また、今回は言及しませんがトレースだけでなくメトリクスやログも集約して各種出力先へ連携することが可能です。

otelcollector

目指すところ

クライアントからのリクエストを Envoy で認証/認可をしてサンプルアプリケーションにプロキシされるまでの処理を全てトレースできるようにすることをゴールにしたいと思います。
全ての処理をトレースすると以下のようになります。

Envoy の中で行っている認証/認可の処理に加えて連携先である Keycloak や OPA での処理も計装できることが分かります。

after

ちなみにアプリケーションのみ計装した状態は以下の通りです。
もちろんこの状態でもダメではないのですが、Envoy、Keycloak、OPAでの処理内容がブラックボックスになってしまっています。

before

一気通貫にトレースをモニタリングできるようにすることで例えば認証/認可の処理でエラーが発生してアプリケーションまでリクエストが到達しないケースにおいてもトレースを取得することができるようになります。
例えば以下では、OPAでの認可状況(正常、異常)は確か debug モードにしないとログ等に出力してくれないので分かりづらいのですが、トレースをモニタリングすることで追跡することが可能です。

opa

やってみよう

以降では各所でのトレース取得方法について紹介したいと思います。
記事では重要なポイントだけ掲載させていただきます。検証に使用したリポジトリは以下になりますので参考にしてください。

https://github.com/cloud-ace/zenn-envoy-opa-keycloak-trace

アプリケーション

今回のアプリケーションは Golang で http によるリクエストに対して自動でトレース情報を生成してくれる otelhttp を使っています。
ソースコードではトレース以外にもメトリクスを収集、公開するための実装も含まれているのでご了承ください。また、ログにおいても Google Cloud の Cloud logging へ構造化ロギングとして連携して Cloud Trace のトレース情報と紐づけられるように実装しています。


func main() {
	loadEnvFile()

	ctx := context.Background()
	port := os.Getenv("APP_PORT")
	appName := os.Getenv("APP_NAME")
	oltpAddr := os.Getenv("OTEL_AGENT_ENDPOINT")

	log.SetFlags(0)

	shutdown := initTraceProvider(ctx, oltpAddr, appName)
	defer shutdown()

	meter := otel.Meter("server-meter")
	commonLabels = []attribute.KeyValue{
		attribute.String("project_id", os.Getenv("PROJECT_ID")),
	}

	reveiveCount, _ = meter.Int64Counter(
		"api_server/receive_counts",
		metric.WithDescription("The number of receive processed"),
	)

	requestLatency, _ = meter.Float64Histogram(
		"api_server/request_latency",
		metric.WithDescription("The latency of requests processed"),
	)

	requestCount, _ = meter.Int64Counter(
		"api_server/request_counts",
		metric.WithDescription("The number of requests processed"),
	)

	h := newHandler()
	mux := http.NewServeMux()
	mux.Handle("/api/v1/sleep", http.HandlerFunc(h.sleep))
	mux.Handle("/api/v1/chain", http.HandlerFunc(h.sleepAndCall))
	mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(`{"status": "OK"}`))
	}))

	log.Println(Entry{
		Severity:  "INFO",
		Message:   "Starting Http Server...",
		Component: appName,
	})
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), otelhttp.NewHandler(mux, "server",
		otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents),
	)))
}

//リクエスト/api/v1/sleep に対する処理内容
func (h *handler) sleep(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	_, span := tracer.Start(ctx, "sleep")
	defer span.End()

	l := []attribute.KeyValue{
		attribute.String("trace_id", span.SpanContext().TraceID().String()),
		attribute.String("span_id", span.SpanContext().SpanID().String()),
	}
	labels := append(commonLabels, l...)
	reveiveCount.Add(ctx, 1, metric.WithAttributes(labels...))

	log.Println(Entry{
		Severity:  "INFO",
		Message:   "Handling request",
		Component: os.Getenv("APP_NAME"),
		Trace:     makeTraceIdFmt(span.SpanContext().TraceID().String()),
		SpanId:    span.SpanContext().SpanID().String(),
	})

	time.Sleep(2 * time.Second)

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(fmt.Sprintf(`{"message": "Good Morning. 'I'm wake up.", "version": %s}`, os.Getenv("APP_VERSION"))))
}

//リクエスト/api/v1/chain に対する処理内容
func (h *handler) sleepAndCall(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	child, span := tracer.Start(ctx, "sleepAndCall")
	defer span.End()

	l := []attribute.KeyValue{
		attribute.String("trace_id", span.SpanContext().TraceID().String()),
		attribute.String("span_id", span.SpanContext().SpanID().String()),
	}
	labels := append(commonLabels, l...)
	reveiveCount.Add(child, 1, metric.WithAttributes(labels...))

	log.Println(Entry{
		Severity:  "INFO",
		Message:   "Handle request. This function will call other service",
		Component: os.Getenv("APP_NAME"),
		Trace:     makeTraceIdFmt(span.SpanContext().TraceID().String()),
		SpanId:    span.SpanContext().SpanID().String(),
	})

	time.Sleep(2 * time.Second)

	hreq, err := http.NewRequestWithContext(child, "GET", os.Getenv("ENDPOINT"), nil)
	if err != nil {
		log.Println(Entry{
			Severity:  "ERROR",
			Message:   fmt.Sprintf("Failed create http request: %v", err),
			Component: os.Getenv("APP_NAME"),
			Trace:     makeTraceIdFmt(span.SpanContext().TraceID().String()),
			SpanId:    span.SpanContext().SpanID().String(),
		})
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	startTime := time.Now()

	resp, err := h.cli.Do(hreq)
	if err != nil {
		log.Println(Entry{
			Severity:  "ERROR",
			Message:   fmt.Sprintf("Failed call request: %v", err),
			Component: os.Getenv("APP_NAME"),
			Trace:     makeTraceIdFmt(span.SpanContext().TraceID().String()),
			SpanId:    span.SpanContext().SpanID().String(),
		})
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	latencyMs := float64(time.Since(startTime))
	resp.Body.Close()

	requestLatency.Record(child, latencyMs, metric.WithAttributes(labels...))
	requestCount.Add(child, 1, metric.WithAttributes(labels...))

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"message": "Good Morning. 'I'm wake up and call other service."}`))

}

// OtelTelemetry Collectorへ連携するためのProvider初期化
func initTraceProvider(ctx context.Context, otelAgentAddr string, serviceName string) func() {

	res, err := resource.New(ctx,
		resource.WithFromEnv(),
		resource.WithProcess(),
		resource.WithTelemetrySDK(),
		resource.WithHost(),
		resource.WithAttributes(
			semconv.ServiceNameKey.String(serviceName),
		),
	)
	if err != nil {
		return nil
	}

	metricExp, err := otlpmetricgrpc.New(
		ctx,
		otlpmetricgrpc.WithInsecure(),
		otlpmetricgrpc.WithEndpoint(otelAgentAddr))
	if err != nil {
		return nil
	}

	meterProvider := sdkmetric.NewMeterProvider(
		sdkmetric.WithResource(res),
		sdkmetric.WithReader(
			sdkmetric.NewPeriodicReader(
				metricExp,
				sdkmetric.WithInterval(2*time.Second),
			),
		),
	)
	otel.SetMeterProvider(meterProvider)

	traceClient := otlptracegrpc.NewClient(
		otlptracegrpc.WithInsecure(),
		otlptracegrpc.WithEndpoint(otelAgentAddr),
		otlptracegrpc.WithDialOption(grpc.WithBlock()))
	exporter, err := otlptrace.New(ctx, traceClient)
	if err != nil {
		return nil
	}

	bsp := sdktrace.NewBatchSpanProcessor(exporter)
	tracerProvider := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
		sdktrace.WithResource(res),
		sdktrace.WithSpanProcessor(bsp),
	)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
	otel.SetTracerProvider(tracerProvider)

	return func() {
		cxt, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		if err := exporter.Shutdown(cxt); err != nil {
			otel.Handle(err)
		}
		if err := meterProvider.Shutdown(cxt); err != nil {
			otel.Handle(err)
		}
	}
}

Envoy

デフォルトでは有効化されていないので、filter_chains に OpenTelemetryConfig を追加して clusters にトレースの出力先を設定して有効化します。
公式 doc よりサンプルが提供されているので参考にしました。

https://www.envoyproxy.io/docs/envoy/latest/start/sandboxes/opentelemetry

envoy.yaml

    static_resources:
      listeners:
        - name: listener_0
          address:
            socket_address:
              address: 0.0.0.0
              port_value: 51051
          filter_chains:
            - filters:
                - name: envoy.filters.network.http_connection_manager
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                    stat_prefix: ingress_http
                    access_log:
                      - name: envoy.access_loggers.stdout
                        typed_config:
                          "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
                    generate_request_id: true
                    # OpenTelemetryConfigの設定
                    tracing:
                      provider:
                        name: envoy.tracers.opentelemetry
                        typed_config:
                          "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
                          grpc_service:
                            envoy_grpc:
                              cluster_name: opentelemetry_collector
                            timeout: 5.0s
                          service_name: front-envoy
                    # ここまで
                    route_config:
                      name: local_route
                      virtual_hosts:
                        - name: local_service
                          domains: ["*"]
                          routes:
                            - match:
                                prefix: "/"
                              route:
                                cluster: service_envoyproxy_io

# 一部記載を割愛

      clusters:
        # トレース出力先の設定
        - name: opentelemetry_collector
          type: STRICT_DNS
          lb_policy: ROUND_ROBIN
          typed_extension_protocol_options:
            envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
              "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
              explicit_http_config:
                http2_protocol_options: {}
          load_assignment:
            cluster_name: opentelemetry_collector
            endpoints:
              - lb_endpoints:
                  - endpoint:
                      address:
                        socket_address:
                          address: otel-collector
                          port_value: 4317

OPA

OPA もデフォルトではトレースが有効化されていないため、以下のように OPA 起動時に必要となるファイルへトレースの出力先を追加します。
こちらも詳細は公式 doc を参考にしてください。

https://www.openpolicyagent.org/docs/latest/monitoring/
https://www.openpolicyagent.org/docs/latest/configuration/#distributed-tracing

opa.yaml

    plugins:
      envoy_ext_authz_grpc:
        addr: ":9191"
        enable-reflection: true
    decision_logs:
      console: true
    # 以下を追加
    distributed_tracing:
      type: grpc
      address: "otel-collector:4317"

Keycloak

Keycloak においてもデフォルトではトレースが有効化されておらず、opentelemetry-javaagent.jar というライブラリをインストールした上で Java の起動オブションにて javaagent として指定してあげる必要があります。
Keycloak には複数のディストリビーションが存在していますが、今回は慣れの問題で Widfly 版にて検証を行っています。この場合、自前でライブラリをインストールしてあげる必要がありそうだったので、コンテナイメージのビルド時にインストールするようにしています。
(ちゃんと調べていないですが、Quarkus 版のが新しいので本来はそちらを使った方がよさそうです。)

Dockerfile-keycloak

FROM jboss/keycloak:latest
ADD --chown=jboss:root https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar /tmp/opentelemetry-javaagent.jar

上記にてビルドしたイメージを使って Pod をデプロイする際に環境変数にて必要なパラメータを指定するようにします。
今回は検証につき Deployment としてデプロイしていますが、本来 Keycloak はステートフルなワークロードなので StatefulSet でのデプロイや永続化先としてデータベースを用いるのが一般的かと思います。
(また、下記のような設定で起動できるところは確認できていますが、KEYCLOAK_OTEL_SAMPLING_PERCENTAGEの妥当性は検証できていないのでご注意ください。)

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: keycloak
  name: keycloak
spec:
  replicas: 1
  selector:
    matchLabels:
      app: keycloak
  strategy: {}
  template:
    metadata:
      labels:
        app: keycloak
    spec:
      containers:
      - image: keycloak:v0.0.1
        name: keycloak
        resources: {}
        env:
          - name: KEYCLOAK_USER
            value: admin
          - name: KEYCLOAK_PASSWORD
            value: admin
          - name: KEYCLOAK_OTEL_SAMPLING_PERCENTAGE 
            value: "1.0"
          - name: OTEL_SERVICE_NAME 
            value: keycloak
          - name: OTEL_EXPORTER_OTLP_ENDPOINT
            value: http://otel-collector:4317
          - name: JAVA_OPTS_APPEND
            value: "-javaagent:/tmp/opentelemetry-javaagent.jar"

OpenTelemetry Collector

トレースの集約先となる OpenTelemetry Collector では以下のような設定を行い Cloud Trace へトレースを連携できるようにしています。
おまけではありますが、 proccessors では resourcedetection を使用して Google Cloud の情報(ゾーン、プラットフォーム情報等)を付与するようにしています。

resourcedetection については以下を参考してください。

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/resourcedetectionprocessor

    receivers:
    # Trace情報の受け口
      otlp:
        protocols:
          grpc: 

    processors:
      batch: {}
    # Google Cloud 情報の付与
      resourcedetection:
        detectors: [gcp]
        override: false

    exporters:
      debug:
    # Cloud Traceへ連携するための設定
      googlecloud: 
        log:
          default_log_name: opentelemetry.io/collector-exported-log
                  
    service:
      pipelines:
        traces:
          receivers: [otlp]
          processors: [batch,resourcedetection]
          exporters: [debug,googlecloud]

長々と記載しましたが、これらの設定を行うことで最初に紹介したような一気通貫でのトレース情報の取得が可能になり、モニタリングが可能になります。
お気づきかとは思いますが、プロダクトが OpenTelemetry に対応していないと情報を取得することはできないので開発業務では各自対応有無を確認することになるかと思います。

まとめ

今回は認証/認可を自前のアプリケーション以外で実現する構成を例に一気通貫でトレース情報を取得する方法を紹介させていただきました。
アプリケーション以外のプロダクトについても可能な限り計装して解析性の高いサービスを開発していきましょう。
今回時間の関係で紹介できなかったメトリクスやログの収集についても別途検証を進めて改めて情報発信したいと思います。
記事を閲覧いただきありがとうございました。

Discussion