✂️

Pub/Sub の Single Message Transform (SMT) を使ってメッセージを手軽に変換する

に公開

はじめに

Cloud Pub/Sub はスケーラブルなメッセージキューですが、キューを流れるメッセージの加工はキューの外部で行う必要がありました。例えば Cloud Dataflow を使ったり VM やコンテナの上で独自のデータ加工アプリを動かしたりする方法が挙げられます。確かに Dataflow は多機能でスケール性に優れたサービスですが、ちょっとしたメッセージの加工に使うのは大がかりに感じられることもあるかと思います。このような場合に手軽に利用できるのが2025年6月に出た Single Message Transform (SMT) です。SMT を使うと Pub/Sub の中で JavaScript の関数を動かし、メッセージを加工することができます。

前提

想定する読者は Pub/Sub の基礎的な仕組み(トピックにメッセージを入れて、サブスクリプションから取り出す)を知っている方です。

ユースケースや既存の機能との違い

  • メッセージのバリデーション:Pub/Sub ではスキーマを設定してメッセージの形式を規定することが可能ですが、SMTを使うとスキーマだけでは表現できない複雑なバリデーションを行うことも可能です。
  • メッセージのフィルタ:Pub/Sub のサブスクリプションのフィルタ機能では、その基準に設定可能なのはメッセージの属性のみです。SMT では属性に加え、データも参照してフィルタすることが可能です。またトピック側にもフィルタを設定することができます。
  • 不要なフィールドの削除:メッセージに含まれる特定の項目(例えば、JSON の個人情報に関連するフィールド)を削除します。
  • スキーマの変換:キューの入出力でスキーマが異なる場合、メッセージを詰め替えることができます。

ユースケースではないもの

SMT の関数には下記の制限があり、複雑な処理は行えません。また Single Message の名の通り、複数のメッセージを集計することはできません。そういった要件がある場合は Dataflow などを使って加工してください。

  • ソースコードの容量: 20 KB
  • 実行時間: 500 ms
  • 外部 API は利用不可:例えば Gemini API を呼び出してメッセージにラベルを付けるようなことはできません
  • 外部ライブラリの読み込み不可

仕様

それでは具体的な仕様を見ていきます。

関数

SMT で利用する JavaScript の関数は次のような仕様に沿って作る必要があります。

メッセージの入出力

関数の引数のシグネチャは下記の通りです。

  • message (Object)
    • data (string)
    • attributes (Object<string, string>)
  • metadata (Object)
    • message_id (string)
    • publish_time (string)
    • ordering_key (string)

戻り値は次のようなフィールドを持つ Object もしくは null です。

  • data (string)
  • attributes: (Object<string, string>)

関数を実行した結果は次の3種類のいずれかです。

  1. 加工したメッセージを渡して処理を続行する:メッセージを戻り値とする
  2. メッセージを捨て、正常終了する:戻り値に null を設定
  3. エラーを返して中断:例外をスロー

Pub/Sub のキューはトピックとサブスクリプションの2つで構成されていますが、SMT の関数はどちらにも設定することが可能です。

SMT を有効にした Pub/Sub のフロー

設定はコンソール、gcloudコマンドTerraformのいずれかにより行えます。コンソールやgcloud コマンドでは、関数のコードのバリデーションを行い、テストメッセージを送ってどのように変換されるのかを確認することができます。

コンソールでの SMT 関数の設定

トピックへの設定

トピックへ SMT を設定すると次のような特徴があります。

  • トピックからサブスクリプション間のデータ量を削減
    • 複数のサブスクリプションへ分岐する場合、トピックの段階で先に不必要なフィールドを取り除きデータ転送量を削減することができます。
  • メッセージが破棄されたことは送信元にはわからない
    • 送信側はメッセージ ID が戻り、トピックが正常にメッセージを受け取ったように見えます
    • 破棄されたメッセージのデータ量は Cloud Monitoring の pubsub.googleapis.com/topic/byte_costoperation_type=smt_publish_filter_drop に絞ることで確認可能です。
  • 送信元は例外がスローされたことがわかる
    • 関数の中で例外がスローされた場合、メッセージの送信側はエラーが起きたことを検出することができます。

サブスクリプションへの設定

  • エラーの起きたメッセージを Dead Letter Queue (DLQ) に送信可能
    • 例外がスローされた場合、そのメッセージやエラーログを DLQ に送ることができます。関数でエラーが起きた場合のメッセージを保存して、デバッグやエラー処理に利用することができます。
  • メッセージが破棄されるとそのメッセージの処理は終了する
    • 破棄されたメッセージのデータ量は Cloud Monitoring のpubsub.googleapis.com/subscription/byte_costoperation_type=smt_subscribe_filter_drop に絞ることで確認可能です。
  • バックログに関するメトリクス(バックログにあるメッセージの件数や滞留時間)は SMT の処理待ちになっているメッセージも含まれます。 これらのメトリクスを使って受信側のワーカーをオートスケールする場合、SMT が破棄し受信側には流れないメッセージもメトリクスの計算対象であることを考慮する必要があります。

料金

料金の単価は $40 / TiB (Pub/Sub 自体のトピックやサブスクリプションのスループット単価と同じ)です。SMT の入出力のうち大きい方で課金されるので、8 MiB のメッセージを 5 MiB に削減する関数の場合は 8 MiB 分が課金されます。

実際に挙動を確認する

メッセージの加工、廃棄、例外のスロー

簡単なサンプルとして整数を与えて FizzBuzz を行う関数を設定して、SMT の基本的な動作を確認します。
関数は複数指定することも可能で、その場合メッセージは先に設定された関数から直列に処理されていきます。

サンプルコードの仕様:

  • トピック:バリデーション関数 T-1 を設定
    • 数値ではない文字列→クライアント側にエラーを返す
    • 整数ではない数値→メッセージを廃棄
  • サブスクリプション:バリデーション関数 S-1 と FizzBuzz を行う関数 S-2 を設定
    • 負の数値:DLQ へ送る
    • 0:メッセージを廃棄
    • 正の数:FizzBuzz を実施
      • 例: 15→ {"original": "15", "converted": "1 2 Fizz 4 Buzz Fizz 7 8 Fizz Buzz 11 Fizz 13 14 FizzBuzz"}

メッセージの加工

message.data は文字列なので、適宜 JSON や Number にパースして扱います。戻り値の data は文字列型の仕様なので、加工が済んだメッセージは文字列に変換します。

関数S-2: サブスクリプションに設定したメッセージ加工用関数

function fizzbuzz(message, metadata) {
  const n = Number(message.data);
  const fizzBuzzSequence = Array.from({ length: n }, (_, i) => getFizzBuzzValue(i + 1));

  return {
    data: JSON.stringify({
      original: message.data,
      converted: fizzBuzzSequence.join(' ')
    }),
    attributes: message.attributes,
  };
}

function getFizzBuzzValue(n) {
  if (n % 15 === 0) return "FizzBuzz";
  if (n % 3 === 0) return "Fizz";
  if (n % 5 === 0) return "Buzz";
  return String(n);
}

変換後のメッセージを BigQuery のテーブルへ書き込むと、関数からの出力が記録されていることが確認できます。

SMT により変換されたメッセージ

メッセージの破棄とエラーハンドリング

メッセージを破棄したい場合は戻り値を null にします。

関数T: トピックに指定したバリデーション関数

function acceptOnlyInteger(message, metadata) {
  num = Number(message.data)
  if (Number.isNaN(num)) {
    throw new Error("message should be valid number");
  }

  if (Number.isInteger(num)) {
    return message
  }

  return null;
}

関数S-1: サブスクリプションに設定したバリデーション、フィルタ用関数

function acceptPositiveNumber(message, metadata) {
  const n = Number(message.data)

  if (n < 0) {
    throw new Error(`${n} is not positive number`)
  } else if (n === 0) {
    return null
  }

  return message;
}

続いて関数から例外がスローされたときの挙動を確認します。
トピック内の関数で例外をスローすると、下記のように送信元にもエラーが発生したことが伝わります。例外のメッセージや関数名、行数などを確認することができます。

例外発生時のエラーメッセージ
ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: Pub/Sub failed to apply a message transformation to one or more messages in the publish request. Error: Failed to execute JavaScript UDF: `acceptOnlyInteger`. Error: message should be valid number at acceptOnlyInteger line 21, columns 4-5..
- '@type': type.googleapis.com/google.rpc.ErrorInfo
  domain: pubsub.googleapis.com
  metadata:
    message: 'Pub/Sub failed to apply a message transformation to one or more messages
      in the publish request. Error: Failed to execute JavaScript UDF: `acceptOnlyInteger`.
      Error: message should be valid number at acceptOnlyInteger line 21, columns
      4-5..'
  reason: SMT_FAILURE
- '@type': type.googleapis.com/google.rpc.DebugInfo
  detail: '[ORIGINAL ERROR] generic::invalid_argument: Pub/Sub failed to apply a message
    transformation to one or more messages in the publish request. Error: Failed to
    execute JavaScript UDF: `acceptOnlyInteger`. Error: message should be valid number
    at acceptOnlyInteger line 21, columns 4-5.. [google.rpc.error_details_ext] { message:
    "Pub/Sub failed to apply a message transformation to one or more messages in the
    publish request. Error: Failed to execute JavaScript UDF: `acceptOnlyInteger`.
    Error: message should be valid number at acceptOnlyInteger line 21, columns 4-5.."
    details { [type.googleapis.com/google.rpc.ErrorInfo] { reason: "SMT_FAILURE" domain:
    "pubsub.googleapis.com" metadata { key: "message" value: "Pub/Sub failed to apply
    a message transformation to one or more messages in the publish request. Error:
    Failed to execute JavaScript UDF: `acceptOnlyInteger`. Error: message should be
    valid number at acceptOnlyInteger line 21, columns 4-5.." } } } }'

一方、サブスクリプションで例外がスローされると、そのメッセージは Nack されてその後 DLQ(設定があれば)へ送られます。
DLQ に送られたメッセージの attributes にはエラーメッセージも含まれます。
attributes の例を示します。

{
  "CloudPubSubDeadLetterSourceDeliveryCount": "5",
  "CloudPubSubDeadLetterSourceSMTErrorMessage": "Failed to execute JavaScript UDF: `acceptPositiveNumber`. Error: -3 is not positive number at acceptPositiveNumber line 5, columns 4-5.",
  "CloudPubSubDeadLetterSourceSubscription": "fizzbuzz-sub",
  "CloudPubSubDeadLetterSourceSubscriptionProject": "PROJECT_ID",
  "CloudPubSubDeadLetterSourceTopicPublishTime": "2025-07-15T06:09:57.849+00:00"
}

実行時間の制限

関数の中に無限ループを仕込んで、タイムアウトすることを確認します。

タイムアウト発生時のエラーメッセージ
ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: Pub/Sub failed to apply a message transformation to one or more messages in the publish request. Error: Execution of JavaScript UDF timed out: `functionName`..
- '@type': type.googleapis.com/google.rpc.ErrorInfo
  domain: pubsub.googleapis.com
  metadata:
    message: 'Pub/Sub failed to apply a message transformation to one or more messages
      in the publish request. Error: Execution of JavaScript UDF timed out: `functionName`..'
  reason: SMT_FAILURE
- '@type': type.googleapis.com/google.rpc.DebugInfo
  detail: '[ORIGINAL ERROR] generic::invalid_argument: Pub/Sub failed to apply a message
    transformation to one or more messages in the publish request. Error: Execution
    of JavaScript UDF timed out: `functionName`.. [google.rpc.error_details_ext] {
    message: "Pub/Sub failed to apply a message transformation to one or more messages
    in the publish request. Error: Execution of JavaScript UDF timed out: `functionName`.."
    details { [type.googleapis.com/google.rpc.ErrorInfo] { reason: "SMT_FAILURE" domain:
    "pubsub.googleapis.com" metadata { key: "message" value: "Pub/Sub failed to apply
    a message transformation to one or more messages in the publish request. Error:
    Execution of JavaScript UDF timed out: `functionName`.." } } } }'

関数の実行時間は Cloud Monitoring の下記のメトリクスで確認することが可能です。

  • pubsub.googleapis.com/topic/message_transform_latencies
  • pubsub.googleapis.com/subscription/message_transform_latencies

この記事に載せているコードでは最長でも数十 ms 程度でした。

応用例

データパイプラインの中で SMT を使う以外のユースケースとして、Webhook へメッセージを送信する例を示します。

Google Cloud のマネージドサービスの中には、リソースの状況を Pub/Sub のメッセージとして取得することができるものがあります。例えば、Cloud Build のビルドの状況や GKE クラスタの更新情報、予算の状況を通知する機能が挙げられます。今までは Pub/Sub から Cloud Run functions へメッセージを送り、そこで加工する必要がありましたが、SMT により Pub/Sub のみで実現することができます。
この記事では、GKE クラスタの更新通知を加工して Slack の Webhook へ送ります。

Pub/Sub の Push サブスクリプションを使うとメッセージを外部の URL へ POST することができます。Pub/Sub のデフォルトではメッセージを Pub/Sub 独自のスキーマでラップされ、外部 API の仕様と合わなくなってしまいますが、Unwrap 機能を有効にすると、SMT で加工したメッセージをそのまま外部へ送信することができます。今回は Slack の仕様に沿った JSON を組み立てます。
GKE からのメッセージは data にイベントを説明する文字列、 attributes.payload に JSON 形式の文字列でメタデータが入っているので後者は JSON としてパースして必要な項目を取り出します。

function gkeUpgrade(message, metadata) {

  let slackBlock
  switch (message.attributes.type_url) {
    case "type.googleapis.com/google.container.v1beta1.UpgradeEvent":
      slackBlock = upgradeEvent(message)
      break;
    case "type.googleapis.com/google.container.v1beta1.UpgradeInfoEvent":
      slackBlock = upgradeInfoEvent(message)
      break;
    default:
      // SecurityBulletinEvent, UpgradeAvailableEvent are not implemented
      slackBlock = {
        "type": "section",
        "text": {
          "type": "plain_text",
          "text": `Unimplemented type ${message.attributes.type_url}`,
        }
      }
      break;
  }

  return {
    data: JSON.stringify(slackBlock), attributes: {
      "content-type": "application/json"
    }
  }
}

function upgradeEvent(message) {
  const payload = JSON.parse(message.attributes.payload)

  return {
    blocks: [{
      "type": "section",
      "text": {
        "type": "plain_text",
        "text": `:information_source: ${message.data}`
      }
    }, {
      "type": "section",
      "fields": [
        {
          "type": "mrkdwn",
          "text": `*Cluster:*\n${message.attributes.cluster_name}`
        },
        {
          "type": "mrkdwn",
          "text": `*Project #:*\n${message.attributes.project_id}`
        },
        {
          "type": "mrkdwn",
          "text": `*Start time:*\n${slackDateFormat(payload.operationStartTime)}`
        },
        {
          "type": "mrkdwn",
          "text": `*Current ver.:*\n${payload.currentVersion}`
        }
      ]
    }
    ]
  }
}

function upgradeInfoEvent(message) {
  const payload = JSON.parse(message.attributes.payload)
  let details

  switch (payload.eventType) {
    case "UPGRADE_LIFECYCLE":
      details = [{
        "type": "mrkdwn",
        "text": `*Start time:*\n${slackDateFormat(payload.startTime)}`
      },
      {
        "type": "mrkdwn",
        "text": `*End time:*\n${slackDateFormat(payload.endTime)}`
      },
      {
        "type": "mrkdwn",
        "text": `*Previous ver.:*\n${payload.currentVersion}`
      }];
      break;
    case "END_OF_SUPPORT":
      details = [{
        "type": "mrkdwn",
        "text": `*Standard EoL:*\n${slackDateFormat(payload.standardSupportEndTime)}`
      },
      {
        "type": "mrkdwn",
        "text": `*Extended EoL:*\n${slackDateFormat(payload.extendedSupportEndTime)}`
      },
      {
        "type": "mrkdwn",
        "text": `*Current ver.:*\n${payload.currentVersion}`
      }];
      break;
    case "COS_MILESTONE_VERSION_UPDATE":
      details = [{
        "type": "mrkdwn",
        "text": `*COS upgrade:*\n${payload.description}`
      }];
      break;
    default:
      details = [{
        "type": "mrkdwn",
        "text": `*Unknown event:*\n${payload.eventType}`
      }];
      break;
  }

  return {
    blocks: [{
      "type": "section",
      "text": {
        "type": "plain_text",
        "text": `:information_source: ${message.data}`
      }
    }, {
      "type": "section",
      "fields": [
        {
          "type": "mrkdwn",
          "text": `*Cluster name:*\n${message.attributes.cluster_name}`
        },
        {
          "type": "mrkdwn",
          "text": `*Project #:*\n${message.attributes.project_id}`
        },
        ...details
      ]
    }
    ]
  }
}

function slackDateFormat(datetimeStr){
  return `<!date^${Math.floor(new Date(datetimeStr).getTime() / 1000)}^{date_num} {time_secs}|${datetimeStr}>`
}

実際に GKE クラスタが更新されると次のような通知が届きます。
GKE クラスタの更新通知

参考リンク

Google Cloud Japan

Discussion