Fluentd @type copy利用時のデータ共有の挙動と注意点
足立区の花火大会、今年も楽しみにしていたのですが…まさかの直前中止。夜空に咲くはずだった大輪の花火を想い、PCの画面に咲く大量のログを眺めるSREチームの塚田です。さて、ログといえばFluentd。その柔軟性と拡張性の高さは魅力ですが、設定によっては思わぬ挙動に遭遇することもあります。特に、イベントを複数の経路に分岐させる @type copy
と、データ構造を加工する parser フィルターを組み合わせた際に発生しがちな「データの参照共有」の問題は、気づかぬうちにデータ欠損を引き起こす可能性も。
本記事では、この現象がなぜ起こるのか、具体的な検証コードとログを交えながら解説させていただければと思います。
Fluentdにおけるイベント処理の基本
まず、Fluentdがデータをどのように扱うか、基本をおさらいできればと思います。
Fluentdは、入力されたデータ(イベント)を「レコード」という単位で処理します。レコードとは、キーと値のペアが集まったもので、プログラミングにおけるハッシュやJSONオブジェクトのようなものとイメージしてください。
例えば、以下のようなレコードがあったとします。
{
"record_id": "rec_json_test",
"target_field": "{\"message\":\"hello fluentd\", \"count\":100}",
"other_data": "original_value"
}
この例では、target_field の値がJSON文字列として格納されています。これをパースして構造化データとして扱いたい、といったケースはよくあるかと思います。
イベントを分岐させる @type copy プラグイン
Fluentdの @type copy
プラグインは、1つのイベントレコードを、設定された複数の処理経路(例えば、異なるフィルター処理や出力先)に送る機能を提供します。これにより、同じデータを異なる目的で並行して処理できます。
ここで非常に重要なポイントがあります。@type copy
は、多くの場合、イベントレコードの完全なコピー(複製)をそれぞれの経路に渡すわけではありません。同じ1つのレコードデータへの「参照」(データの実際の置き場所を指し示す情報のようなもの)を渡す振る舞いをデフォルトとして採用しています。
検証環境の準備
今回の挙動を実際に確認するため、以下のDockerfileとFluentd設定ファイルを用意しました。
# ベースOSイメージとしてUbuntu 22.04 (Jammy) を使用
FROM ubuntu:22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# fluent-packageの.debファイルをダウンロード
WORKDIR /tmp
RUN curl -fSL -O https://s3.amazonaws.com/packages.treasuredata.com/lts/5/ubuntu/jammy/pool/contrib/f/fluent-package/fluent-package_5.0.7-1_arm64.deb
RUN apt-get update && apt-get install -y --no-install-recommends ./fluent-package_5.0.7-1_arm64.deb \
&& rm -f ./fluent-package_5.0.7-1_arm64.deb \
&& rm -rf /var/lib/apt/lists/*
# ローカルのfluentd.confをコンテナの/etc/fluent/fluentd.confにコピー
COPY fluentd.conf /etc/fluent/fluentd.conf
# 適宜パーミッションを設定
RUN mkdir -p /var/run/fluentd && chown _fluentd:_fluentd /var/run/fluentd
USER _fluentd
CMD ["/usr/sbin/fluentd", "-c", "/etc/fluent/fluentd.conf", "--no-supervisor"]
このDockerfileと、後述する fluentd.conf を用意し、以下のコマンドでDockerイメージをビルド・実行します。
docker build -t fluent-package-ubuntu-repro-test . --no-cache
docker run --rm fluent-package-ubuntu-repro-test
ケース1: 影響が出ない(ように見える)パターン (remove_key_name_field false)
まずは、parser フィルターの remove_key_name_field パラメータを false に設定した場合の挙動を見てみましょう。これは、パース対象となった元のフィールドをレコードから削除「しない」設定です。
# 1. ダミーの入力データを生成
# target_field の値を有効なJSON文字列にします
<source>
@type dummy
tag input.log
dummy {"record_id": "rec_json_test", "target_field": "{\"message\":\"hello fluentd\", \"count\":100}", "other_data": "original_value"}
rate 1 # 1秒に1イベント送信
</source>
# 2. イベントを2つのラベルにコピーしてルーティング
<match input.log>
@type copy
<store>
@type relabel
@label @LABEL_A_PROCESS_WITH_PARSER # parser処理を行うラベル
</store>
<store>
@type relabel
@label @LABEL_B_OBSERVE_EFFECT # LABEL_Aの影響を確認するラベル
</store>
</match>
# 3. ラベルA: parserフィルターで target_field を処理
# (target_field が文字列からJSONオブジェクトに変換されることを期待)
<label @LABEL_A_PROCESS_WITH_PARSER>
<filter **> # このラベルに来た全イベントを対象
@type parser
key_name target_field # このキーの値をパース
reserve_data true # パース成功時は特に影響しないが、念のため残しても良い
remove_key_name_field false # ★元の文字列としてのtarget_fieldを処理
hash_value_field target_field # ★パースされたJSONオブジェクトを同じtarget_fieldキーに格納
# emit_invalid_record_to_error false # パース成功時は特に影響しない
<parse>
@type json
# ignore_parser_error true # パース成功時は特に影響しない
</parse>
</filter>
# ラベル名をレコードに追加
<filter **>
@type record_modifier
<record>
label_name LABEL_A_PROCESS_WITH_PARSER
</record>
</filter>
# ラベルAの処理後のレコードを標準出力
<match **>
@type stdout
<format>
@type json # 出力をJSON形式に
</format>
</match>
</label>
# 4. ラベルB: ラベルAでの処理の影響を確認
# (もし参照が共有されていれば、ここでも target_field がJSONオブジェクトになっていることを期待)
<label @LABEL_B_OBSERVE_EFFECT>
# ラベル名をレコードに追加
<filter **>
@type record_modifier
<record>
label_name LABEL_B_OBSERVE_EFFECT
</record>
</filter>
# ラベルBに渡ってきたレコードをそのまま標準出力
<match **>
@type stdout
<format>
@type json # 出力をJSON形式に
</format>
</match>
</label>
実行結果:
{"record_id":"rec_json_test","target_field":{"message":"hello fluentd","count":100},"other_data":"original_value","label_name":"LABEL_A_PROCESS_WITH_PARSER"}
{"record_id":"rec_json_test","target_field":"{\"message\":\"hello fluentd\", \"count\":100}","other_data":"original_value","label_name":"LABEL_B_OBSERVE_EFFECT"}
(上記のような出力が1秒ごとに繰り返されます)
この結果を見ると、
LABEL_A_PROCESS_WITH_PARSER では、target_field が期待通りJSONオブジェクトにパースされています。
LABEL_B_OBSERVE_EFFECT では、target_field は元のJSON文字列のままです。
この時点では、LABEL_A での処理が LABEL_B に影響を与えず、それぞれ独立して動作しているように見えます。これは、元のフィールドを「削除」せず、同名キーにパース結果を「上書き」するような形で処理されたため、分岐先での見え方に影響が出なかった(あるいは出にくかった)と考えられます。
ケース2: 影響が出てしまうパターン (remove_key_name_field true)
次に、remove_key_name_field パラメータを true に変更してみましょう。これは、パース対象となった元のフィールドをレコードから削除「する」設定です。
fluentd.conf (LABEL_Aのfilter部分のみ変更):
# 1. ダミーの入力データを生成
# target_field の値を有効なJSON文字列にします
<source>
@type dummy
tag input.log
dummy {"record_id": "rec_json_test", "target_field": "{\"message\":\"hello fluentd\", \"count\":100}", "other_data": "original_value"}
rate 1 # 1秒に1イベント送信
</source>
# 2. イベントを2つのラベルにコピーしてルーティング
<match input.log>
@type copy
<store>
@type relabel
@label @LABEL_A_PROCESS_WITH_PARSER # parser処理を行うラベル
</store>
<store>
@type relabel
@label @LABEL_B_OBSERVE_EFFECT # LABEL_Aの影響を確認するラベル
</store>
</match>
# 3. ラベルA: parserフィルターで target_field を処理
# (target_field が文字列からJSONオブジェクトに変換されることを期待)
<label @LABEL_A_PROCESS_WITH_PARSER>
<filter **> # このラベルに来た全イベントを対象
@type parser
key_name target_field # このキーの値をパース
reserve_data true # パース成功時は特に影響しないが、念のため残しても良い
remove_key_name_field true # ★元の文字列としてのtarget_fieldを処理
hash_value_field target_field # ★パースされたJSONオブジェクトを同じtarget_fieldキーに格納
# emit_invalid_record_to_error false # パース成功時は特に影響しない
<parse>
@type json
# ignore_parser_error true # パース成功時は特に影響しない
</parse>
</filter>
# ラベル名をレコードに追加
<filter **>
@type record_modifier
<record>
label_name LABEL_A_PROCESS_WITH_PARSER
</record>
</filter>
# ラベルAの処理後のレコードを標準出力
<match **>
@type stdout
<format>
@type json # 出力をJSON形式に
</format>
</match>
</label>
# 4. ラベルB: ラベルAでの処理の影響を確認
# (もし参照が共有されていれば、ここでも target_field がJSONオブジェクトになっていることを期待)
<label @LABEL_B_OBSERVE_EFFECT>
# ラベル名をレコードに追加
<filter **>
@type record_modifier
<record>
label_name LABEL_B_OBSERVE_EFFECT
</record>
</filter>
# ラベルBに渡ってきたレコードをそのまま標準出力
<match **>
@type stdout
<format>
@type json # 出力をJSON形式に
</format>
</match>
</label>
LABEL_A_PROCESS_WITH_PARSER では、target_field が空のJSONオブジェクト {} として存在しています(元のフィールドが削除された後、パース結果が同名で再格納されたため)。
しかし、変更したのは LABEL_A_PROCESS_WITH_PARSER であるのにも関わらず、LABEL_B_OBSERVE_EFFECT では、target_field 自体が消失してしまいました。
標準出力は以下のようになります。
{"record_id":"rec_json_test","other_data":"original_value","label_name":"LABEL_A_PROCESS_WITH_PARSER","target_field":{}}
{"record_id":"rec_json_test","other_data":"original_value","label_name":"LABEL_B_OBSERVE_EFFECT"}
そして、Fluentdの警告ログには以下のようなメッセージが出力されます。
2025-06-02 02:11:41 +0000 [warn]: dump an error event: error_class=ArgumentError error="target_field does not exist" location=nil tag="input.log" time=2025-06-02 02:11:41.099968812 +0000 record={"record_id"=>"rec_json_test", "other_data"=>"original_value", "label_name"=>"LABEL_B_OBSERVE_EFFECT"}
(上記のような出力と警告ログが1秒ごとに繰り返されます)
この振る舞いは以下の背景があります。
remove_key_name_field true の設定では、LABEL_A の parser フィルターは、target_field(元のJSON文字列)をレコードから削除します。
LABEL_B がレコードを参照する際、LABEL_A によって target_field が一度「削除」された影響を直接受けてしまうのです。
LABEL_A でパース結果として新しい target_field が再追加されるタイミングや、Fluentd内部での変更の伝播の仕方によっては、LABEL_B から見ると target_field が存在しない状態として観測されてしまいます。
@type copy
プラグインについて
Fluentd の@type copy
はデフォルトでは基本的に同じレコード(データ)への参照を複数の処理経路に渡します。
そのため、分岐先でデータの変更を行うのであれば copy_mode を deep にしておかないと他の経路に影響が出る可能性があります。
十分なテストと監視:
設定変更後は、必ず意図した通りにデータが処理されているか、データフローの各所で確認しましょう。また、エラーログやワーニングログを監視し、早期に問題を発見できる体制を整えることも重要です。
Fluentdは非常に強力で柔軟なツールですが、その内部的な動作原理を理解することで、より安定した、信頼性の高いデータパイプラインを構築できます。今回の記事が、皆さんのFluentdを用いたシステム開発や運用の一助となれば幸いです。
Discussion