🐷

My Own Elastic Stack Tutorial

Published 2022/06/05

I put together my own Elastic Stack tutorial.

Level 1

Accept strings typed into the console

/etc/logstash/logstash.conf
input {
    stdin { }
}

output {
    stdout { codec => rubydebug }
}

Try running it

Pass the conf file you created to the logstash command.
A few WARNINGs show up, but don't worry about them.

[opc@elastic ~]$ /usr/share/logstash/bin/logstash -f /etc/logstash/logstash.conf
Using bundled JDK: /usr/share/logstash/jdk
           ~snip~
[INFO ] 2022-06-05 02:34:01.962 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
aaa
{
    "@timestamp" => 2022-06-05T02:34:05.399462Z,
         "event" => {
        "original" => "aaa"
    },
          "host" => {
        "hostname" => "elastic-181174"
    },
       "message" => "aaa",
      "@version" => "1"
}
test
{
    "@timestamp" => 2022-06-05T02:34:37.075366Z,
         "event" => {
        "original" => "test"
    },
          "host" => {
        "hostname" => "elastic-181174"
    },
       "message" => "test",
      "@version" => "1"
}
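
As a side note, for a quick experiment like this you don't even need a conf file: Logstash also accepts the pipeline definition inline via the -e flag. A minimal sketch:

/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'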

Level 2

Read a CSV file

Prepare a suitable CSV file and have Logstash read it.
(From this level on, the conf file lives at /etc/logstash/conf.d/first-pipeline.conf, as you can see in the run below.)

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        tags => "CSV"
    }
}
output {
    stdout { codec => rubydebug }
}
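
One caveat about the file input: it records how far it has read each file in a "sincedb" file, so simply re-running the pipeline will not re-read the CSV from the top. For repeated testing, a common trick (a sketch, not part of the pipeline above) is to point sincedb_path at /dev/null so the position is forgotten between runs:

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        sincedb_path => "/dev/null"    # discard read-position tracking (testing only)
        tags => "CSV"
    }
}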

Try ingesting it

Excerpt:
[opc@elastic ~]$ sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/first-pipeline.conf
Using bundled JDK: /usr/share/logstash/jdk
           ~snip~
{
      "@version" => "1",
          "host" => "elastic",
       "message" => "IK152942,平田 裕次郎,2019-01-01 00:25:33,ひらた ゆうじろう,hirata_yuujirou@example.com,M,29,1990/6/10,石川県",
          "path" => "/home/opc/test.csv",
    "@timestamp" => 2022-06-05T03:37:16.230Z,
          "tags" => [
        [0] "CSV"
    ]
}
{
      "@version" => "1",
          "host" => "elastic",
       "message" => "AS834628,久野 由樹,2019-01-01 02:00:14,ひさの ゆき,hisano_yuki@example.com,F,63,1956/1/2,茨城県",
          "path" => "/home/opc/test.csv",
    "@timestamp" => 2022-06-05T03:37:16.231Z,
          "tags" => [
        [0] "CSV"
    ]
}
{
      "@version" => "1",
          "host" => "elastic",
       "message" => "GD892565,大内 高史,2019-01-01 04:54:51,おおうち たかし,oouchi_takashi@example.com,M,54,1965/8/5,千葉県",
          "path" => "/home/opc/test.csv",
    "@timestamp" => 2022-06-05T03:37:16.232Z,
          "tags" => [
        [0] "CSV"
    ]
}
{
      "@version" => "1",
          "host" => "elastic",
       "message" =1行"customer_id,customer_name,registration_date,customer_name_kana,email,gender,age,birth,pref",
          "path" => "/home/opc/test.csv",
    "@timestamp" => 2022-06-05T03:37:16.205Z,
          "tags" => [
        [0] "CSV"
    ]
}

Level 3

Split the CSV data into columns

With the previous approach to ingestion, the entire line ends up in message:

"message" => "AS345469,鶴岡 薫,2019-01-01 04:48:22,つるおか かおる,tsuruoka_kaoru@example.com,M,74,1945/3/25,東京都",

Let's split each line into its CSV columns at ingestion time.

Editing first-pipeline.conf

This time we add a new filter block and use the csv plugin.

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        tags => "CSV"
    }
}
filter {
    csv { }
}
output {
    stdout { codec => rubydebug }
}

Ingesting as CSV

Here's an actual run.

Excerpt:
{
          "path" => "/home/opc/test.csv",
       "column5" => "hirata_yuujirou@example.com",
       "column9" => "石川県",
    "@timestamp" => 2022-06-05T04:27:23.435Z,
       "column6" => "M",
          "host" => "elastic",
       "column1" => "IK152942",
       "column4" => "ひらた ゆうじろう",
       "column8" => "1990/6/10",
       "column2" => "平田 裕次郎",
       "column7" => "29",
       "message" => "IK152942,平田 裕次郎,2019-01-01 00:25:33,ひらた ゆうじろう,hirata_yuujirou@example.com,M,29,1990/6/10,石川県",
          "tags" => [
        [0] "CSV"
    ],
      "@version" => "1",
       "column3" => "2019-01-01 00:25:33"
}
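
Notice that the fields get generic names (column1, column2, ...) and that, back in the Level 2 output, the CSV header line itself was ingested as an ordinary event. If you'd rather name the fields up front and skip the header row, the csv filter also supports the columns and skip_header options; a hedged sketch (skip_header only skips lines that exactly match the declared columns):

filter {
    csv {
        columns => ["customer_id","customer_name","registration_date","customer_name_kana","email","gender","age","birth","pref"]
        skip_header => true
    }
}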

Level 4

Don't ingest unnecessary columns

We only want to keep the useful columns, so first identify the fields we don't need:
["path","@timestamp","host","message"]
Now let's edit first-pipeline.conf to drop those fields.

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        tags => "CSV"
    }
}
filter {
    csv {
        remove_field => [ "path","@timestamp","host","message" ]
    }
}
output {
    stdout { codec => rubydebug }
}

Extracting only the desired columns

{
    "@version" => "1",
     "column7" => "29",
     "column3" => "2019-01-01 00:25:33",
     "column2" => "平田 裕次郎",
     "column5" => "hirata_yuujirou@example.com",
     "column9" => "石川県",
     "column4" => "ひらた ゆうじろう",
     "column6" => "M",
        "tags" => [
        [0] "CSV"
    ],
     "column8" => "1990/6/10",
     "column1" => "IK152942"
}
{
    "@version" => "1",
     "column7" => "63",
     "column3" => "2019-01-01 02:00:14",
     "column2" => "久野 由樹",
     "column5" => "hisano_yuki@example.com",
     "column9" => "茨城県",
     "column4" => "ひさの ゆき",
     "column6" => "F",
        "tags" => [
        [0] "CSV"
    ],
     "column8" => "1956/1/2",
     "column1" => "AS834628"
}
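
Listing every field to remove works, but when you know exactly what you want to keep it can be easier to say so directly. A sketch using the bundled prune filter (whitelist_names takes regular expressions; behavior around special fields like @timestamp varies, so verify on your version):

filter {
    csv { }
    prune {
        # drop every top-level field that does not match one of these patterns
        whitelist_names => [ "^column[0-9]$", "^tags$" ]
    }
}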

Bonus

"@timestamp"に"column3"(データの日時)を反映させてみる

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/data/plugins/inputs/file/sincedb"
        tags => "CSV"
    }
}
filter {
    csv {
        remove_field => [ "path","host","message" ]
    }
    date {
        match => [ "column3","yyyy-MM-dd HH:mm:ss" ]
        remove_field => [ "column3" ]
    }
}
output {
    stdout { codec => rubydebug }
}
Before
{
    "@version" => "1",
     "column7" => "74",
     "column3" => "2019-01-01 04:48:22",
     "column2" => "鶴岡 薫",
     "column5" => "tsuruoka_kaoru@example.com",
     "column9" => "東京都",
     "column4" => "つるおか かおる",
     "column6" => "M",
        "tags" => [
        [0] "CSV"
    ],
     "column8" => "1945/3/25",
     "column1" => "AS345469"
}
After
{
    "@timestamp" => 2019-01-01T04:48:22.000Z,
       "column1" => "AS345469",
       "column9" => "東京都",
       "column4" => "つるおか かおる",
          "tags" => [
        [0] "CSV"
    ],
       "column5" => "tsuruoka_kaoru@example.com",
       "column8" => "1945/3/25",
       "column6" => "M",
       "column2" => "鶴岡 薫",
      "@version" => "1",
       "column7" => "74"
}

Level 5

Rename the columns

"column7"や"column2"はぱっと見何を意味するのか分かりません。
なので、リネームしていきます。
元々のカラムの名前は下記のとおりです。

customer_id,customer_name,registration_date,customer_name_kana,email,gender,age,birth,pref
IK152942,平田 裕次郎,2019-01-01 00:25:33,ひらた ゆうじろう,hirata_yuujirou@example.com,M,29,1990/6/10,石川県

Editing first-pipeline.conf

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/data/plugins/inputs/file/sincedb"
        tags => "CSV"
    }
}
filter {
    csv {
        remove_field => [ "path","host","message" ]
    }
    date {
        match => [ "column3","yyyy-MM-dd HH:mm:ss" ]
        remove_field => [ "column3" ]
    }
    mutate {
        rename => { "column1" => "customerid" }
        rename => { "column2" => "customer_name" }
        rename => { "@timestamp" => "registration_date" }
        rename => { "column4" => "customer_name_kana" }
        rename => { "column5" => "email" }
        rename => { "column6" => "gender" }
        rename => { "column7" => "age" }
        rename => { "column8" => "birth" }
        rename => { "column9" => "pref" }
    }
}
output {
    stdout { codec => rubydebug }
}
Excerpt:
{
                 "email" => "tsuruoka_kaoru@example.com",
                "gender" => "M",
     "registration_date" => 2019-01-01T04:48:22.000Z,
                  "pref" => "東京都",
            "customerid" => "AS345469",
                  "tags" => [
        [0] "CSV"
    ],
                 "birth" => "1945/3/25",
         "customer_name" => "鶴岡 薫",
                   "age" => "74",
              "@version" => "1",
    "customer_name_kana" => "つるおか かおる"
}

The renames worked nicely.
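
Incidentally, instead of renaming @timestamp afterwards with mutate, the date filter can write its parsed result straight into a field of your choice via its target option, leaving @timestamp untouched. A minimal alternative sketch:

filter {
    date {
        match => [ "column3","yyyy-MM-dd HH:mm:ss" ]
        target => "registration_date"    # write the parsed date here instead of @timestamp
        remove_field => [ "column3" ]
    }
}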

Level 6

Converting data types

Let's do some type conversion.
The following field is currently a string, so we'll convert it to a number:
["age"]

{
                 "email" => "tsuruoka_kaoru@example.com",
                "gender" => "M",
     "registration_date" => 2019-01-01T04:48:22.000Z,
                  "pref" => "東京都",
            "customerid" => "AS345469",
                  "tags" => [
        [0] "CSV"
    ],
                 "birth" => "1945/3/25",
         "customer_name" => "鶴岡 薫",
                   "age" => "74",
              "@version" => "1",
    "customer_name_kana" => "つるおか かおる"
}

Editing first-pipeline.conf

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/data/plugins/inputs/file/sincedb"
        tags => "CSV"
    }
}
filter {
    csv {
        remove_field => [ "path","host","message" ]
    }
    date {
        match => [ "column3","yyyy-MM-dd HH:mm:ss" ]
        remove_field => [ "column3" ]
    }
    mutate {
        rename => { "column1" => "customerid" }
        rename => { "column2" => "customer_name" }
        rename => { "@timestamp" => "registration_date" }
        rename => { "column4" => "customer_name_kana" }
        rename => { "column5" => "email" }
        rename => { "column6" => "gender" }
        rename => { "column7" => "age" }
        rename => { "column8" => "birth" }
        rename => { "column9" => "pref" }
        convert => {
            "age" => "integer"
        }
    }
}
output {
    stdout { codec => rubydebug }
}

"age"が数値型に変換されていることが分かります。
年齢を括っていたダブルクォーテーションが消えたと思います。

{
    "customer_name_kana" => "つるおか かおる",
                 "email" => "tsuruoka_kaoru@example.com",
                  "pref" => "東京都",
                 "birth" => "1945/3/25",
                   "age" => 74,
              "@version" => "1",
         "customer_name" => "鶴岡 薫",
                "gender" => "M",
            "customerid" => "AS345469",
     "registration_date" => 2019-01-01T04:48:22.000Z,
                  "tags" => [
        [0] "CSV"
    ]
}
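
As an aside, the csv filter itself also has a convert option, so the cast could happen at parse time instead of in mutate. A sketch (note it would target the pre-rename name, column7):

filter {
    csv {
        convert => { "column7" => "integer" }
        remove_field => [ "path","host","message" ]
    }
}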

Level 7

Send the data to Elasticsearch

Now it finally gets interesting.
So far we've only printed events to the console; this time we'll ship them to Elasticsearch and check them in Kibana.

Editing first-pipeline.conf

input {
    file {
        path => ["/home/opc/test.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/data/plugins/inputs/file/sincedb"
        tags => "CSV"
    }
}
filter {
    csv {
        remove_field => [ "path","host","message" ]
    }
    date {
        match => [ "column3","yyyy-MM-dd HH:mm:ss" ]
        remove_field => [ "column3" ]
    }
    mutate {
        rename => { "column1" => "customerid" }
        rename => { "column2" => "customer_name" }
        rename => { "@timestamp" => "registration_date" }
        rename => { "column4" => "customer_name_kana" }
        rename => { "column5" => "email" }
        rename => { "column6" => "gender" }
        rename => { "column7" => "age" }
        rename => { "column8" => "birth" }
        rename => { "column9" => "pref" }
        convert => {
            "age" => "integer"
        }
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "http://localhost:9200/"
        index => "python-100knocks-%{+YYYY.MM.dd}"
    }
}
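
Before heading to Kibana, you can confirm that the index was actually created by asking Elasticsearch directly (assuming it listens on localhost:9200):

curl -s 'http://localhost:9200/_cat/indices/python-100knocks-*?v'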

Checking in Kibana

Log in to Kibana at http://{Server_IP_Address}:5601.
From the hamburger menu at the top left, click "Stack Management".
Then click "Create index pattern".

The index name we defined in first-pipeline.conf shows up here.
Enter the same name as the index in the Name field, pick the event's date field in the Timestamp field dropdown (registration_date in this pipeline, since @timestamp was renamed), and click "Create index pattern".

The familiar column names are all there.

Level 8

Collect instance CPU/memory metrics with Metricbeat

Installation

sudo yum install metricbeat

Starting Metricbeat

sudo systemctl start metricbeat

That's all it takes to start shipping metrics, which you can then explore in Kibana.
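
A few hedged notes in case the defaults don't match your environment: the yum install assumes the Elastic repository is already configured on the instance, Metricbeat ships to whatever Elasticsearch is set in /etc/metricbeat/metricbeat.yml (localhost:9200 by default), and the sample Kibana dashboards are loaded separately:

sudo metricbeat modules list             # the "system" module (CPU, memory, etc.) is enabled by default
sudo metricbeat setup                    # load index templates and sample Kibana dashboards
sudo systemctl enable --now metricbeat   # start now and on every boot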
