🔮

Logstash の代わりに Vector でログをフィルタリングする

2025/01/28に公開

以前に書いた以下の記事では filebeat, losgtash, elasticsearch, kibana を使って k8s クラスタ上のログを収集・可視化する方法を紹介しました。

https://zenn.dev/zenogawa/articles/k8s_filebeat

上記の組み合わせは ELK スタックと呼ばれ、elasticsearch をログの分析・可視化基盤として使う際によく使用される組み合わせになっています。参考: https://aws.amazon.com/jp/what-is/elk-stack/

機能的にはこれで十分なのですが、logstash には以下のような欠点?もあります。

  • ログの parse や整形処理を記述する際に独自の conf ファイルを使用するため初期の学習コストがやや高い
  • ドキュメント の記述量が多く、設定の調整やトラブルシューティングが難しいことがある。
  • jvm ベースのアプリケーションでなのでリソース使用量が多くなることがある(規模や設定にもよりますが)。

そこで logstash の機能が代替できる Product がないか探していたところ Vector が良さそうだったので、今回は logstash の代わりに vector を使ってログの収集・フィルタリング・整形を行う方法を調べてみます。

概要

Vector とは

https://vector.dev/
https://github.com/vectordotdev/vector

vector は observability パイプラインを構成するための Rust 製 OSS ツールとなっています。エージェントや aggregator として機能し、ログだけでなくメトリクスやトレースなどのデータを収集し、加工・整形して様々なデータストアに送信することができます。ざっくりまとめると filebeat や fluentd、 prometheus の exporter などの機能を組み合わせたツールといったイメージです。

Rust 製だけあってパフォーマンスの高さも強調されており、github comparison に filebeat や logstash, fluentd など既存のエージェントツールとの比較が記載されています。

Test Vector Filebeat FluentBit FluentD Logstash SplunkUF SplunkHF
TCP to Blackhole 86mib/s n/a 64.4mib/s 27.7mib/s 40.6mib/s n/a n/a
File to TCP 76.7mib/s 7.8mib/s 35mib/s 26.1mib/s 3.1mib/s 40.1mib/s 39mib/s
Regex Parsing 13.2mib/s n/a 20.5mib/s 2.6mib/s 4.6mib/s n/a 7.8mib/s
TCP to HTTP 26.7mib/s n/a 19.6mib/s <1mib/s 2.7mib/s n/a n/a
TCP to TCP 69.9mib/s 5mib/s 67.1mib/s 3.9mib/s 10mib/s 70.4mib/s 7.6mib/s

もちろん具体的な数値に関しては環境や設定によって変わるので vector が最適とは断言できませんが、総合的に見て既存のツールよりも高い性能が出そうな雰囲気が伺えます。

デプロイロール

Vector ではログを収集するために Daemon, Sidecar, Aggregator という 3 つのデプロイパターンが提供されており、ドキュメントでは Deployment Role と呼んでいます。

Daemon


引用: https://vector.dev/docs/setup/deployment/roles/

Daemon ではノード毎に vector をデプロイして、ノード上の /var/log/pod などに出力された pod ログやその他 node レベルのログを収集・送信する方式です。これは前記事の filebeat と同様の方式で、効率よくログを収集できるため基本的にこれが推奨されるとのこと。

Sidecar


引用: https://vector.dev/docs/setup/deployment/roles/

Sidecar は k8s の Sidecar container with a logging agent のように pod 内に vector コンテナを入れてログを収集する方式です。基本的には上記の Daemon 方式が推奨されていますが、アプリケーションコンテナが stdout/stderr ではなくファイルに出力したログを収集する際など一部のユースケースでは有効。

Aggregator


引用: https://vector.dev/docs/setup/deployment/roles/

Aggregator は上記の 2 つの方式とは異なり、別のデータソースから受信したログの整形、フィルタリング、集計などを行って別のデータソースに送信する役割となっています。前回の構成でいうと logstash の部分に相当。

ドキュメントにも書いてありますが、vector は filebeat のような logging エージェントの役割だけでなく logstash や elasticsearch のような aggregator の機能も持っていることが特徴的となっています。

For Vector, this role should be reserved for exactly that: cross-host aggregation and analysis. Vector is unique in the fact that it can serve both as an Agent and an Aggregator. This makes it possible to distribute processing along the edge (recommended).


daemon, aggregator については helm でインストールする際のパラメータで切り替えることが可能です。参考: https://vector.dev/docs/setup/installation/package-managers/helm/
sidecar は自分で vector 設定ファイルと vector pod の定義を既存の pod に追加してデプロイする方法になっています。以下を参考。

検証構成

今回は logstash の機能を vector で代替するため、前の記事と同様に filebeat が k8s クラスタから収集したログを vector で受信し、elasticsearch に送信する構成を検証します。


構成図

vector にも kubernetes logs を収集できる機能 があるので filebeat も置き換えることができますが、ひとまず上記の構成で動作を見ていきます。上記の vector はデプロイロールでいうと aggregator に相当します。

インストール

vector は 以下の 3 通り で k8s クラスタ上にデプロイできます。

  • マニフェストを直接適用する
  • helm でインストールする
  • vector operator でインストールする

最初は vector operator を使ったインストール方法を試しましたが後述の vector 設定がうまく反映されない問題があったので、ここでは helm を使ってインストールします。
設定のカスタマイズを行うため helm chart の values をファイルに書き出します。

helm repo add vector https://helm.vector.dev
helm show values vector/vector > values.yml

vector 設定のカスタマイズ

filebeat から送信されたログを vector で受け取るためには logstash source を利用します。

https://vector.dev/docs/reference/configuration/sources/logstash/

Sources の名前は logstash となっていますが、 example に記載の通り filebeat や heartbeat などの beat agent から直接データを受け取れるようになっているため、logstash を利用しない場合もこの source でログを受信できます。設定項目は type と受信を待ち受ける address を指定すれば ok.

    sources:
      filebeat:
        address: 0.0.0.0:9000
        type: logstash

vector では上記のようなデータの送受信や加工に関する設定は configuration に記載します。helm chart によるインストールではデフォルトの configuration が自動で configmap として作成されますが、 values の existingConfigMapscustomConfig を通じて設定をカスタマイズできるようになっています。

values.yml
# existingConfigMaps -- List of existing ConfigMaps for Vector's configuration instead of creating a new one. Requires
# dataDir to be set. Additionally, containerPorts, service.ports, and serviceHeadless.ports should be specified based on
# your supplied configuration. If set, this parameter takes precedence over customConfig and the chart's default configs.
existingConfigMaps: []

# customConfig -- Override Vector's default configs, if used **all** options need to be specified. This section supports
# using helm templates to populate dynamic values. See Vector's [configuration documentation](https://vector.dev/docs/reference/configuration/)
# for all options.
customConfig: {}
  # data_dir: /vector-data-dir

helm での指定方法はこちらも参照。
https://vector.dev/highlights/2021-07-13-helm-customconfig/

どちらを使ってもいいですが、ここでは事前に設定ファイルを configmap として作成して existingConfigMaps に指定する方法にします。

vector が受信したログは前回と同様に elasticsearch に送信するため、elasticsearch sink を使ってログ送信します。

パラメータは次のように設定。

  • bulk.index: ログを保存する elasticsearch の index 名
  • endpoints: elasticsearch のホスト名とポート番号
  • inputs: ログを受信する source の名前。ここでは上記の source の filebeat を指定
  • type: elasticsearch を指定
  sinks:
    elasticsearch:
      type: elasticsearch
      bulk:
        index: vector-%Y-%m-%d
      endpoints:
      - http://192.168.3.180:9200
      inputs:
      - filebeat

ここでは宛先 elasticsearch は http 接続、認証なしの設定にしています。認証や https で通信する場合は auth など対応する項目を追加する必要があります。

ここまでの設定項目をまとめると、vector の configuration に対応する configmap は以下のようになります(api 等はデフォルト設定をもとに指定)。

vector.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector
  namespace: vector
data:
  vector.yaml: |
    api:
      address: 127.0.0.1:8686
      enabled: true
      playground: true
    data_dir: /vector-data-dir
    sources:
      filebeat:
        address: 0.0.0.0:9000
        type: logstash
    sinks:
      elasticsearch:
        bulk:
          index: vector-%Y-%m-%d
        endpoints:
        - http://192.168.3.180:9200
        inputs:
        - filebeat
        type: elasticsearch

kubectl apply -f vector.yml で configmap を作成。
values.yml では上記で作成した configmap 名を existingConfigMaps に指定します。また、existingConfigMaps を指定する場合は以下の設定も必要になります。

  • dataDir: vector のデータディレクトリ
  • services.ports: vector pod に関連付けられる service の port 設定。ここでは api と source logstash の port を指定。
values.yml
existingConfigMaps:
  - vector

services:
   ports:
     - name: api
       port: 8686
       protocol: TCP
       targetPort: 8686
     - name: logstash
       port: 9000
       protocol: TCP
       targetPort: 9000

dataDir: /vector-data-dir

まだログの parse 処理は設定していませんが、いったん vector をデプロイして動作を確認します。

helm install vector vector/vector \
  --namespace vector \
  --create-namespace \
  --values values.yml

これにより vector pod が起動し、filebeat からのログを受信して elasticsearch に送信する設定が完了します。pod ログより port 9000 で beat からの通信を待ち受けていることが確認できます。

$ k logs vector-0 vector
2025-01-27T10:54:31.534682Z  INFO vector::app: Log level is enabled. level="info"
2025-01-27T10:54:31.535147Z  INFO vector::app: Loading configs. paths=["/etc/vector"]
2025-01-27T10:54:31.564484Z  INFO vector::topology::running: Running healthchecks.
2025-01-27T10:54:31.564569Z  INFO vector: Vector has started. debug="false" version="0.44.0" arch="x86_64" revision="3cdc7c3 2025-01-13 21:26:04.735691656"
2025-01-27T10:54:31.564734Z  INFO source{component_kind="source" component_id=filebeat component_type=logstash}: vector::sources::util::net::tcp: Listening. addr=0.0.0.0:9000
2025-01-27T10:54:31.565351Z  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground graphql=http://127.0.0.1:8686/graphql
2025-01-27T10:54:31.566941Z  INFO vector::topology::builder: Healthcheck passed.

filebeat の設定

filebeat は 前回の記事 と同じで helm を使ってデプロイします。
設定は上記記事とほぼ同じですが、output には logstash を設定し、vector の svc 名、port を指定します。

      output.logstash:
        host: '${NODE_NAME}'
        hosts: ["vector:9000"]

values.yml の中身は以下

values.yml
values.yml
daemonset:
  ...
  extraEnvs: []
#    - name: "ELASTICSEARCH_USERNAME"
#      valueFrom:
#        secretKeyRef:
#          name: elasticsearch-master-credentials
#          key: username
#    - name: "ELASTICSEARCH_PASSWORD"
#      valueFrom:
#        secretKeyRef:
#          name: elasticsearch-master-credentials
#          key: password

 filebeatConfig:
    filebeat.yml: |
      filebeat.inputs:
      - type: filestream
        paths:
          - /var/log/syslog
        include_lines:
          - 'kubelet'
        tags:
          - kubelet
        processors:
        - add_fields:
            target: ""
            fields:
              cluster: k8s-control
      - type: container
        paths:
          - /var/log/containers/*.log
        processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
        - add_fields:
            target: ""
            fields:
              cluster: k8s-cluster
      output.logstash:
        host: '${NODE_NAME}'
        hosts: ["vector:9000"]

  secretMounts: []
#    - name: elasticsearch-master-certs
#      secretName: elasticsearch-master-certs
#      path: /usr/share/filebeat/certs/
  #  - name: filebeat-certificates
  #    secretName: filebeat-certificates
  #    path: /usr/share/filebeat/certs

filebeat をデプロイ。

helm install filebeat elastic/filebeat \
  --namespace vector \
  --values values.yml

ログの確認

これで k8s pod のログが filebeat → vector → elasticsearch に送信されるので kibana から確認できるようになります。
ログの整形や加工はまだ行っていないので、送信されたログは生のログメッセージ + filebeat や vector によって設定されたフィールドが含まれています。試しに etcd pod のログを見てみると以下のような内容となっています。

etcd のログ

pod のメッセージは message に対応

_index: vector-2025-01-26
_id: 9_KeopQBmjaOKqG9cLZE
_version: 1
_ignored:
  - message.keyword
_source:
  '@metadata':
    beat: filebeat
    type: _doc
    version: 8.5.1
  '@timestamp': "2025-01-26T12:37:54.567Z"
  agent:
    ephemeral_id: 6850ba87-f4f3-4534-b935-e7a9b2e4e99c
    id: 3265a7ee-08db-42e7-8224-299ca0582b39
    name: filebeat-filebeat-zxm9v
    type: filebeat
    version: 8.5.1
  cluster: k8s-cluster
  container:
    id: f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd
    image:
      name: registry.k8s.io/etcd:3.5.15-0
    runtime: containerd
  ecs:
    version: 8.0.0
  host:
    name: filebeat-filebeat-zxm9v
  input:
    type: container
  kubernetes:
    container:
      name: etcd
    labels:
      component: etcd
      tier: control-plane
    namespace: kube-system
    namespace_labels:
      kubernetes_io/metadata_name: kube-system
    namespace_uid: e366417f-78ef-4175-a721-d78a7602481a
    node:
      hostname: k8s-m1
      labels:
        beta_kubernetes_io/arch: amd64
        beta_kubernetes_io/os: linux
        kubernetes_io/arch: amd64
        kubernetes_io/hostname: k8s-m1
        kubernetes_io/os: linux
        node-role_kubernetes_io/control-plane: ""
        node_kubernetes_io/exclude-from-external-load-balancers: ""
      name: k8s-m1
      uid: e76bf0f2-d222-4233-9cd3-f39e03fc2091
    pod:
      ip: 10.0.0.40
      name: etcd-k8s-m1
      uid: fc260fad-d780-4ac2-be72-8af1cb125e8b
  log:
    file:
      path: /var/log/containers/etcd-k8s-m1_kube-system_etcd-f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd.log
    offset: 5162710
  message: '{"level":"info","ts":"2025-01-26T12:37:54.566933Z","caller":"traceutil/trace.go:171","msg":"trace[1718877744] transaction","detail":"{read_only:false; response_revision:4895789; number_of_response:1; }","duration":"132.491495ms","start":"2025-01-26T12:37:54.434417Z","end":"2025-01-26T12:37:54.566908Z","steps":["trace[1718877744] ''process raft request''  (duration: 109.983011ms)","trace[1718877744] ''compare''  (duration: 22.392708ms)"],"step_count":2}'
  source_type: logstash
  stream: stderr
  timestamp: "2025-01-26T12:37:54.567Z"
fields:
  agent.version.keyword:
    - 8.5.1
  cluster:
    - k8s-cluster
  kubernetes.node.uid:
    - e76bf0f2-d222-4233-9cd3-f39e03fc2091
  kubernetes.namespace_uid.keyword:
    - e366417f-78ef-4175-a721-d78a7602481a
  host.name.keyword:
    - filebeat-filebeat-zxm9v
  kubernetes.namespace_uid:
    - e366417f-78ef-4175-a721-d78a7602481a
  kubernetes.node.labels.kubernetes_io/os:
    - linux
  container.id:
    - f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd
  kubernetes.labels.component.keyword:
    - etcd
  kubernetes.node.labels.kubernetes_io/os.keyword:
    - linux
  ecs.version.keyword:
    - 8.0.0
  container.image.name:
    - registry.k8s.io/etcd:3.5.15-0
  kubernetes.container.name.keyword:
    - etcd
  kubernetes.namespace:
    - kube-system
  kubernetes.node.labels.beta_kubernetes_io/os:
    - linux
  kubernetes.pod.name.keyword:
    - etcd-k8s-m1
  agent.name:
    - filebeat-filebeat-zxm9v
  host.name:
    - filebeat-filebeat-zxm9v
  kubernetes.node.labels.kubernetes_io/hostname.keyword:
    - k8s-m1
  agent.id.keyword:
    - 3265a7ee-08db-42e7-8224-299ca0582b39
  kubernetes.node.labels.node-role_kubernetes_io/control-plane.keyword:
    - ""
  input.type:
    - container
  kubernetes.node.uid.keyword:
    - e76bf0f2-d222-4233-9cd3-f39e03fc2091
  source_type:
    - logstash
  log.offset:
    - 5162710
  '@metadata.version':
    - 8.5.1
  container.runtime:
    - containerd
  agent.id:
    - 3265a7ee-08db-42e7-8224-299ca0582b39
  '@metadata.type':
    - _doc
  ecs.version:
    - 8.0.0
  '@metadata.beat':
    - filebeat
  kubernetes.node.labels.node-role_kubernetes_io/control-plane:
    - ""
  agent.version:
    - 8.5.1
  kubernetes.labels.tier.keyword:
    - control-plane
  kubernetes.namespace.keyword:
    - kube-system
  kubernetes.node.name:
    - k8s-m1
  input.type.keyword:
    - container
  stream.keyword:
    - stderr
  kubernetes.node.hostname:
    - k8s-m1
  '@metadata.version.keyword':
    - 8.5.1
  kubernetes.node.name.keyword:
    - k8s-m1
  kubernetes.pod.uid:
    - fc260fad-d780-4ac2-be72-8af1cb125e8b
  kubernetes.node.hostname.keyword:
    - k8s-m1
  '@metadata.beat.keyword':
    - filebeat
  agent.type:
    - filebeat
  stream:
    - stderr
  kubernetes.node.labels.kubernetes_io/arch.keyword:
    - amd64
  kubernetes.pod.name:
    - etcd-k8s-m1
  source_type.keyword:
    - logstash
  container.image.name.keyword:
    - registry.k8s.io/etcd:3.5.15-0
  log.file.path.keyword:
    - /var/log/containers/etcd-k8s-m1_kube-system_etcd-f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd.log
  agent.type.keyword:
    - filebeat
  kubernetes.pod.ip:
    - 10.0.0.40
  timestamp:
    - "2025-01-26T12:37:54.567Z"
  agent.ephemeral_id.keyword:
    - 6850ba87-f4f3-4534-b935-e7a9b2e4e99c
  kubernetes.container.name:
    - etcd
  agent.name.keyword:
    - filebeat-filebeat-zxm9v
  kubernetes.node.labels.beta_kubernetes_io/arch.keyword:
    - amd64
  kubernetes.namespace_labels.kubernetes_io/metadata_name:
    - kube-system
  cluster.keyword:
    - k8s-cluster
  message:
    - '{"level":"info","ts":"2025-01-26T12:37:54.566933Z","caller":"traceutil/trace.go:171","msg":"trace[1718877744] transaction","detail":"{read_only:false; response_revision:4895789; number_of_response:1; }","duration":"132.491495ms","start":"2025-01-26T12:37:54.434417Z","end":"2025-01-26T12:37:54.566908Z","steps":["trace[1718877744] ''process raft request''  (duration: 109.983011ms)","trace[1718877744] ''compare''  (duration: 22.392708ms)"],"step_count":2}'
  kubernetes.labels.tier:
    - control-plane
  kubernetes.node.labels.kubernetes_io/hostname:
    - k8s-m1
  kubernetes.labels.component:
    - etcd
  kubernetes.node.labels.beta_kubernetes_io/arch:
    - amd64
  '@timestamp':
    - "2025-01-26T12:37:54.567Z"
  kubernetes.pod.uid.keyword:
    - fc260fad-d780-4ac2-be72-8af1cb125e8b
  container.runtime.keyword:
    - containerd
  kubernetes.namespace_labels.kubernetes_io/metadata_name.keyword:
    - kube-system
  kubernetes.node.labels.node_kubernetes_io/exclude-from-external-load-balancers.keyword:
    - ""
  log.file.path:
    - /var/log/containers/etcd-k8s-m1_kube-system_etcd-f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd.log
  agent.ephemeral_id:
    - 6850ba87-f4f3-4534-b935-e7a9b2e4e99c
  kubernetes.node.labels.kubernetes_io/arch:
    - amd64
  container.id.keyword:
    - f1ef45444698b5dd23745bd92452e431819fa5d203c27272b8c26869968a8ccd
  kubernetes.node.labels.beta_kubernetes_io/os.keyword:
    - linux
  kubernetes.node.labels.node_kubernetes_io/exclude-from-external-load-balancers:
    - ""
  kubernetes.pod.ip.keyword:
    - 10.0.0.40
  '@metadata.type.keyword':
    - _doc
ignored_field_values:
  message.keyword:
    - '{"level":"info","ts":"2025-01-26T12:37:54.566933Z","caller":"traceutil/trace.go:171","msg":"trace[1718877744] transaction","detail":"{read_only:false; response_revision:4895789; number_of_response:1; }","duration":"132.491495ms","start":"2025-01-26T12:37:54.434417Z","end":"2025-01-26T12:37:54.566908Z","steps":["trace[1718877744] ''process raft request''  (duration: 109.983011ms)","trace[1718877744] ''compare''  (duration: 22.392708ms)"],"step_count":2}'

ログを parse する

vector を介したログ送受信が確認できたので、次に vector を使ってログのフィルタリングや整形をやっていきます。vector では transform で受信したデータを送信する前にフィルタリングや整形、集計などの操作を実行できるようになっています。
transform にもいくつか種類がありますが、ログデータの抽出・加工には Vector Remap Language (VRL) による remap を使うのが有効です。詳細はこちら https://vector.dev/blog/vector-remap-language/

filebeat から受信したログを parse するにはログの形式に合わせた VRL を記述する必要がありますが、VRL playground ではブラウザ上で動作を確認できるようになっているので、まずはこれで対象ログの形式と VRL の動作を確認するところから始めるのがおすすめです。

例えば、以下のような k8s pod のログを見やすくするためにフィールド毎に抽出することを考えます。

{
  "message": "{\"level\":\"info\",\"ts\":\"2025-01-26T13:53:41.337903Z\",\"caller\":\"traceutil/trace.go:171\",\"msg\":\"trace[1296786419] transaction\",\"detail\":\"{read_only:false; response_revision:4909691; number_of_response:1; }\",\"duration\":\"157.30959ms\",\"start\":\"2025-01-26T13:53:41.180574Z\",\"end\":\"2025-01-26T13:53:41.337883Z\",\"steps\":[\"trace[1296786419] 'process raft request'  (duration: 156.747673ms)\"],\"step_count\":1}"
}

VRL では syslog, json, klog など有名どころのログ形式を処理するための parse_xxx 関数が事前に用意されており、ある程度は自分でログ抽出のパターンを書かなくても適切に抽出できるようになっています。

https://vector.dev/docs/reference/vrl/functions/#parse-functions
https://vector.dev/docs/reference/configuration/transforms/remap/#examples

上記のような json 形式の文字列は parse_json を使うことで key-value 形式に変換できます。その他に VRL の記述では以下のような記述でフィールドの新規追加や整形・不要なフィールドを削除できるようになっています。

VRL
.json, err = parse_json(.message)
if !is_null(err) {
    abort
}
.json_level = .json.level
.json_ts = .json.ts
.json_caller = .json.caller
.json_msg = .json.msg
.json_revision = .json.revision

del(.message)
  • .json, err = parse_json(.message) で .message の文字列を parse して json フィールドに設定
  • VRL では .[var_name] で既存のフィールドを参照する。存在しない場合は新規にフィールドを追加する。
  • もとの message は不要なので del(.message) で削除

playground 上の Program に上記の VRL を書いて Run program を実行すると、もとの message に含まれていた値が json フィールド以下に設定され、トップレベルに json_xxxx というフィールドが新規追加されることがわかります。

{
	"json": {
		"caller": "traceutil/trace.go:171",
		"detail": "{read_only:false; response_revision:4909691; number_of_response:1; }",
		"duration": "157.30959ms",
		"end": "2025-01-26T13:53:41.337883Z",
		"level": "info",
		"msg": "trace[1296786419] transaction",
		"start": "2025-01-26T13:53:41.180574Z",
		"step_count": 1,
		"steps": [
			"trace[1296786419] 'process raft request'  (duration: 156.747673ms)"
		],
		"ts": "2025-01-26T13:53:41.337903Z"
	},
	"json_caller": "traceutil/trace.go:171",
	"json_level": "info",
	"json_msg": "trace[1296786419] transaction",
	"json_revision": null,
	"json_ts": "2025-01-26T13:53:41.337903Z"
}


playground 上の実行結果

このように VRL を使うことでログの加工やフィルタリングが柔軟に行えるようになります。

etcd pod はちょうど上記の parse_json で抽出できる形式のログになっているので、実際に elasticsearch 上に送信されたログのフィールドが適切に抽出されることを確認してみます。上記の VRL を適用するため先ほど作成した configmap を以下のように変更。

vector.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector
  namespace: vector
data:
  vector.yaml: |
    api:
      address: 127.0.0.1:8686
      enabled: true
      playground: true
    data_dir: /vector-data-dir
    sources:
      filebeat:
        address: 0.0.0.0:9000
        type: logstash
+    transform:
+      etcd_filter:
+        type: filter
+        inputs:
+          - filebeat
+        condition: '.kubernetes.labels.component == "etcd"'
+      etcd_parse:
+        type: remap
+        inputs:
+          - filebeat
+        source: |
+          .json, err = parse_json(.message)
+          if !is_null(err) {
+              abort
+          }
+          del(.message)
    sinks:
      elasticsearch:
        bulk:
          index: vector-%Y-%m-%d
        endpoints:
        - http://192.168.3.180:9200
        inputs:
-        - filebeat
+        + etcd_parse
        type: elasticsearch
      stdout:
        encoding:
          codec: json
        inputs:
        - filebeat
        type: console

configmap を変更したら pod を一度削除して変更を適用します。

k apply -f vector.yml
k delete pod vector-0

kibana で elasticsearch に送信されたログを見てみると、たしかに json 以下のフィールドに caller や level, msg が parse されていることが確認できます。

このように VRL を使うことでログの加工やフィルタリングが柔軟に行えるようになっています。

カスタムログの parse

既存の parse_xxx に当てはまらない形式のログに関しては parse_regex を使うことで正規表現形式で各フィールドを抽出できるようになっています。
例えば、以下のような自作アプリケーションのログを vector で parse することを考えてみます。

import logging
import random
import time

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)


def main():
    log_levels = [logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG]

    while True:
        log_level = random.choice(log_levels)
        logging.log(log_level, "test message")
        time.sleep(5)  # Wait for 5 seconds before the next log


if __name__ == "__main__":
    main()

このアプリのログは以下のような形式になっています。

[2025-01-27T13:54:19Z] [DEBUG] test message
[2025-01-27T13:54:24Z] [WARNING] test message
[2025-01-27T13:54:29Z] [ERROR] test message
[2025-01-27T13:54:34Z] [INFO] test message

これは parse_regex を使った以下のような VRL で各フィールドを抽出できます。

.|= parse_regex!(
    .message,
    r'^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\] \[(?P<loglevel>[A-Z]+)\] (?P<logmessage>.*)$'
)

.timestamp = parse_timestamp(.timestamp, "%Y-%m-%d %H:%M:%S") ?? now()
.loglevel = downcase(.loglevel)
del(.message)

transform でログ整形用の type: remap を複数指定する場合は filter transform と condition を組み合わせることで、特定の条件を満たすログのみフィルタを適用することができます。

  • .kubernetes.labels.component == "etcd" のラベルがついているログは etcd_parse を適用
  • 上記の自作 app の .kubernetes.labels.app == "vector-sample" のラベルがついているログは myapp_parse を適用
  • 上記に一致しないログには filter を適用しない
vector.yml
    transforms:
      etcd_filter:
        type: filter
        inputs:
          - filebeat
        condition: '.kubernetes.labels.component == "etcd"'
      etcd_parse:
        type: remap
        inputs:
          - filebeat
        source: |
          .json, err = parse_json(.message)
          del(.message)
      myapp_filter:
        type: filter
        inputs:
          - filebeat
        condition: '.kubernetes.labels.app == "vector-sample"'
      myapp_parse:
        type: remap
        inputs:
          - myapp_filter
        source: |
          .|= parse_grok!(.message, "\\[%{TIMESTAMP_ISO8601:timestamp}\\] \\[%{LOGLEVEL:loglevel}\\] %{GREEDYDATA:logmessage}")
          .timestamp = parse_timestamp(.timestamp, "%Y-%m-%YT%H:%M:%SZ") ?? now()
          del(.message)
      others:
        type: filter
        inputs:
          - filebeat
        condition: '.kubernetes.labels.component != "etcd" && .kubernetes.labels.app != "vector-sample"'
    sinks:
      elasticsearch:
        type: elasticsearch
        bulk:
          index: vector-%Y-%m-%d
        endpoints:
        - http://192.168.3.180:9200
        inputs:
        - etcd_parse
        - myapp_parse
        - others

これで etcd pod のログは parse_json で、vector-sample アプリのログは parse_grok でフィールドが抽出されるようになります。
kibana でログを確認すると、どちらも想定通りにフィールドを抽出できていることが確認できます。


myapp のログ。logmessage, loglevel, timestamp が抽出されている。


etcd pod のログ。json.caller や json.msg など json フィールド以下に抽出されている。

OpenObserve にログ送信する

以前の記事では filebeat + logstash のログ送信先として elasticsearch の代わりに openobserve を使う方法について書きました。

https://zenn.dev/zenogawa/articles/k8s_openobserve

o2 用の vector sink は現時点でサポートされていませんが(issue はある)、o2 側のドキュメント に記載の通り http sink か elasticsearch sink を使うことで vector から o2 にデータを送信できるようになっています。ここでは elasticsearch sink を使って送信するため vector.yml を以下のように修正します。

vector.yml
data:
  vector.yaml: |
    sinks:
      elasticsearch:
        type: elasticsearch
        bulk:
          index: vector-%Y-%m-%d
        endpoints:
        - http://o2-openobserve-router.openobserve:5080/api/myorg # o2 の router のアドレス/api/[org]
        inputs:
        - etcd_parse
        - myapp_parse
        - others
        auth:
          strategy: basic
          user: "root@example.com" # o2 のユーザ名
          password: "Complexpass#123" # o2 のパスワード
        compression: gzip
        healthcheck:
          enabled: false

o2 は簡単のため最小構成の helm でインストールします。

https://openobserve.ai/docs/ha_deployment/#installation

kubectl apply --server-side -f \
  https://raw.githubusercontent.com/cloudnative-pg/cloudnative-pg/release-1.23/releases/cnpg-1.23.1.yaml

helm repo add openobserve https://charts.openobserve.ai
helm repo update
helm install -n openobserve o2 openobserve/openobserve --create-namespace

o2 の web UI (router) にログインして org: myorg, ストリーム: vector-[%Y-%m-%d] に設定してクエリを実行すると、elasticsearch と同様にログが確認できます。


o2 に送信された myapp のログ

o2 自体が elasticsearch との互換性が高いので sink を少し書き換えるだけで簡単に移行できます。

その他

vector でログ収集

Vector デプロイパターンで見たように Daemon では filebeat と同様に pod ログ、および node レベルのログを収集することができます。
helm インストール時に daemon でデプロイするには values.ymlrole: "Agent" を指定します。

values.yml
# Each role is created with the following workloads:
# Agent = DaemonSet
# Aggregator = StatefulSet
# Stateless-Aggregator = Deployment

role: "Agent"

control plane ノード上に配置されるように tolerations も設定

tolerations:
  - key: node-role.kubernetes.io/control-plane
    operator: Exists
    effect: NoSchedule

これによりクラスタ内の各ノードに daemonset として vector pod が作成されるようになります。
この状態で helm install した際に作成される configmap 内の 設定では以下の要素が収集されます。

configmap
data:
  agent.yaml: |
    data_dir: /vector-data-dir
    api:
      enabled: true
      address: 127.0.0.1:8686
      playground: false
    sources:
      kubernetes_logs:
        type: kubernetes_logs
      host_metrics:
        filesystem:
          devices:
            excludes: [binfmt_misc]
          filesystems:
            excludes: [binfmt_misc]
          mountpoints:
            excludes: ["*/proc/sys/fs/binfmt_misc"]
        type: host_metrics
      internal_metrics:
        type: internal_metrics
    sinks:
      prom_exporter:
        type: prometheus_exporter
        inputs: [host_metrics, internal_metrics]
        address: 0.0.0.0:9090
      stdout:
        type: console
        inputs: [kubernetes_logs]
        encoding:
          codec: json

kubernetes_logs では /var/log/pod 以下のログのみが収集対象となるのでデフォルトでは node ログは収集されません。収集するには file source で収集対象のログのパスを追加する必要があります。

ノードの /var/log/syslog を収集する例
sources:
  kube:
    type: kubernetes_logs
  syslog:
    type: file
    include:
      - /var/log/syslog

デフォルトでノード上の /var/log などを pod にマウントするように設定されるので volume や mount の設定は不要。

  Containers:
   vector:
    Image:      timberio/vector:0.44.0-distroless-libc
    Port:       <none>
    Host Port:  <none>
    Args:
      --config-dir
      /etc/vector/
    Environment:
      VECTOR_LOG:                 info
      VECTOR_SELF_NODE_NAME:       (v1:spec.nodeName)
      VECTOR_SELF_POD_NAME:        (v1:metadata.name)
      VECTOR_SELF_POD_NAMESPACE:   (v1:metadata.namespace)
      PROCFS_ROOT:                /host/proc
      SYSFS_ROOT:                 /host/sys
    Mounts:
      /etc/vector/ from config (ro)
      /host/proc from procfs (ro)
      /host/sys from sysfs (ro)
      /var/lib from var-lib (ro)
      /var/log/ from var-log (ro)
      /vector-data-dir from data (rw)

上記の設定で pod, node 上のログが収集できるようになります。config に transform も書けば 1 つの vector でログ収集・整形・送信が実行できるため、filebeat + logstash と比べてよりコンパクトな構成になります。

まとめ

vector では yaml 形式で設定を記載できるため、logstash の config 設定よりも可読性が良く、yaml なので他ツールによる format やポリシーチェックのような統合もやりやすいかと感じました。VRL remap でログの加工やフィルタリングも柔軟に行えるので機能的な面でも特に不足はなく、公表されているパフォーマンスも filebeat や logstash などの既存ツールより優れているため、総合的に見て vector は logstash からの移行先として有力な選択肢になりそうです。

Discussion