📖

Fluentd @type copy利用時のデータ共有の挙動と注意点

に公開

足立区の花火大会、今年も楽しみにしていたのですが…まさかの直前中止。夜空に咲くはずだった大輪の花火を想い、PCの画面に咲く大量のログを眺めるSREチームの塚田です。さて、ログといえばFluentd。その柔軟性と拡張性の高さは魅力ですが、設定によっては思わぬ挙動に遭遇することもあります。特に、イベントを複数の経路に分岐させる @type copy と、データ構造を加工する parser フィルターを組み合わせた際に発生しがちな「データの参照共有」の問題は、気づかぬうちにデータ欠損を引き起こす可能性も。

本記事では、この現象がなぜ起こるのか、具体的な検証コードとログを交えながら解説させていただければと思います。

Fluentdにおけるイベント処理の基本

まず、Fluentdがデータをどのように扱うか、基本をおさらいできればと思います。
Fluentdは、入力されたデータ(イベント)を「レコード」という単位で処理します。レコードとは、キーと値のペアが集まったもので、プログラミングにおけるハッシュやJSONオブジェクトのようなものとイメージしてください。

例えば、以下のようなレコードがあったとします。

{
  "record_id": "rec_json_test",
  "target_field": "{\"message\":\"hello fluentd\", \"count\":100}",
  "other_data": "original_value"
}

この例では、target_field の値がJSON文字列として格納されています。これをパースして構造化データとして扱いたい、といったケースはよくあるかと思います。

イベントを分岐させる @type copy プラグイン

Fluentdの @type copy プラグインは、1つのイベントレコードを、設定された複数の処理経路(例えば、異なるフィルター処理や出力先)に送る機能を提供します。これにより、同じデータを異なる目的で並行して処理できます。

ここで非常に重要なポイントがあります。@type copy は、多くの場合、イベントレコードの完全なコピー(複製)をそれぞれの経路に渡すわけではありません。同じ1つのレコードデータへの「参照」(データの実際の置き場所を指し示す情報のようなもの)を渡す振る舞いをデフォルトとして採用しています。

https://docs.fluentd.org/output/copy#copy_mode

検証環境の準備

今回の挙動を実際に確認するため、以下のDockerfileとFluentd設定ファイルを用意しました。

# ベースOSイメージとしてUbuntu 22.04 (Jammy) を使用
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# fluent-packageの.debファイルをダウンロード
WORKDIR /tmp
RUN curl -fSL -O https://s3.amazonaws.com/packages.treasuredata.com/lts/5/ubuntu/jammy/pool/contrib/f/fluent-package/fluent-package_5.0.7-1_arm64.deb

RUN apt-get update && apt-get install -y --no-install-recommends ./fluent-package_5.0.7-1_arm64.deb \
    && rm -f ./fluent-package_5.0.7-1_arm64.deb \
    && rm -rf /var/lib/apt/lists/*

# ローカルのfluentd.confをコンテナの/etc/fluent/fluentd.confにコピー
COPY fluentd.conf /etc/fluent/fluentd.conf

# 適宜パーミッションを設定
RUN mkdir -p /var/run/fluentd && chown _fluentd:_fluentd /var/run/fluentd
USER _fluentd

CMD ["/usr/sbin/fluentd", "-c", "/etc/fluent/fluentd.conf", "--no-supervisor"]

このDockerfileと、後述する fluentd.conf を用意し、以下のコマンドでDockerイメージをビルド・実行します。

docker build -t fluent-package-ubuntu-repro-test . --no-cache
docker run --rm fluent-package-ubuntu-repro-test

ケース1: 影響が出ない(ように見える)パターン (remove_key_name_field false)

まずは、parser フィルターの remove_key_name_field パラメータを false に設定した場合の挙動を見てみましょう。これは、パース対象となった元のフィールドをレコードから削除「しない」設定です。

# 1. ダミーの入力データを生成
#    target_field の値を有効なJSON文字列にします
<source>
  @type dummy
  tag input.log
  dummy {"record_id": "rec_json_test", "target_field": "{\"message\":\"hello fluentd\", \"count\":100}", "other_data": "original_value"}
  rate 1 # 1秒に1イベント送信
</source>

# 2. イベントを2つのラベルにコピーしてルーティング
<match input.log>
  @type copy
  <store>
    @type relabel
    @label @LABEL_A_PROCESS_WITH_PARSER # parser処理を行うラベル
  </store>
  <store>
    @type relabel
    @label @LABEL_B_OBSERVE_EFFECT   # LABEL_Aの影響を確認するラベル
  </store>
</match>

# 3. ラベルA: parserフィルターで target_field を処理
#    (target_field が文字列からJSONオブジェクトに変換されることを期待)
<label @LABEL_A_PROCESS_WITH_PARSER>
  <filter **> # このラベルに来た全イベントを対象
    @type parser
    key_name target_field         # このキーの値をパース
    reserve_data true           # パース成功時は特に影響しないが、念のため残しても良い
    remove_key_name_field false    # ★元の文字列としてのtarget_fieldを処理
    hash_value_field target_field # ★パースされたJSONオブジェクトを同じtarget_fieldキーに格納
    # emit_invalid_record_to_error false # パース成功時は特に影響しない
    <parse>
      @type json
      # ignore_parser_error true # パース成功時は特に影響しない
    </parse>
  </filter>

  # ラベル名をレコードに追加
  <filter **>
    @type record_modifier
    <record>
      label_name LABEL_A_PROCESS_WITH_PARSER
    </record>
  </filter>

  # ラベルAの処理後のレコードを標準出力
  <match **>
    @type stdout
    <format>
      @type json # 出力をJSON形式に
    </format>
  </match>
</label>

# 4. ラベルB: ラベルAでの処理の影響を確認
#    (もし参照が共有されていれば、ここでも target_field がJSONオブジェクトになっていることを期待)
<label @LABEL_B_OBSERVE_EFFECT>
  # ラベル名をレコードに追加
  <filter **>
    @type record_modifier
    <record>
      label_name LABEL_B_OBSERVE_EFFECT
    </record>
  </filter>

  # ラベルBに渡ってきたレコードをそのまま標準出力
  <match **>
    @type stdout
    <format>
      @type json # 出力をJSON形式に
    </format>
  </match>
</label>

実行結果:

{"record_id":"rec_json_test","target_field":{"message":"hello fluentd","count":100},"other_data":"original_value","label_name":"LABEL_A_PROCESS_WITH_PARSER"}
{"record_id":"rec_json_test","target_field":"{\"message\":\"hello fluentd\", \"count\":100}","other_data":"original_value","label_name":"LABEL_B_OBSERVE_EFFECT"}

(上記のような出力が1秒ごとに繰り返されます)

この結果を見ると、
LABEL_A_PROCESS_WITH_PARSER では、target_field が期待通りJSONオブジェクトにパースされています。
LABEL_B_OBSERVE_EFFECT では、target_field は元のJSON文字列のままです。

この時点では、LABEL_A での処理が LABEL_B に影響を与えず、それぞれ独立して動作しているように見えます。これは、元のフィールドを「削除」せず、同名キーにパース結果を「上書き」するような形で処理されたため、分岐先での見え方に影響が出なかった(あるいは出にくかった)と考えられます。

ケース2: 影響が出てしまうパターン (remove_key_name_field true)

次に、remove_key_name_field パラメータを true に変更してみましょう。これは、パース対象となった元のフィールドをレコードから削除「する」設定です。

fluentd.conf (LABEL_Aのfilter部分のみ変更):

# 1. ダミーの入力データを生成
#    target_field の値を有効なJSON文字列にします
<source>
  @type dummy
  tag input.log
  dummy {"record_id": "rec_json_test", "target_field": "{\"message\":\"hello fluentd\", \"count\":100}", "other_data": "original_value"}
  rate 1 # 1秒に1イベント送信
</source>

# 2. イベントを2つのラベルにコピーしてルーティング
<match input.log>
  @type copy
  <store>
    @type relabel
    @label @LABEL_A_PROCESS_WITH_PARSER # parser処理を行うラベル
  </store>
  <store>
    @type relabel
    @label @LABEL_B_OBSERVE_EFFECT   # LABEL_Aの影響を確認するラベル
  </store>
</match>

# 3. ラベルA: parserフィルターで target_field を処理
#    (target_field が文字列からJSONオブジェクトに変換されることを期待)
<label @LABEL_A_PROCESS_WITH_PARSER>
  <filter **> # このラベルに来た全イベントを対象
    @type parser
    key_name target_field         # このキーの値をパース
    reserve_data true           # パース成功時は特に影響しないが、念のため残しても良い
    remove_key_name_field true    # ★元の文字列としてのtarget_fieldを処理
    hash_value_field target_field # ★パースされたJSONオブジェクトを同じtarget_fieldキーに格納
    # emit_invalid_record_to_error false # パース成功時は特に影響しない
    <parse>
      @type json
      # ignore_parser_error true # パース成功時は特に影響しない
    </parse>
  </filter>

  # ラベル名をレコードに追加
  <filter **>
    @type record_modifier
    <record>
      label_name LABEL_A_PROCESS_WITH_PARSER
    </record>
  </filter>

  # ラベルAの処理後のレコードを標準出力
  <match **>
    @type stdout
    <format>
      @type json # 出力をJSON形式に
    </format>
  </match>
</label>

# 4. ラベルB: ラベルAでの処理の影響を確認
#    (もし参照が共有されていれば、ここでも target_field がJSONオブジェクトになっていることを期待)
<label @LABEL_B_OBSERVE_EFFECT>
  # ラベル名をレコードに追加
  <filter **>
    @type record_modifier
    <record>
      label_name LABEL_B_OBSERVE_EFFECT
    </record>
  </filter>

  # ラベルBに渡ってきたレコードをそのまま標準出力
  <match **>
    @type stdout
    <format>
      @type json # 出力をJSON形式に
    </format>
  </match>
</label>

LABEL_A_PROCESS_WITH_PARSER では、target_field が空のJSONオブジェクト {} として存在しています(元のフィールドが削除された後、パース結果が同名で再格納されたため)。
しかし、変更したのは LABEL_A_PROCESS_WITH_PARSER であるのにも関わらず、LABEL_B_OBSERVE_EFFECT では、target_field 自体が消失してしまいました。

標準出力は以下のようになります。

{"record_id":"rec_json_test","other_data":"original_value","label_name":"LABEL_A_PROCESS_WITH_PARSER","target_field":{}}
{"record_id":"rec_json_test","other_data":"original_value","label_name":"LABEL_B_OBSERVE_EFFECT"}

そして、Fluentdの警告ログには以下のようなメッセージが出力されます。

2025-06-02 02:11:41 +0000 [warn]: dump an error event: error_class=ArgumentError error="target_field does not exist" location=nil tag="input.log" time=2025-06-02 02:11:41.099968812 +0000 record={"record_id"=>"rec_json_test", "other_data"=>"original_value", "label_name"=>"LABEL_B_OBSERVE_EFFECT"}

(上記のような出力と警告ログが1秒ごとに繰り返されます)

この振る舞いは以下の背景があります。
remove_key_name_field true の設定では、LABEL_A の parser フィルターは、target_field(元のJSON文字列)をレコードから削除します。
LABEL_B がレコードを参照する際、LABEL_A によって target_field が一度「削除」された影響を直接受けてしまうのです。

LABEL_A でパース結果として新しい target_field が再追加されるタイミングや、Fluentd内部での変更の伝播の仕方によっては、LABEL_B から見ると target_field が存在しない状態として観測されてしまいます。

Fluentd の@type copyプラグインについて

@type copy はデフォルトでは基本的に同じレコード(データ)への参照を複数の処理経路に渡します。
そのため、分岐先でデータの変更を行うのであれば copy_mode を deep にしておかないと他の経路に影響が出る可能性があります。

十分なテストと監視:

設定変更後は、必ず意図した通りにデータが処理されているか、データフローの各所で確認しましょう。また、エラーログやワーニングログを監視し、早期に問題を発見できる体制を整えることも重要です。

Fluentdは非常に強力で柔軟なツールですが、その内部的な動作原理を理解することで、より安定した、信頼性の高いデータパイプラインを構築できます。今回の記事が、皆さんのFluentdを用いたシステム開発や運用の一助となれば幸いです。

TRUSTDOCK テックブログ

Discussion