🔭

OpenTelemetry Transformation Language (OTTL) をテストする

2024/12/15に公開

この記事は OpenTelemetry Advent Calender 2024 の12日目の記事です。

https://qiita.com/advent-calendar/2024/opentelemetry

はじめに

今までは Datadog SDK および Datadog Agent を主に使っていたのですが、転職してからは OpenTelemetry (以下 otel)とずっと戯れています。
その中で OpenTelemetry Collector(以下otel collector) の processor での処理を記述できる OpenTelemetry Transformation Language (以下OTTL) を書く機会がありましたが、記述した OTTL が構文的に正しいか、意図した変換がされているかをテストしたい気持ちになったため、transform processor の内部実装でのテストを参考に実装してみました。

OTTL とは

OTTL(OpenTelemetry Transformation Language) は、otel シグナルのデータを加工するための宣言型言語であり、SQLに似た構文で記述します。例えば以下のような変換が可能です:

  • 特定の条件で値を書き換える
  • 新しい属性を追加する

https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/processing.md

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md

transform:
  trace_statements:
    - context: span
      statements:
        - set(status.code, 1) where attributes["http.path"] == "/health"
        - set(name, attributes["http.route"])
        - replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
        - limit(attributes, 100, [])
        - truncate_all(attributes, 4096)

これは Transform Processor における OTTL の記述例です。

https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor

テストで確かめたい

このように OTTL を用いて変換処理を記述できるのですが、yaml に直接記述していくこともあり「自分の書いた OTTL は本当に希望通りの変換ができているのか…?そもそも構文これで合ってるのか…?」という気持ちになってきました。

テストの手法としてはいくつか存在しそうで、telemetrygen などを使用して実際に立ち上げた Otel Collector に流して確認するというのがありそうです。

https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/cmd/telemetrygen
https://zenn.dev/taxin/articles/otel-tail-sampling

しかしプロジェクトの実装が Go であること、また Otel Collector も Go で実装されていることから、内部実装を参考にして Go のユニットテストの形としてかけると嬉しいです。

アプリケーション側の Go の Trace などをテストする際は、SDK 内部に tracetest などのテストに使用できるものが存在しています。これは SDK 内部での実装のテストでも使用されています。

https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace/tracetest

同様に Otel Collector や processor の内部実装を見ると、OTTL に関するテストに使えそうなものを見つけられるかもしれないため、OTTL の実装部分を見てみます。

otel collector / ottl / transform processor の内部実装を参考にする

OTTL の実装は opentelemetry-collector-contrib リポジトリの pkg/ottl 配下にあります。

https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl

これを実際に使用している processor の一つとして、transformprocessor があります。

https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor

今回は span に対しての transformprocessor のテストを書きたいのですが、実はすでにテストがtransformprocessor の pkg 内に存在します。

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/traces/processor_test.go#L1-L749

// resource のテスト
func Test_ProcessTraces_ResourceContext(t *testing.T) {
	tests := []struct {
		statement string
		want      func(td ptrace.Traces)
	}{
		{
			statement: `set(attributes["test"], "pass")`,
			want: func(td ptrace.Traces) {
				td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass")
			},
		},
		{
			statement: `set(attributes["test"], "pass") where attributes["host.name"] == "wrong"`,
			want: func(_ ptrace.Traces) {
			},
		},
		{
			statement: `set(schema_url, "test_schema_url")`,
			want: func(td ptrace.Traces) {
				td.ResourceSpans().At(0).SetSchemaUrl("test_schema_url")
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.statement, func(t *testing.T) {
			td := constructTraces()
			processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
			assert.NoError(t, err)

			_, err = processor.ProcessTraces(context.Background(), td)
			assert.NoError(t, err)

			exTd := constructTraces()
			tt.want(exTd)

			assert.Equal(t, exTd, td)
		})
	}
}

一見するとこれをそのまま使えば良さそうに見えますが、このテスト自体も使用しているメソッドも internal 配下に多く配置されていることもあり、主要なメソッドを外部から使用することができません。(go:linkname使うヤンチャな方法はありますが…
というわけで、この内部実装を見ながらなんとかなんとかテストで使用きないか見ていきます。
今回は span に対して処理を実行したかったため、span に対するテストを見てみます
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/traces/processor_test.go#L114-L412

	for _, tt := range tests {
		t.Run(tt.statement, func(t *testing.T) {
			td := constructTraces()
			processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
			assert.NoError(t, err)

			_, err = processor.ProcessTraces(context.Background(), td)
			assert.NoError(t, err)

			exTd := constructTraces()
			tt.want(exTd)

			assert.Equal(t, exTd, td)
		})
	}

テストケースはたくさんありますが、主要な部分はここです。

processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
...
_, err = processor.ProcessTraces(context.Background(), td)

ここで、context = span で processor を作成しています。
この context は、otel collector の config.yaml で指定しているものが対応しています。
流れとしては以下のようです。

  • NewProcessor で signal のタイプ情報も含めた processor を作成
  • processor.ProcessTraces で実際に trace data を入れて処理を実行

まずは NewProcessor の実装をみていきます。

func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, settings component.TelemetrySettings) (*Processor, error) {
	pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions()), common.WithTraceErrorMode(errorMode))
	if err != nil {
		return nil, err
	}

	contexts := make([]consumer.Traces, len(contextStatements))
	var errors error
	for i, cs := range contextStatements {
		context, err := pc.ParseContextStatements(cs)
		if err != nil {
			errors = multierr.Append(errors, err)
		}
		contexts[i] = context
	}

	if errors != nil {
		return nil, errors
	}

	return &Processor{
		contexts: contexts,
		logger:   settings.Logger,
	}, nil
}

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/traces/processor.go#L24-L48

NewProcessor の引数では common.ContextStatements のスライスを渡しています。テストでは以下を渡しています。

[]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}

Statement には、OTTL で記述した statement が string のスライスで入ってきています。これを元に、Processor struct の contexts に入れていきます。

type Processor struct {
	contexts []consumer.Traces
	logger   *zap.Logger
}
...

func NewProcessor(...) (...) {
    ...
    for i, cs := range contextStatements {
        context, err := pc.ParseContextStatements(cs)
        if err != nil {
            errors = multierr.Append(errors, err)
        }
        contexts[i] = context
    }
    ...
    return &Processor{
        contexts: contexts,
        logger:   settings.Logger,
    }, nil
}
...
func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
	for _, c := range p.contexts {
		err := c.ConsumeTraces(ctx, td)
		if err != nil {
			p.logger.Error("failed processing traces", zap.Error(err))
			return td, err
		}
	}
	return td, nil
}

この context は ProcessorTraces の内部で呼び出されて、ConsumeTraces を実行しています。この ConsumeTraces は otel collector の Traces interface で定義されいているもので、ptrace.Traces を処理するためのものです。というわけで、この context の生成部分を見ると良さそうな雰囲気です。

https://github.com/open-telemetry/opentelemetry-collector/blob/5da51fae5654f836690510052668a3f515c4bb69/consumer/traces.go#L13-L19

for i, cs := range contextStatements {
    context, err := pc.ParseContextStatements(cs)
    if err != nil {
        errors = multierr.Append(errors, err)
    }
    contexts[i] = context
}

[]common.ContextStatements から context を作っている ParseContextStatements を見ていきます。

func (pc TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) {
	switch contextStatements.Context {
	case Span:
		parsedStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements)
		if err != nil {
			return nil, err
		}
		globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanFuncs())
		if errGlobalBoolExpr != nil {
			return nil, errGlobalBoolExpr
		}
		sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.settings, ottlspan.WithStatementSequenceErrorMode(pc.errorMode))
		return traceStatements{sStatements, globalExpr}, nil
	case SpanEvent:
		parsedStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements)
		if err != nil {
			return nil, err
		}
		globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanEventFuncs())
		if errGlobalBoolExpr != nil {
			return nil, errGlobalBoolExpr
		}
		seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.settings, ottlspanevent.WithStatementSequenceErrorMode(pc.errorMode))
		return spanEventStatements{seStatements, globalExpr}, nil
	default:
		return pc.parseCommonContextStatements(contextStatements)
	}
}

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/common/traces.go#L164-L192

switch 文を使用し、context のタイプによって statement を parse しています。今回は span なので span の実装を詳しく見ます。

// processor/transformprocessor/internal/common/traces.go
type TraceParserCollection struct {
	parserCollection
	spanParser      ottl.Parser[ottlspan.TransformContext]
	spanEventParser ottl.Parser[ottlspanevent.TransformContext]
}

// processor/transformprocessor/internal/common/traces.go
parsedStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements)

TraceParserCollection(pc) の spanParser に対して ParseStatements を実行しています。この TraceParserCollection は先ほどの NewProcessor で最初に作成していました。

func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, settings component.TelemetrySettings) (*Processor, error) {
	pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions()), common.WithTraceErrorMode(errorMode))
	if err != nil {
		return nil, err
	}
    ...

この内部実装を見ます。

func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
    ...
	tpc := &TraceParserCollection{
		parserCollection: parserCollection{
			settings:       settings,
			resourceParser: rp,
			scopeParser:    sp,
		},
	}

	for _, op := range options {
		err := op(tpc)
		if err != nil {
			return nil, err
		}
	}

	return tpc, nil
}

func WithSpanParser(functions map[string]ottl.Factory[ottlspan.TransformContext]) TraceParserCollectionOption {
	return func(tp *TraceParserCollection) error {
		spanParser, err := ottlspan.NewParser(functions, tp.settings)
		if err != nil {
			return err
		}
		tp.spanParser = spanParser
		return nil
	}
}

ついにきました。欲しかったのは ottlspan.NewParser です。これは pkg/ottl の internal ではない pkg にあるので使用できます。というわけで、test で使用する際にこれは使えそうです。
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/pkg/ottl/contexts/ottlspan/span.go#L81-L96

ここで再度 TraceParserCollection 内部の ConsumeTraces を確認します。

var _ consumer.Traces = &traceStatements{}

type traceStatements struct {
	ottl.StatementSequence[ottlspan.TransformContext]
	expr.BoolExpr[ottlspan.TransformContext]
}

func (t traceStatements) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{
		MutatesData: true,
	}
}

func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	for i := 0; i < td.ResourceSpans().Len(); i++ {
		rspans := td.ResourceSpans().At(i)
		for j := 0; j < rspans.ScopeSpans().Len(); j++ {
			sspans := rspans.ScopeSpans().At(j)
			spans := sspans.Spans()
			for k := 0; k < spans.Len(); k++ {
				tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource(), sspans, rspans)
				condition, err := t.BoolExpr.Eval(ctx, tCtx)
				if err != nil {
					return err
				}
				if condition {
					err := t.Execute(ctx, tCtx)
					if err != nil {
						return err
					}
				}
			}
		}
	}
	return nil
}

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/common/traces.go#L22-L57

色々ありますが、t.Execute が重要な部分です。

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/pkg/ottl/parser.go#L336-L356

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/pkg/ottl/parser.go#L27-L49

最終的には type Statement[K any] の Execute を呼び出しています。これは Parser[K] に生えている ParseStatements が返している構造体です。

// ParseStatements parses string statements into ottl.Statement objects ready for execution.
// Returns a slice of statements and a nil error on successful parsing.
// If parsing fails, returns nil and a joined error containing each error per failed statement.
func (p *Parser[K]) ParseStatements(statements []string) ([]*Statement[K], error) {
	parsedStatements := make([]*Statement[K], 0, len(statements))
	var parseErrs []error

	for _, statement := range statements {
		ps, err := p.ParseStatement(statement)
		if err != nil {
			parseErrs = append(parseErrs, fmt.Errorf("unable to parse OTTL statement %q: %w", statement, err))
			continue
		}
		parsedStatements = append(parsedStatements, ps)
	}

	if len(parseErrs) > 0 {
		return nil, errors.Join(parseErrs...)
	}

	return parsedStatements, nil
}

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/pkg/ottl/parser.go#L119-L140

というわけでここまでで道具が揃いました。

ユニットテストを書く

今回は transformprocessor のテストケースでもあった「span name = operationA に合致する span の test という key に pass という値を追加する」という processor のテストを書いてみます。

...
{
    statement: `set(attributes["test"], "pass") where name == "operationA"`,
    want: func(td ptrace.Traces) {
        td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass")
    },
},
...

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/30490e434c72944084a0a898db6abf6f0a83b4d9/processor/transformprocessor/internal/traces/processor_test.go#L119-L124

ここまでの流れをまとめると以下になります。

  1. context type と statement を parser に入れて、OTTL Statement を取得
  2. OTTL Statement に対して、trace data を入れて Execute を実行

というわけでテストを書いていきます。

先に全体実装
package transform_test

import (
	"context"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/confmap/confmaptest"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func assertType[T any](t *testing.T, v interface{}) T {
	t.Helper()

	result, ok := v.(T)
	if !ok {
		t.Fatalf("failed to assert as %T", *new(T))
	}
	return result
}

func loadStatements(t *testing.T) []string {
	t.Helper()

	conf, err := confmaptest.LoadConf("config.yaml")
	if err != nil {
		t.Fatalf("failed to load config file: %v", err)
	}

	processors := assertType[map[string]interface{}](t, conf.ToStringMap()["processors"])
	transform := assertType[map[string]interface{}](t, processors["transform"])
	traceStatements := assertType[[]interface{}](t, transform["trace_statements"])

	var statements []string
	for _, stmt := range traceStatements {
		stmtMap := assertType[map[string]interface{}](t, stmt)
		if stmtMap["context"] == "span" {
			stmts := assertType[[]interface{}](t, stmtMap["statements"])
			for _, s := range stmts {
				statements = append(statements, assertType[string](t, s))
			}
		}
	}

	return statements
}

func TestTransformStatements(t *testing.T) {
	t.Parallel()

	statements := loadStatements(t)
	if len(statements) == 0 {
		t.Fatal("failed to load statements from config file")
	}

	tests := []struct {
		name  string
		input func() ptrace.Span
		want  map[string]string
	}{
		{
			name: "set attribute",
			input: func() ptrace.Span {
				span := ptrace.NewSpan()
				span.SetName("operationA")
				return span
			},
			want: map[string]string{
				"test": "pass",
			},
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			settings := componenttest.NewNopTelemetrySettings()
			parser, err := ottlspan.NewParser(ottlfuncs.StandardFuncs[ottlspan.TransformContext](), settings)
			if err != nil {
				t.Fatalf("failed to create parser: %v", err)
			}

			var (
				span = tt.input()
				tctx = ottlspan.NewTransformContext(
					span,
					pcommon.NewInstrumentationScope(),
					pcommon.NewResource(),
					ptrace.NewScopeSpans(),
					ptrace.NewResourceSpans(),
				)
			)

			for _, stmt := range statements {
				statement, err := parser.ParseStatement(stmt)
				if err != nil {
					t.Fatalf("failed to parse statement: %v", err)
				}

				_, _, err = statement.Execute(context.Background(), tctx)
				if err != nil {
					t.Fatalf("failed to execute transformation: %v", err)
				}
			}

			got := span.Attributes().AsRaw()

			if diff := cmp.Diff(tt.want, got, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("attributes mismatch (-want +got):\n%s", diff)
			}
		})
	}
}

otel collector の config.yaml

transformprocessor に対象の ottl を記述します。

...
processors:
  transform:
    trace_statements:
      - context: span
        statements:
          - set(attributes["test"], "pass") where name == "operationA"
...

config.yaml を読み込む

実際に otel collector に記述している config.yaml の内容をテストしたいので、実際の config.yaml を読み込んでいきます。

import (
	"testing"

	"go.opentelemetry.io/collector/confmap/confmaptest"
)

func assertType[T any](t *testing.T, v interface{}) T {
	t.Helper()
	result, ok := v.(T)
	if !ok {
		t.Fatalf("failed to assert as %T", *new(T))
	}
	return result
}

func loadStatements(t *testing.T) []string {
	t.Helper()

	conf, err := confmaptest.LoadConf("config.yaml")
	if err != nil {
		t.Fatalf("failed to load config file: %v", err)
	}

	processors := assertType[map[string]interface{}](t, conf.ToStringMap()["processors"])
	transform := assertType[map[string]interface{}](t, processors["transform"])
	traceStatements := assertType[[]interface{}](t, transform["trace_statements"])

	var statements []string
	for _, stmt := range traceStatements {
		stmtMap := assertType[map[string]interface{}](t, stmt)
		if stmtMap["context"] == "span" {
			stmts := assertType[[]interface{}](t, stmtMap["statements"])
			for _, s := range stmts {
				statements = append(statements, assertType[string](t, s))
			}
		}
	}

	return statements
}

ここでは otel collector の内部にある confmaptest.LoadConf を使用しています。
https://github.com/open-telemetry/opentelemetry-collector/blob/5da51fae5654f836690510052668a3f515c4bb69/confmap/confmaptest/configtest.go#L17-L31

これを使用して、statement を []string として取り出します。

Go test

func TestTransformStatements(t *testing.T) {
	t.Parallel()

	statements := loadStatements(t)
	if len(statements) == 0 {
		t.Fatal("failed to load statements from config file")
	}

	tests := []struct {
		name  string
		input func() ptrace.Span
		want  map[string]any
	}{
		{
			name: "set attribute",
			input: func() ptrace.Span {
				span := ptrace.NewSpan()
				span.SetName("operationA")
				return span
			},
			want: map[string]any{
				"test": "pass",
			},
		},
		{
			name: "not set attribute with expression",
			input: func() ptrace.Span {
				span := ptrace.NewSpan()
				span.SetName("operationB")
				return span
			},
			want: map[string]any{},
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			settings := componenttest.NewNopTelemetrySettings()
			parser, err := ottlspan.NewParser(ottlfuncs.StandardFuncs[ottlspan.TransformContext](), settings)
			if err != nil {
				t.Fatalf("failed to create parser: %v", err)
			}

			var (
				span = tt.input()
				tctx = ottlspan.NewTransformContext(
					span,
					pcommon.NewInstrumentationScope(),
					pcommon.NewResource(),
					ptrace.NewScopeSpans(),
					ptrace.NewResourceSpans(),
				)
			)

			for _, stmt := range statements {
				statement, err := parser.ParseStatement(stmt)
				if err != nil {
					t.Fatalf("failed to parse statement: %v", err)
				}

				_, _, err = statement.Execute(context.Background(), tctx)
				if err != nil {
					t.Fatalf("failed to execute transformation: %v", err)
				}
			}

			got := span.Attributes().AsRaw()

			if diff := cmp.Diff(tt.want, got); diff != "" {
				t.Errorf("attributes mismatch (-want +got):\n%s", diff)
			}
		})
	}
}

先ほどの helper を使用して statement を取り出しておきます。

settings := componenttest.NewNopTelemetrySettings()
parser, err := ottlspan.NewParser(ottlfuncs.StandardFuncs[ottlspan.TransformContext](), settings)
if err != nil {
    t.Fatalf("failed to create parser: %v", err)
}

ottlspan.NewParser を使用して、ottl parser を作成します。今回は trace provider などを使用せずに直接 span を流し込むので、NewNopTelemetrySettings を使用します(transformprocessor の test でも同様のものが使用されている)。NewParser の引数には ottlspan.TransformContext を指定して span に対する parser を作ります。

var (
    span = tt.input()
    tctx = ottlspan.NewTransformContext(
        span,
        pcommon.NewInstrumentationScope(),
        pcommon.NewResource(),
        ptrace.NewScopeSpans(),
        ptrace.NewResourceSpans(),
    )
)

テスト対象の span から context を作成しておきます。

for _, stmt := range statements {
    statement, err := parser.ParseStatement(stmt)
    if err != nil {
        t.Fatalf("failed to parse statement: %v", err)
    }

    _, _, err = statement.Execute(context.Background(), tctx)
    if err != nil {
        t.Fatalf("failed to execute transformation: %v", err)
    }
}

config.yaml に記載されている statement を Parse していきます。parser.ParseStatement を使用して ottl statement を作成、そこに Execute を実行して ottl の記述された処理を実行します。

got := span.Attributes().AsRaw()

if diff := cmp.Diff(tt.want, got); diff != "" {
    t.Errorf("attributes mismatch (-want +got):\n%s", diff)
}

span を取り出して want の map と比較します。

tests := []struct {
    name  string
    input func() ptrace.Span
    want  map[string]any
}{
    {
        name: "set attribute",
        input: func() ptrace.Span {
            span := ptrace.NewSpan()
            span.SetName("operationA")
            return span
        },
        want: map[string]any{
            "test": "pass",
        },
    },
    {
        name: "not set attribute with expression",
        input: func() ptrace.Span {
            span := ptrace.NewSpan()
            span.SetName("operationB")
            return span
        },
        want: map[string]any{},
    },
}

不正な statement を入れてみる

あえて最後の " を消してみます

...
processors:
  transform:
    trace_statements:
      - context: span
        statements:
          - set(attributes["test"], "pass") where name == "operationA
...

ちゃんと parse で失敗してエラーが出ました。

failed to parse statement: statement has invalid syntax: 1:47: invalid input text "\"operationA"

全体実装

改めて全体実装です。

package transform_test

import (
	"context"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/confmap/confmaptest"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func assertType[T any](t *testing.T, v interface{}) T {
	t.Helper()

	result, ok := v.(T)
	if !ok {
		t.Fatalf("failed to assert as %T", *new(T))
	}
	return result
}

func loadStatements(t *testing.T) []string {
	t.Helper()

	conf, err := confmaptest.LoadConf("config.yaml")
	if err != nil {
		t.Fatalf("failed to load config file: %v", err)
	}

	processors := assertType[map[string]interface{}](t, conf.ToStringMap()["processors"])
	transform := assertType[map[string]interface{}](t, processors["transform"])
	traceStatements := assertType[[]interface{}](t, transform["trace_statements"])

	var statements []string
	for _, stmt := range traceStatements {
		stmtMap := assertType[map[string]interface{}](t, stmt)
		if stmtMap["context"] == "span" {
			stmts := assertType[[]interface{}](t, stmtMap["statements"])
			for _, s := range stmts {
				statements = append(statements, assertType[string](t, s))
			}
		}
	}

	return statements
}

func TestTransformStatements(t *testing.T) {
	t.Parallel()

	statements := loadStatements(t)
	if len(statements) == 0 {
		t.Fatal("failed to load statements from config file")
	}

	tests := []struct {
		name  string
		input func() ptrace.Span
		want  map[string]any
	}{
		{
			name: "set attribute",
			input: func() ptrace.Span {
				span := ptrace.NewSpan()
				span.SetName("operationA")
				return span
			},
			want: map[string]any{
				"test": "pass",
			},
		},
		{
			name: "not set attribute with expression",
			input: func() ptrace.Span {
				span := ptrace.NewSpan()
				span.SetName("operationB")
				return span
			},
			want: map[string]any{},
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			settings := componenttest.NewNopTelemetrySettings()
			parser, err := ottlspan.NewParser(ottlfuncs.StandardFuncs[ottlspan.TransformContext](), settings)
			if err != nil {
				t.Fatalf("failed to create parser: %v", err)
			}

			var (
				span = tt.input()
				tctx = ottlspan.NewTransformContext(
					span,
					pcommon.NewInstrumentationScope(),
					pcommon.NewResource(),
					ptrace.NewScopeSpans(),
					ptrace.NewResourceSpans(),
				)
			)

			for _, stmt := range statements {
				statement, err := parser.ParseStatement(stmt)
				if err != nil {
					t.Fatalf("failed to parse statement: %v", err)
				}

				_, _, err = statement.Execute(context.Background(), tctx)
				if err != nil {
					t.Fatalf("failed to execute transformation: %v", err)
				}
			}

			got := span.Attributes().AsRaw()

			if diff := cmp.Diff(tt.want, got); diff != "" {
				t.Errorf("attributes mismatch (-want +got):\n%s", diff)
			}
		})
	}
}

まとめ

今回は OTTL についてのテストを実装しました。各言語の SDK ではなく otel collector で処理を記述した際の挙動の確認の事例があまり見当たらなかったので、何かの参考になれば幸いです。今回の事例に限らず、otel collector 内部での挙動を config.yaml 含めて検証したいケースはあるので、その際の道標になれば幸いです。
ですが…もしももっと良い OTTl のテスト手法があればぜひ教えてください。

Discussion