【Logstash】GCSからBigQueryへのログ取り込み方法
はじめに
日常的にデータ加工にElastic社のLogstash[1]を使っている身としては
ちょっとしたデータ加工や取り込みをする際は、Logstashに頼ってしまいがちでです^^;
最適なアーキテクチャという意味では、より良い方法があるのは理解しつつも
使い慣れたツールに頼ってしまうこともありますよね(笑)
やりたきこと
今回はGoogle Cloud Storage(以降、GCS)に保存しているCSV形式のログデータを
BigQuery(以降、BQ)に取り込みたい、そして日付のフォーマット処理を行いたい
ということでLogstashを使って実装してみることにしました。
今回のチャレンジ
下記のプラグインを使ったデータ取り込みになります。
実行環境
Product | version |
---|---|
Logstash | 7.11.2 |
Open JDK | 11.0.10 |
OS | CentOS 7.7 |
BigQuery | 2021年3月18日時点 |
Cloud Storage | 2021年3月18日時点 |
Region | asia-northeast1 |
【構成図】
【前提条件】
・ GCPコンソール操作はオーナー権限で実行しています。
・ GCSのバケット(log-bucket)は作成済みです。
・ ログの保存先パスはlog-bucket/LogData/20210318/testlog.gz
になります。
・ BQのテーブル(log-table)は作成済みです。
・ テーブルのフルパスはtest-project.dev_dataset.log-table
になります。
実施手順
- サービスアカウントの作成
- OpenJDKのインストール
- リポジトリの登録
- Logstashのインストール
- プラグインのインストール
- ログデータとテーブルスキーマ
- パイプライン定義ファイルの設定
- データ取り込み確認
1. サービスアカウントの作成
Cloud Shellを起動します。
下記コマンドでサービスアカウント(for-logstash
)を作成します。
$ gcloud iam service-accounts create for-logstash \
--display-name="for-logstash"
下記コマンドでサービスアカウントに以下のロールを割り当てます。
・ ストレージオブジェクト管理者
・ BigQueryデータ編集者
・ BigQueryジョブユーザー
$ gcloud projects add-iam-policy-binding test-project \
--member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
$ gcloud projects add-iam-policy-binding test-project \
--member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
$ gcloud projects add-iam-policy-binding test-project \
--member="serviceAccount:for-logstash@test-project.iam.gserviceaccount.com" \
--role="roles/bigquery.jobUser"
下記コマンドでlogstash-vmに作成したサービスアカウントを割り当てます。
$ gcloud compute instances set-service-account logstash-vm \
--service-account for-logstash@test-project.iam.gserviceaccount.com
2. OpenJDKのインストール
GCPコンソールからlogstash-vmにブラウザウィンドウでログインします。
下記コマンドでOpenJDKをインストールします。
$ sudo yum install java-11-openjdk-devel
下記コマンドでインストールされていることを確認します。
$ sudo java -version
openjdk version "11.0.10" 2021-01-19 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.10+9-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.10+9-LTS, mixed mode, sharing)
3. リポジトリの登録
下記コマンドでElasticのリポジトリを登録します。
(logstash.repo
としていますが、名前は何でも大丈夫です)
$ sudo vi /etc/yum.repos.d/logstash.repo
上記ファイルに記載する内容は下記になります。
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
【参考】
・ Installing Logstash
4. Logstashのインストール
下記コマンドでPGP鍵ファイルをインポートし、yumコマンドでLogstashをインストールします。
$ sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ sudo yum install logstash
下記コマンドでインストールされていることを確認します。
$ sudo yum list installed | grep logstash
logstash.x86_64 1:7.11.2-1 @logstash-7.x
下記コマンドで自動起動設定およびステータスを確認しておきます。
$ sudo systemctl daemon-reload
$ sudo systemctl enable logstash
$ sudo systemctl status logstash
5. プラグインのインストール
下記コマンドで2つのプラグインをインストールします。
Installation successfulと表示されればOKです。
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-google_cloud_storage
Validating logstash-input-google_cloud_storage
Installing logstash-input-google_cloud_storage
Installation successful
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-google_bigquery
Validating logstash-output-google_bigquery
Installing logstash-output-google_bigquery
Installation successful
下記コマンドでインストールされていることを確認します。
$ /usr/share/logstash/bin/logstash-plugin list logstash-input-google_cloud_storage
logstash-input-google_cloud_storage
$ /usr/share/logstash/bin/logstash-plugin list logstash-output-google_bigquery
logstash-output-google_bigquery
6. ログデータとテーブルスキーマ
GCSに配置されているログデータは下記の通りです。
【ログフォーマット】
"colums1","colums2","colums3","colums4","time_stamp"
【ログサンプル】
"data1","data2","data3","data4","2021/03/18 01:12:42"
格納先のBQのテーブルのスキーマは下記の通りです。
フィールド名 | タイプ | モード |
---|---|---|
colums1 | STRING | NULLABLE |
colums2 | STRING | NULLABLE |
colums3 | STRING | NULLABLE |
colums4 | STRING | NULLABLE |
time_stamp | TIMESTAMP | NULLABLE |
7. パイプライン定義ファイルの設定
パイプライン定義ファイル(logstash.conf)の設定をおこないます。
$ sudo vi /etc/logstash/conf.d/logstash.conf
下記の内容をlogstash.confに記載します。
input {
google_cloud_storage {
# GCSを見にいく間隔(秒)
interval => 60
# GCSのバケット名
bucket_id => "test-bucket"
# 取込むファイル名
file_matches => "LogData/20210318/testlog.*.gz"
# プレーンテキストとして読込む
codec => "plain"
}
}
filter {
csv {
# CSVのヘッダー情報
columns =>
[
"colums1",
"colums2",
"colums3",
"colums4",
"time_stamp"
]
}
date {
# 日付型としてマッチする
match => ["time_stamp", "yyyy/MM/dd HH:mm:ss"]
# タイムゾーン指定
timezone => "Asia/Tokyo"
}
}
output {
# BQに出力
google_bigquery {
# GCPのプロジェクト名
project_id => "test-project"
# 出力したいBQのデータセット名
dataset => "dev_dataset"
# BQに取り込みたいフィールド名と型を指定
csv_schema => "colums1:STRING,colums2:STRING,colums3:STRING,colums4:STRING,time_stamp:TIMESTAMP"
# テーブル名を指定
table_prefix => "log-table"
# テーブル名につく接続文字を消す
table_separator => ""
# テーブル名の後に付く日付フォーマットを消すことにより日付できない。
date_pattern => ""
# エラー時に出力されるファイルのディレクトリ名
error_directory => "/tmp/bigquery-errors"
# テーブルスキーマと一致しない余分な列値を無視する
ignore_unknown_values => true
}
}
【設定の解説】
・ 今回取り込むファイルはtestlog.gz
ですが、testlog.01.gz
でも取り込み可能にしています。
・ time_stampフィールドの日付形式をISO8601に変換するため、date filterを利用しています。
・ タイムゾーンにAsia/Tokyoを指定することでUTC時間でBQに格納しています。
・ BQへのデフォルト出力設定は、TablePrefix_%Y-%m-%dT%H:00
というテーブル名になります。
・ テーブル名のみを指定したい場合、table_separator
とdate_pattern
を空白にします。
・ 作成済みBQテーブルに格納する場合、パーティション分割テーブルを利用できます。
・ 今回は事前にパーティション分割テーブルを作成し、BQ出力で格納しています。
・ ログデータのフィールドのうち、テーブルに格納しないものはremove field
で削除します。
・ ignore_unknown_values
をtrueとすることで取り込まないことも可能です。
8. データ取り込み確認
Logstashによって、BQに格納されたデータはストリーミングインサートで取り込まれます。
よって、格納された直後は下記のような状態となります。
(最大90分後にはテーブルに格納されます)
まとめ
さて、いかがでしたでしょうか?
継続的な運用も視野に入れて、マネージドサービスに寄せる方法も多数あると思います。
特にデータ加工をせずにBQに入れるだけであれば、BigQuery Data Transfer Service[4]
データ加工もGUIのノーコードで実施するのであれば、Cloud Data Fusion[5]もアリだと思います。
自分たちの要件に合ったやり方で色々なデータエンジニアリングを楽しんでいきましょう!!
Discussion