👋

Cloud FunctionsでのOpenTelemetryの活用検証

2024/11/21に公開

こちらの記事は、LUUP のTVCM放映に合わせた一足早い「Luup Developers Advent Calendar 2024」の21日目の記事です。

こんにちは。SREチームの岡谷です。

LuupではCloud Run Functions(以下、CF)がバックエンドの主なインフラとなっています。
そのなかで同期的に行う必要のない重い処理をPub/Subを経由して非同期実行する場合があります。その時の連携のメジャーパターンとしてCF -(Pub/Sub)-> Pub/Sub Triggerd CFというパターンがあります。

このようなメジャーパターンの連携ですが、普通にしていると下記のような情報が分かりずらいです。

  • 一連の処理にどれくらい時間がかかっているか
  • どこの処理でコケているか

悩ましい日々を送っていたのですが、OpenTelemetryを使えばTraceできるのではないかという情報を得ました。

では検証してみようということになりました。

OpenTelemetryとは?

公式によると「オブザーバビリティフレームワークであり、トレース、メトリクス、ログのようなテレメトリーデータを作成・管理するためにデザインされたツールキット」とのことです。

elastic社によると「メトリック、ログ、トレースを収集し、監視プラットフォームにルーティングするための標準化されたプロトコルとツールを提供するために、Cloud Native Computing Foundation(CNCF)によって開発されました」とあります。

こちらの方がわかりやすいですね。CNCFによって作られたオブザーバビリティのために標準化されたプロトコルとツールキットということですね。プロトコルがオープンであるので様々なプログラミング言語の対応も進みそうです。

実装はどうなったか?

ここにGolangによるPub/Sub経由のPublisherとSubscriberのTraceを取得する実行があります。

これをCFで動くように書き換えてあげれば良さそうです。

最終的に動いた実装は下記の通りです。

import {onRequest} from "firebase-functions/v2/https";
import * as logger from "firebase-functions/logger";
import {onMessagePublished} from "firebase-functions/v2/pubsub";
import {initializeApp} from "firebase-admin/app";
import {PubSub} from "@google-cloud/pubsub";
import { context, propagation, trace } from '@opentelemetry/api';
import * as opentelemetry from "@opentelemetry/sdk-node";
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Resource } from '@opentelemetry/resources';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter';
import {
  BatchSpanProcessor,
 } from "@opentelemetry/sdk-trace-base";

// OpenTelemetryの初期化関数
function initializeOpenTelemetry() {
  const traceExporter = new TraceExporter();
  const spanProcessor = new BatchSpanProcessor(traceExporter);
  const sdk = new opentelemetry.NodeSDK({
    resource: new Resource({
      [ATTR_SERVICE_NAME]: 'firebase-functions',
    }),
    traceExporter,
    spanProcessor,
    instrumentations: [getNodeAutoInstrumentations()],
  });

  sdk.start();

  // プロセス終了時にSDKをシャットダウン
  process.on('SIGTERM', () => {
    sdk.shutdown()
      .then(() => logger.info('OpenTelemetry SDK shut down'))
      .catch((error) => logger.error('Error shutting down OpenTelemetry SDK', error))
      .finally(() => process.exit(0));
  });
}

// Firebase Adminの初期化
initializeApp();

// OpenTelemetryの初期化
initializeOpenTelemetry();

// PubSubクライアントの初期化
const pubSubClient = new PubSub();

export const helloWorld = onRequest(
  { 
    region: "asia-northeast1",  // 東京リージョン
    cors: true,
  },
  async (request, response) => {
    console.log("headers", request.headers);
    
    // リクエストヘッダーからコンテキストを抽出
    const parentContext = propagation.extract(context.active(), request.headers);
    if(!parentContext) {
      throw new Error("No parent context found");
    }

    try {
      return context.with(parentContext, async () => {
        // testトピックにメッセージを送信
        const messageData = {
          message: "Hello from Firebase!",
          timestamp: new Date().toISOString()
        };

        const attributes: {[key: string]: string} = {};

        // OpenTelemetryのコンテキストを属性に注入
        propagation.inject(context.active(), attributes);
        logger.info("attributes", {attributes});
        const topic = pubSubClient.topic("test");
        
        // シーケンスを分かりやすくするために500ms待つ
        await new Promise(resolve => setTimeout(resolve, 500));
        
        const messageId = await topic.publishMessage({
          data: Buffer.from(JSON.stringify(messageData)),
          attributes: attributes,
        });

        // シーケンスを分かりやすくするために500ms待つ
        await new Promise(resolve => setTimeout(resolve, 500));

        logger.info("Message published", {messageId});
        response.send(`Message published successfully! Message ID: ${messageId}`);
      });
    } catch (error) {
      logger.error("Error publishing message", error);
      response.status(500).send("Error publishing message");
    }
});

export const handleTestMessage = onMessagePublished(
  {
    topic: "test",
    region: "asia-northeast1"
  },
  async (event) => {
    const tracer = trace.getTracer('firebase-functions');
    // OpenTelemetryのコンテキストを属性から抽出
    let attributes = event.data.message.attributes;
    if (attributes) {
      const parentContext = propagation.extract(context.active(), attributes);
      return context.with(parentContext, async() => {
        const span = tracer.startSpan('handleTestMessage');
        try {
          // メッセージのデータを取得
          const message = event.data.message.json;
          
          // ログ出力
          logger.info("Received message on 'test' topic", message);
          await new Promise(resolve => setTimeout(resolve, 1000));

        } finally {
          span.end();
        }
      });
    } else {
        // 属性がない場合は通常の処理を実行
        const message = event.data.message.json;
        logger.info("Received message on 'test' topic", message);
    }    
  }
);

結果は下記の通りにTraceできました。

動かして見た様子

苦労した点/今後の検討事項

最初からGolangを書き直す前提で動けばよかったのですが、TypeScriptで既存実装がないか探し回ってドハマリしました。

部分的なコードしかなく部分的に挿入しても動かないコードばかりでした。

とりわけ検索して見つかるコードではなかなかわからない下記の二点が鬼門だったかなと思います。

  • OpenTelemetryの初期化処理は必要
  • Spanがはじまっていないと propagation.inject(context.active(), attributes); が機能しない

検索して見つからないコードにぶち当たるなんてめんどくさい!と言わずに違う開発言語やインフラの想定でも動くコードを読み込んで翻訳するということはとても重要です。

でもまあ…めんどくさいですね。正直めんどくかったです。

今後の展望としては、まだログ(Cloud Logging)について同じTraceIDで見るということが出来ていないので、それを検証した後、プロダクションに入れ込むということをやっていこうと考えています。

めんどくさいのでノウハウ持っている方がいらっしゃったら助けてください。

最後に

Luup では、一緒に開発してくださるソフトウェアエンジニアを積極的に募集しています。

カジュアル面談も実施しておりますのでぜひお気軽にお声掛けください。

https://recruit.luup.sc/

参考リンク

Luup Developers Blog

Discussion