試験用のfluentdサーバーを作る
fluentdを使うシステムを作っていく過程で、そもそもfluentdにデータが送られていることをどうテストしようかと悩んだので色々調べてみた。
やりたいこと
fluentdを使うシステムを作っていく過程で、fluentdにデータが送られていることを確認するテストをサクッと書きたい。
方針
CIを行う上で出来る限りテストの時間を短くしたいので、FluentLoggerで投げたデータをなるべくリアルタイムでoutputに連携したい。
以前は自分でソケット開いて待ち受けるコードを書いていたけど、バージョン上がってSSLが絡んで面倒になったので、なるべくpluginとかでなんとかしたい。
CIを回す際にdockerを使って周囲のミドルウェアを立てることが多いので、以前に一度Elasticsearchをミドルウェアキットに入れて試してもみたが、直接呼んでないElasticsearchをわざわざ建ててテストのためにelasticsearch-ruby入れて繋ぎ込むのもだるい。。。
特に公式イメージだとシングルノードで起きないし、バージョン変わった時に全部のマイクロサービスで追従してアップデートするのが面倒なので、出来る限り追加のミドルウェアなしでテストしてたい。
方式検討
リアルタイム性
fluentdのバッファリングはいつもflush_interval
の値をチューニングしながらやってるけど、テストだったらflush_mode=immediate
で行けそう
CIでの使い勝手
今回はActiveJob等でも使ってるRedisをうまく活用できないか考えたところ、公式のプラグインリストに良さげなプラグインがあったので試してみる。
MySQLのもあったけど、FluentLoggerから送るデータ構造が割とバラバラなので、どっちかというとそのまま入れてくれるredisのプラグインの方がマッチしてた。
実際にやってみた
方式検討を踏まえつつ必要なGemのインストールとfluentdの設定を進めていく。
0. 準備
gemは本体の他にfluent-plugin-redis-storeのプラグインがあればいいので、まとめて入れてしまう
gem install fluentd fluent-plugin-redis-store --no-document
Redisのインストールについては割愛するが、brewなりdockerなりを使ってredisは建てておく。
fluentdはforwardから受け取り、redis_storeに出していくので、fluent.confはこんな感じになる。
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
やってることはこんな感じ。
- sourceとしてforwardから受け取ったものに
@mainstream
のラベルをつける - filterとして
@mainstream
の全てのデータに対してtagというキーでタグの値をセットする - matchとして
@mainstream
の全てのデータに対してstdoutとredis_storeの2つへアウトプットする - redis_storeの設定として、Redisのキーとして使う値に上記のfilterでセットしたタグを設定する
- redis_storeのバッファ設定として、
flush_mode immediate
としてバッファに入った後すぐにアウトプットする
stdoutに出すのはおまけだけど、出しておくと後々デバックしやすい。
これを./fluent.conf
に書き込み、fluentdを起動する
fluentd -c fluent.conf
これで準備が整ったので、データを投げ込んでみたいところだが、Redisをモニタリングしておいた方が動きがわかりやすいので、別窓を開いてモニタリングしておく
redis-cli monitor
1. fluentdにデータを投げる
fluentdにデータを送るところは、実際のアプリケーションは fluent-logger-rubyを使ってデータを送るが、今回はfluent-cat
を使って代用してみる
echo '{ "key" : "sample" }' | bundle exe fluent-cat debug.test
これは{ "key" : "sample" }
というデータに debug.test
というタグと現在時刻のタイムスタンプを付与してデータを送信してくれるコマンド。設定しない場合はlocalhost宛になるので、上記で建てたfluentdサーバーに対してデータが送信される。
そうするとfluentd側の標準出力に下記のような出力が出る。
2020-11-03 09:37:28.016591000 +0900 debug.test: {"key":"sample","tag":"debug.test"}
これはfluent.conf
に設定した@type stdout
が出力しているもので、実際にデータを受け取った後にfilterがタグを付加してこの形になっていることが確認できる。
さらにRedisをモニタリングしている窓を見ると下記のように表示されている。
1604363848.025602 [0 172.28.0.1:40998] "zadd" "debug.test" "1604363848.016591" "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
pluginのreadmeに書いてあるとおりzaddされている。keyはkey_path tag
の設定が効いていて、データに含まれているtagの値であるdebug.test
がセットされている。
2. 投げたデータの確認
Redisからデータを取得してみる。
redis-cli zrange debug.test 0 -1 withscores
1) "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
2) "1604363848.016591"
タイムスタンプがスコアになってるので大量に入れてもいい感じにソートされる。
ちなみにrubyでかくとこんな感じ。
require 'redis'
Redis.new.zrange 'debug.test', 0, -1, withscores: true
=> [["{\"key\":\"sample\",\"tag\":\"debug.test\"}", 1604363848.016591]]
とりあえずシンプルに値は取れるので、割と使い勝手は良さそう。
おまけ
並列テストでの使い勝手を考えてみる
parallel_tests
やtest-queue
を使ってテストの並列実行をした際、どのテストプロセスが入れたログなのか判定する必要が出てくる。その時には送るデータにプロセスIDを含めておいて、それを使ってキーを設定する形にしてしまえばいい。
プロセスIDや時間、リクエストIDはRailsのログファイルにも入れておきたいものなので、特に害にはならないと考える。
その場合送るデータは下記のようになる
{ "key" : "sample", "pid" : 123 }
これに伴ってfluent.conf
も下記のように変更する。
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
+ tag_with_pid '${tag}.${record["pid"]}'
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
- key_path tag
+ key_path tag_with_pid
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
これでRedisに登録される時のキーにプロセスIDが入るので、あとはそのプロセスIDを使って自プロセスのログを特定し、それを評価することで実際にデータが送られたのかどうか判定することができるようになる。
Redisの宛先を変える
READMEに書いてあるように下記の形で設定を変えられるらしい
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
+ host 10.0.0.1
+ db 11
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
RSpecやCucumberで使う時のヘルパー
実際にテストで使う時にはこんな感じのヘルパー作れば楽に使えそう
# frozen_string_literal: true
require 'redis'
module FluentdLogHelper
def fetch_fluentd_log_by(tag:, pid: nil)
redis_key = pid ? "#{tag}.#{pid}" : tag
redis.zrange redis_key, 0, -1
end
def redis(options = {})
options[:db] ||= 0
@redis ||= Redis.new(options)
end
end
シンプルな呼び方
fetch_fluentd_log_by tag: 'debug.test'
プロセスIDを伴う呼び方
fetch_fluentd_log_by tag: 'debug.test', pid: Process.pid
Discussion