【Cloud Data Fusion】マクロ関数を利用したファイル指定方法
はじめに
今回は、Cloud Data Fusion (以降、CDF)のマクロ関数を使ったナレッジになります。
利用するマクロ関数は Logical Start Time function
になります。
【参考】
・Logical Start Time function
今回のやりたきこと
今回、CDFで実現したいことは下記3点になります。
- LogstashがGoogle Cloud Storage (以降、GCS)に出力したログをBigQueryに取り込みたい。
- BigQueryに書き込み後に処理したログのみを処理済みフォルダに移動したい。
- CDFのパイプライン実行日の前日分のログをBigQueryへの書き込み対象としたい。
全体概要図
実現したいこと❶
LogstashとCDFの多段ETL構成としたのは、「CDFのプラグインで取得困難なデータソースがあり
Elastic社のFilebeat Module[1]を利用したため」という背景になります。
FilebeatではGCSに直接出力できないため、Logstashを介して出力しています。
(上記の概要図では、Filebeatは割愛しています)
実現したいこと❷
LogstashのGoogle Cloud Storage output plugin
を利用しましたが
このプラグインでは出力バケットしか指定できず、フォルダを指定できませんでした。
バケット直下にログファイルが増えてしまい、日次バッチの処理時間が増えてしまうという
課題があったため、処理したファイルのみを処理済みフォルダに移動する必要がありました。
実現したいこと❸
LogstashがGCSに出力するタイミングは以下の2つのパラメータで決まっています。
パラメータ名 | デフォルト値 | 説明 |
---|---|---|
max_file_size_kbytes | 10000 (kbytes) | 最大ファイルサイズ(KB)を指定します |
uploader_interval_secs | 60 (second) | GCSへの出力間隔(秒)を指定します |
デフォルトでは、60秒間隔で出力するようになっていますが
その前に最大サイズの10,000KBに達するとファイルを出力するようになっています。
(GCSにファイルが出力されるタイミングをコントロールすることが困難でした)
処理したファイルのみを処理済みフォルダに移動しようとしても
パイプライン実行中にLogstashから新たにファイル出力されてしまい
処理していないファイルも移動されてしまうという課題がありました。
パイプライン実行日の前日分のファイルのみを日次処理するようにしました。
上記の❷と❸を実現するために Logical Start Time function
を利用します。
【参考】Logstashが出力するファイル名
LogstashのGoogle Cloud Storage output plugin
[2]を使ってGCSにログを出力する場合
以下のパラメータに基づいて出力ファイル名が決まります。
パラメータ名 | デフォルト値 | 説明 |
---|---|---|
log_file_prefix | "logstash_gcs" | ファイルの先頭につける文字列を指定します |
include_hostname | true | ファイル名にホスト名を含めるか指定します |
date_pattern | "%Y-%m-%dT%H:00" | タイムスタンプの形式を指定します |
上記のパラメータに基づき、以下のフォーマットでファイル名が決定します。
<log_file_prefix>_<hostname>_%Y-%m-%dT%H:00.part<3桁の数値>.log
2022年1月10日の11時台にvm01マシンが出力したログの場合
logstash_gcs_vm01_2022-01-10T11:00.part000.log
になります。
(3桁の数値は1時間の間に000から順にカウントアップされていきます)
利用環境
Product | version |
---|---|
CDAP | 6.5.1 (Developer) |
GCS Source Plugin | 0.18.1 |
BigQuery Sink Plugin | 0.18.1 |
Wrangler Plugin | 4.5.1 |
GCS Move Action Plugin | 0.18.1 |
Region | us-west1 |
【構成図】
・以下のCDFパイプライン構成図になります。
パイプライン構成
【補足】
・収集したログをパースするため、Wrangler Plugin
[3]を利用しています。
・処理完了後、ログファイルを移動するため、GCS Move Action Plugin
[4]を利用しています。
実施手順
以下の手順で実施します。
- GCS Source Pluginの設定
- GCS Move Action Pluginの設定
- パイプライン実行結果の確認
【補足】
・Wrangler Plugin
及びBigQuery Sink Plugin
の設定は省略しています。
1. GCS Source Pluginの設定
- まず、取り込み対象ファイルは以下になります。
GCSバケットの対象ファイル
【補足】
・対象バケットはlog-bucket-2022
とします。
・2022/01/11にパイプライン実行するため、取り込み対象ファイルは2022/01/10分
とします。
・処理済みのファイルは01_Done
フォルダに移動します。
-
GCS Source Plugin
のPropertiesを開きます。 -
Pathに
gs://log-bucket-2022
と設定します。
Pathの設定
-
Regex Path Filterに
/logstash_gcs_${logicalStartTime(yyyy-MM-dd,1d,Asia/Tokyo)}.*.log
と設定します。
Regex Path Filterの設定
【補足】
・ファイルはバケット直下に配置されるため、Pathにはバケットのみ設定します。
・ファイル名の指定には正規表現を利用するため、Regex Path Filterを利用します。
・Logical Start Time function
は日付形式、遡る日時、タイムゾーンの順に記載します。
2. GCS Move Action Pluginの設定
-
GCS Move Action Plugin
のPropertiesを開きます。 -
Source Pathに
gs://log-bucket-2022/logstash_gcs_${logicalStartTime(yyyy-MM-dd,1d,Asia/Tokyo)}
と設定します。 -
Destination Pathに
gs://log-bucket-2022/01_Done/
を設定します。
GCS Move Action Pluginの設定
【補足】
・Source Pathに正規表現を利用できないため、ファイル名の前方一致としています。
3. パイプライン実行結果の確認
- GCSバケットを確認します。
- まずは、元々ログが配置されていた
gs://log-bucket-2022
配下を確認します。 - 2022/01/10分の3つファイルが無くなっています。
GCSバケット直下
-
01_Done
フォルダ配下にlogstash_gcs_2022-01-10
というフォルダが作成されています。 - そのフォルダ内にファイルが移動されていることがわかります。
移動先フォルダ
Source Pathで指定したパス名でディレクトリ化されて
マッチしなかった文字列の部分だけがファイル名として移動していました。
以上、想定外の動きではありましたが、やりたいことは実現できました。
まとめ
さて、いかがでしたでしょうか?
今回は手動でパイプライン実行しましたが
スケジュール機能でDaily 10:00AMとすると
日本時間の毎日19時に自動でパイプラインが実行されます。
at least once
でログを取り込む1つの手法として参考になると幸いです^^
Discussion