⚙️

【Logstash】GCSからBigQueryへのログ取り込み方法

8 min read

はじめに

日常的にデータ加工にElastic社のLogstash[1]を使っている身としては
ちょっとしたデータ加工や取り込みをする際は、Logstashに頼ってしまいがちでです^^;

最適なアーキテクチャという意味では、より良い方法があるのは理解しつつも
使い慣れたツールに頼ってしまうこともありますよね(笑)

やりたきこと

今回はGoogle Cloud Storage(以降、GCS)に保存しているCSV形式のログデータを
BigQuery(以降、BQ)に取り込みたい、そして日付のフォーマット処理を行いたい
ということでLogstashを使って実装してみることにしました。

今回のチャレンジ

下記のプラグインを使ったデータ取り込みになります。

  • logstash-input-google_cloud_storage[2]
  • logstash-output-google_bigquery[3]

実行環境

Product version
Logstash 7.11.2
Open JDK 11.0.10
OS CentOS 7.7
BigQuery 2021年3月18日時点
Cloud Storage 2021年3月18日時点
Region asia-northeast1

【構成図】

【前提条件】
・ GCPコンソール操作はオーナー権限で実行しています。
・ GCSのバケット(log-bucket)は作成済みです。
・ ログの保存先パスはlog-bucket/LogData/20210318/testlog.gzになります。
・ BQのテーブル(log-table)は作成済みです。
・ テーブルのフルパスはtest-project.dev_dataset.log-tableになります。

実施手順

  1. サービスアカウントの作成
  2. OpenJDKのインストール
  3. リポジトリの登録
  4. Logstashのインストール
  5. プラグインのインストール
  6. ログデータとテーブルスキーマ
  7. パイプライン定義ファイルの設定
  8. データ取り込み確認

1. サービスアカウントの作成

Cloud Shellを起動します。

下記コマンドでサービスアカウント(for-logstash)を作成します。

$ gcloud iam service-accounts create for-logstash \
    --display-name="for-logstash"

下記コマンドでサービスアカウントに以下のロールを割り当てます。
・ ストレージオブジェクト管理者
・ BigQueryデータ編集者
・ BigQueryジョブユーザー

$ gcloud projects add-iam-policy-binding test-project \
    --member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"

$ gcloud projects add-iam-policy-binding test-project \
    --member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"

$ gcloud projects add-iam-policy-binding test-project \
    --member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"

下記コマンドでlogstash-vmに作成したサービスアカウントを割り当てます。

$ gcloud compute instances set-service-account logstash-vm \
   --service-account for-logstash@test-project.iam.gserviceaccount.com

2. OpenJDKのインストール

GCPコンソールからlogstash-vmにブラウザウィンドウでログインします。
下記コマンドでOpenJDKをインストールします。

$ sudo yum install java-11-openjdk-devel

下記コマンドでインストールされていることを確認します。

$ sudo java -version
openjdk version "11.0.10" 2021-01-19 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.10+9-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.10+9-LTS, mixed mode, sharing)

3. リポジトリの登録

下記コマンドでElasticのリポジトリを登録します。
(logstash.repoとしていますが、名前は何でも大丈夫です)

$ sudo vi /etc/yum.repos.d/logstash.repo

上記ファイルに記載する内容は下記になります。

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

【参考】
Installing Logstash

4. Logstashのインストール

下記コマンドでPGP鍵ファイルをインポートし、yumコマンドでLogstashをインストールします。

$ sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ sudo yum install logstash

下記コマンドでインストールされていることを確認します。

$ sudo yum list installed | grep logstash
logstash.x86_64          1:7.11.2-1          @logstash-7.x

下記コマンドで自動起動設定およびステータスを確認しておきます。

$ sudo systemctl daemon-reload
$ sudo systemctl enable logstash
$ sudo systemctl status logstash

5. プラグインのインストール

下記コマンドで2つのプラグインをインストールします。
Installation successfulと表示されればOKです。

$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-google_cloud_storage
Validating logstash-input-google_cloud_storage
Installing logstash-input-google_cloud_storage
Installation successful

$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-google_bigquery
Validating logstash-output-google_bigquery
Installing logstash-output-google_bigquery
Installation successful

下記コマンドでインストールされていることを確認します。

$ /usr/share/logstash/bin/logstash-plugin list logstash-input-google_cloud_storage
logstash-input-google_cloud_storage

$ /usr/share/logstash/bin/logstash-plugin list logstash-output-google_bigquery
logstash-output-google_bigquery

6. ログデータとテーブルスキーマ

GCSに配置されているログデータは下記の通りです。

【ログフォーマット】

"colums1","colums2","colums3","colums4","time_stamp"

【ログサンプル】

"data1","data2","data3","data4","2021/03/18 01:12:42"

格納先のBQのテーブルのスキーマは下記の通りです。

フィールド名 タイプ モード
colums1 STRING NULLABLE
colums2 STRING NULLABLE
colums3 STRING NULLABLE
colums4 STRING NULLABLE
time_stamp TIMESTAMP NULLABLE

7. パイプライン定義ファイルの設定

パイプライン定義ファイル(logstash.conf)の設定をおこないます。

$ sudo vi /etc/logstash/conf.d/logstash.conf

下記の内容をlogstash.confに記載します。

logstash.conf
input {
  google_cloud_storage {
    # GCSを見にいく間隔(秒)
    interval => 60
    # GCSのバケット名
    bucket_id => "test-bucket"
    # 取込むファイル名
    file_matches => "LogData/20210318/testlog.*.gz"
    # プレーンテキストとして読込む
    codec => "plain"
  }
}

filter {
  csv {
    # CSVのヘッダー情報
    columns =>
    [
      "colums1",
      "colums2",
      "colums3",
      "colums4",
      "time_stamp"
    ]
  }
  date {
    # 日付型としてマッチする
    match => ["time_stamp", "yyyy/MM/dd HH:mm:ss"]
    # タイムゾーン指定
    timezone => "Asia/Tokyo"
  }
}

output {
  # BQに出力
  google_bigquery {
    # GCPのプロジェクト名
    project_id => "test-project"
    # 出力したいBQのデータセット名
    dataset => "dev_dataset"
    # BQに取り込みたいフィールド名と型を指定
    csv_schema => "colums1:STRING,colums2:STRING,colums3:STRING,colums4:STRING,time_stamp:TIMESTAMP"
    # テーブル名を指定
    table_prefix => "log-table"
    # テーブル名につく接続文字を消す
    table_separator => ""
    # テーブル名の後に付く日付フォーマットを消すことにより日付できない。
    date_pattern => ""
    # エラー時に出力されるファイルのディレクトリ名
    error_directory => "/tmp/bigquery-errors"
    # テーブルスキーマと一致しない余分な列値を無視する
    ignore_unknown_values => true
  }
}

【設定の解説】
・ 今回取り込むファイルはtestlog.gzですが、testlog.01.gzでも取り込み可能にしています。
・ time_stampフィールドの日付形式をISO8601に変換するため、date filterを利用しています。
・ タイムゾーンにAsia/Tokyoを指定することでUTC時間でBQに格納しています。
・ BQへのデフォルト出力設定は、TablePrefix_%Y-%m-%dT%H:00というテーブル名になります。
・ テーブル名のみを指定したい場合、table_separatordate_patternを空白にします。
・ 作成済みBQテーブルに格納する場合、パーティション分割テーブルを利用できます。
・ 今回は事前にパーティション分割テーブルを作成し、BQ出力で格納しています。
・ ログデータのフィールドのうち、テーブルに格納しないものはremove fieldで削除します。
ignore_unknown_valuestrueとすることで取り込まないことも可能です。

LogstashのBQ出力でテーブル生成する場合、パーティション分割テーブルにできません。
また、BQに格納するデータの重複排除を行う仕組みもLogstashでは実装されていません。

Logstashが取り込み時に自動的に打刻する@timestampはBQに出力する際、「@」を外してInsertする動作になっています。そのため、他にtimestampTimeStampなど(大文字/小文字の区別なし)のフィールドを持っていると勝手に@timestampの時刻に上書きされますので、注意してください!

8. データ取り込み確認

Logstashによって、BQに格納されたデータはストリーミングインサートで取り込まれます。

https://cloud.google.com/blog/products/bigquery/life-of-a-bigquery-streaming-insert

よって、格納された直後は下記のような状態となります。
(最大90分後にはテーブルに格納されます)

まとめ

さて、いかがでしたでしょうか?
継続的な運用も視野に入れて、マネージドサービスに寄せる方法も多数あると思います。

特にデータ加工をせずにBQに入れるだけであれば、BigQuery Data Transfer Service[4]
データ加工もGUIのノーコードで実施するのであれば、Cloud Data Fusion[5]もアリだと思います。

自分たちの要件に合ったやり方で色々なデータエンジニアリングを楽しんでいきましょう!!

脚注
  1. Logstashとは ↩︎

  2. Google Cloud Storage Input Plugin ↩︎

  3. Google BigQuery output plugin ↩︎

  4. BigQuery Data Transfer Serviceの概要 ↩︎

  5. Cloud Data Fusion ↩︎

Discussion

ログインするとコメントできます