⚙️

【Cloud Data Fusion】マクロ関数を利用したファイル指定方法

に公開

はじめに

今回は、Cloud Data Fusion (以降、CDF)のマクロ関数を使ったナレッジになります。
利用するマクロ関数は Logical Start Time functionになります。

【参考】
Logical Start Time function

今回のやりたきこと

今回、CDFで実現したいことは下記3点になります。

  1. LogstashがGoogle Cloud Storage (以降、GCS)に出力したログをBigQueryに取り込みたい。
  2. BigQueryに書き込み後に処理したログのみを処理済みフォルダに移動したい。
  3. CDFのパイプライン実行日の前日分のログをBigQueryへの書き込み対象としたい。


全体概要図

実現したいこと❶

LogstashとCDFの多段ETL構成としたのは、「CDFのプラグインで取得困難なデータソースがあり
Elastic社のFilebeat Module[1]を利用したため」という背景になります。

FilebeatではGCSに直接出力できないため、Logstashを介して出力しています。
(上記の概要図では、Filebeatは割愛しています)

実現したいこと❷

LogstashのGoogle Cloud Storage output pluginを利用しましたが
このプラグインでは出力バケットしか指定できず、フォルダを指定できませんでした。

バケット直下にログファイルが増えてしまい、日次バッチの処理時間が増えてしまうという
課題があったため、処理したファイルのみを処理済みフォルダに移動する必要がありました。

実現したいこと❸

LogstashがGCSに出力するタイミングは以下の2つのパラメータで決まっています。

パラメータ名 デフォルト値 説明
max_file_size_kbytes 10000 (kbytes) 最大ファイルサイズ(KB)を指定します
uploader_interval_secs 60 (second) GCSへの出力間隔(秒)を指定します

デフォルトでは、60秒間隔で出力するようになっていますが
その前に最大サイズの10,000KBに達するとファイルを出力するようになっています。
(GCSにファイルが出力されるタイミングをコントロールすることが困難でした)

処理したファイルのみを処理済みフォルダに移動しようとしても
パイプライン実行中にLogstashから新たにファイル出力されてしまい
処理していないファイルも移動されてしまうという課題がありました。

パイプライン実行日の前日分のファイルのみを日次処理するようにしました。

上記の❷と❸を実現するために Logical Start Time functionを利用します。

【参考】Logstashが出力するファイル名

LogstashのGoogle Cloud Storage output plugin[2]を使ってGCSにログを出力する場合
以下のパラメータに基づいて出力ファイル名が決まります。

パラメータ名 デフォルト値 説明
log_file_prefix "logstash_gcs" ファイルの先頭につける文字列を指定します
include_hostname true ファイル名にホスト名を含めるか指定します
date_pattern "%Y-%m-%dT%H:00" タイムスタンプの形式を指定します

上記のパラメータに基づき、以下のフォーマットでファイル名が決定します。
<log_file_prefix>_<hostname>_%Y-%m-%dT%H:00.part<3桁の数値>.log

2022年1月10日の11時台にvm01マシンが出力したログの場合
logstash_gcs_vm01_2022-01-10T11:00.part000.logになります。
(3桁の数値は1時間の間に000から順にカウントアップされていきます)

利用環境

Product version
CDAP 6.5.1 (Developer)
GCS Source Plugin 0.18.1
BigQuery Sink Plugin 0.18.1
Wrangler Plugin 4.5.1
GCS Move Action Plugin 0.18.1
Region us-west1

【構成図】
・以下のCDFパイプライン構成図になります。

パイプライン構成

【補足】
・収集したログをパースするため、Wrangler Plugin[3]を利用しています。
・処理完了後、ログファイルを移動するため、GCS Move Action Plugin[4]を利用しています。

実施手順

以下の手順で実施します。

  1. GCS Source Pluginの設定
  2. GCS Move Action Pluginの設定
  3. パイプライン実行結果の確認

【補足】
Wrangler Plugin及びBigQuery Sink Pluginの設定は省略しています。

1. GCS Source Pluginの設定

  • まず、取り込み対象ファイルは以下になります。


GCSバケットの対象ファイル

【補足】
・対象バケットはlog-bucket-2022とします。
・2022/01/11にパイプライン実行するため、取り込み対象ファイルは2022/01/10分とします。
・処理済みのファイルは01_Doneフォルダに移動します。

  • GCS Source PluginPropertiesを開きます。
  • Pathgs://log-bucket-2022と設定します。


Pathの設定

  • Regex Path Filter/logstash_gcs_${logicalStartTime(yyyy-MM-dd,1d,Asia/Tokyo)}.*.logと設定します。


Regex Path Filterの設定

【補足】
・ファイルはバケット直下に配置されるため、Pathにはバケットのみ設定します。
・ファイル名の指定には正規表現を利用するため、Regex Path Filterを利用します。
Logical Start Time functionは日付形式、遡る日時、タイムゾーンの順に記載します。

2. GCS Move Action Pluginの設定

  • GCS Move Action PluginPropertiesを開きます。
  • Source Pathgs://log-bucket-2022/logstash_gcs_${logicalStartTime(yyyy-MM-dd,1d,Asia/Tokyo)}と設定します。
  • Destination Pathgs://log-bucket-2022/01_Done/を設定します。


GCS Move Action Pluginの設定

【補足】
Source Pathに正規表現を利用できないため、ファイル名の前方一致としています。

3. パイプライン実行結果の確認

  • GCSバケットを確認します。
  • まずは、元々ログが配置されていたgs://log-bucket-2022配下を確認します。
  • 2022/01/10分の3つファイルが無くなっています。


GCSバケット直下

  • 01_Doneフォルダ配下にlogstash_gcs_2022-01-10というフォルダが作成されています。
  • そのフォルダ内にファイルが移動されていることがわかります。

    移動先フォルダ

Source Pathで指定したパス名でディレクトリ化されて
マッチしなかった文字列の部分だけがファイル名として移動していました。

以上、想定外の動きではありましたが、やりたいことは実現できました。

まとめ

さて、いかがでしたでしょうか?

今回は手動でパイプライン実行しましたが
スケジュール機能でDaily 10:00AMとすると
日本時間の毎日19時に自動でパイプラインが実行されます。

at least onceでログを取り込む1つの手法として参考になると幸いです^^

脚注
  1. Filebeat Module ↩︎

  2. Google Cloud Storage output plugin ↩︎

  3. Wrangler Plugin ↩︎

  4. GCS Move Action Plugin ↩︎

Discussion