⚙️

【Cloud Data Fusion】本格的なパイプライン作ってみた!

15 min read

はじめに

これまでElastic社のETLツールであるLogstash[1]を多用して
データ分析に必要なデータ加工処理をしていました。
どハマりすぎて1冊本[2]を書いてしまう程にのめり込んでいました^^;

5年くらい前は、ETLツールと言えば、Fluentd[3]とLogstash
バッチ処理であれば、Embulk[4]がよく比較された上で採用されていました。
今でももちろん多くの商用環境で利用されています。

昨今はデータ基盤をパブリッククラウドに構築するケースが増えており
それに伴い、ETL(ELT含む)処理もサーバレスでマネージドなものにする動きがあります。

私がよく耳にするものでは、下記のようなクラウドETLサービスがあります。

  • 日系企業: trocco[5]Reckoner[6]
  • 外資企業: Fivetran[7]Xplenty[8]


クラウドETLサービス

と、前置きはこのくらいにして、今回はクラウドETLツールの1つである
Cloud Data Fusion[9]でセキュアに構築するデータパイプラインについて紹介します。

Cloud Data Fusionのパイプライン作成画面

今回実施したかったこと

オンプレの基幹システム(RDBMS)で保持しているデータをCSV抽出して
クラウドのデータ基盤で分析するユースケースを想定して、下記項目を要件としています。

  1. GUIベースのローコードETLツールでデータパイプラインを作成したい
  2. VPC Service Controlsの保護下でセキュアなパイプラインを作成したい
  3. Cloud Storage上のファイルパスを正規表現指定で収集してBigqueryに格納したい
  4. ZIPファイルを解凍し、CSVをパース(フィールド抽出)したい
  5. URLが格納されたフィールドのURLをデコードしたい
  6. 日付が格納されたフィールドの時刻のタイムゾーンを日本時間からUTCに変換したい
  7. BigQueryの日付パーティションが効いた状態でデータを格納したい

【補足】
・ Cloud Data Fusion(以降、CDF)はGCPのサービスなので、GCPで基盤構築しています。

利用環境

Product version
Cloud Data Fusion 2021年5月16日時点
VPC Service Controls 2021年5月16日時点
Cloud Strorage 2021年5月16日時点
BigQuery 2021年5月16日時点
CDAP 6.4.0
Region asia-northeast1

【全体構成】

全体構成

【パイプライン構成】

パイプライン構成

【補足】
・ SubnetのPrivate Google Access(PGA)はオンにしています。
・ VPC Service Controls(以降、VPC-SC)は事前設定済みです。
・ Cloud Storage(以降、GCS)のバケット作成、データ格納は事前実施済みです。
・ BigQuery(以降、BQ)のデータセット、テーブルは事前作成済みです。

事前設定したパラメータ

以下、事前設定されたパラメータ値になります。

1. VPC-SCの設定値

GCSとBQをVPC-SC境界内に保護することで機密データを保護します。

項目名 設定値
境界名 CloudDataFusionTest
境界のタイプ 標準境界(デフォルト)
構成のタイプ 適用(デフォルト)
保護するプロジェクト <保護対象のGCPプロジェクトを指定>
制限付きサービス BigQuery API、Google Cloud Storage API
VPCのアクセス可能なサービス すべてのサービス
アクセスレベル -
内向きポリシー -
下り(外向きポリシー) -

【補足】
・ 今回はパブリックにCDFでデータパイプライン設定するため、CDFを保護していません。
・ 接続元IPをAccess Context Managerで許可し、アクセスレベルで設定することも可能です。

2. IAMアカウントの設定値

以下、パイプラインを実行するDataprocが利用するサービスアカウントのロールになります。

項目名 設定値
サービスアカウント名 cdf-test@<project>.iam.gserviceaccount.com
IAMロール名 編集者, BigQuery編集者, Storageオブジェクト閲覧者


IAMロールの割り当て

【補足】
・ BQにデータを格納するため、BigQuery編集者を付与しています。
・ GCSからデータを取得するため、Storageオブジェクト閲覧者を付与しています。
・ CDFインスタンス作成時にCloud Data Fusion実行者が自動付与されます。

3. GCSの設定値

以下、ZIPファイルを格納するGCSバケットの設定になります。

項目名 設定値
バケット名 cdf-test (グローバルに一意な名前)
ロケーションタイプ Region
ロケーション asia-northeast-1
ストレージクラス Nearline (なんでもOK)
アクセス制御 均一
暗号化 (オプション) Googleが管理する鍵 (デフォルト)
保持ポリシー (オプション) 未設定 (デフォルト)
ラベル (オプション) 未設定 (デフォルト)

【補足】
・ 上記のバケットにdata_fusion_testフォルダを作成しています。
・ 利用するファイルはweblog_20210516000000_20210516235959.csv.zipです。

今回利用するログの中身
"test-user", "2021/05/16 10:11:13", "https://www.amazon.co.jp/Elastic-Stack%E5%AE%9F%E8%B7%B5%E3%82%AC%E3%82%A4%E3%83%89-Logstash-Beats%E7%B7%A8-impress/dp/4295009989"

4. BQの設定値

以下、データ格納先となるBQのデータセットとテーブルの情報になります。

項目名 設定値
データセット名 dataset_test
テーブル名 cdf_test

【スキーマ情報】

フィールド名 データ型 説明
user_name string ユーザ名
time_stamp timestamp アクセス日時
url string URL

【補足】
・ time_stampフィールドを日付パーティション[10]に利用しています。

実施手順

以下の順番に実施します。

  1. インスタンス作成
  2. CDFへのアクセス
  3. データソース指定
  4. ZIP解凍処理
  5. CSVパース処理
  6. URLデコード処理
  7. タイムゾーン変換処理
  8. データシンク指定
  9. パイプライン実行
  10. 実施結果確認

1. インスタンス作成

Cloud Data Fusion(以降、CDF)のインスタンスの作成時のオプションで
プライベートIPを有効化にチェックを入れるとCDFとDataprocのインスタンスが
プライベートIPで作成されます。

https://cloud.google.com/data-fusion/docs/how-to/create-private-ip

【補足】
・ 事前にデプロイ先のSubnetで限定公開のGoogleアクセスをオンにしておく必要があります。
 (VPCネットワークで該当リーションのサブネットをクリックします)

では、GCPコンソールでData Fusionを開き、インスタンスを作成をクリックします。

下記のパラメータでインスタンスを作成します。

項目名 設定値
インスタンス名 hibino1 (好きな名前でOK)
リージョン asia-northeast1
バージョン 6.4.0 (2021年5月時点の最新版)
エディション Developer
Dataprocサービスアカウント cdf-test@<project>.iam.gserviceaccount.com
ゾーン asia-northeast1-c (どこでもOK)
プライベートIPを有効化 ☑︎
Stackdriver Loggingサービスを有効にする ☑︎ (どちらでもOK)
Stackdriver Monitoringサービスを有効にする ☑︎ (どちらでもOK)

【補足】
・ DataprocサービスアカウントにIAMロール付与をする上で必要な設定は
 インスタンス作成でどハマりした話を参照ください。

・ エディションは、Developer、Basic、Enterpriseの3種類あります。
 機能の違いはなく、可用性と性能の要件に応じて、利用するエディションを選択ください。

https://cloud.google.com/data-fusion/pricing/?hl=JA

プライベートインスタンスを作成すると自動的にVPCピアリングが張られます。

15〜20分くらいでインスタンスが作成されます。

2. CDFへのアクセス

作成したインスタンスにおいて、インスタンスを表示をクリックします。

以下の画面が表示されれば、アクセスはOKです。次の手順からパイプラインを作成していきます!

3. データソース指定

画面左メニューを開き、Pipeline > Stadioをクリックします。

Sourceにあるデフォルト15種類のデータソースから選択します。
今回は、❶GCSをクリックします。すると、GCPのパネルが画面にプロットされます。

Source GCP追加

❷GCPのPropertiesをクリックして、以下の設定を行います。

項目名 設定値
Label GCS
Reference Name input_gcs_weblog (好きな名前でOK)
Path gs://cdf-test/data_fusion_test/
Format blob
Regex Path Filter /data_fusion_test/weblog_.*zip

Output Schemaは以下としています。

項目名 データ型
body bytes

【ポイント】
Pathには、オブジェクト参照範囲のGCSのバケットとフォルダパスを指定します。
・ ログファイル名には、タイムスタンプが入ることが多いため、正規表現で指定しています。
 バケット配下のKeyに対して、正規表現で対象オブジェクトのみに収集対象を限定できます。
・ ZIPファイルを一旦バイナリファイルで収集するため、Formatblobとしています。
 ZIPファイルの解凍は次の手順で実施します。
・ 出力データはバイナリがボコっとbodyに入ります。データ型はbytesとなります。


作成画面(参考)

【補足】
・ 上記のパラメータ以外はデフォルト値としています。

設定後、画面右上のValidateを押して、No errors found.となればOKです。

Validate OK画面

4. ZIP解凍処理

次は、ZIPファイルの解凍処理になります。
画面左のTransformからField Decompressorをクリックします。

Field Decompressor追加

❶先ほど作成したGCSから矢印で接続し、❷Propertiesを開きます。

GCSとField Decompressorの接続

Decompress FieldsbodyフィールドでZIPを指定します。
Output Schemaで出力するbodyフィールドのデータ型を文字列のstringにします。

ZIP解凍設定

今回も画面右上のValidateを押して、OKとなれば完了です。

5. CSVパース処理

次は、CSVパース処理になります。
画面左のTransformからCSV Parserをクリックします。

CSV Parserの追加

❶先ほど作成したField Decompressorから矢印で接続し、❷Propertiesを開きます。

Field DecompressorとCSV Parserの接続

Input field to Parsebodyと入力します。

CSVパース設定

Output Schemaで出力するフィールドは以下のようにします。

フィールド名 データ型
user_name string
time_stamp string
url string

【ポイント】
time_stampフィールドは、後でタイムゾーン変換処理をします。
 その時にtimestamp型に変換するので、今は文字列型として出力します。

画面右上のValidateを押して、OKとなれば完了です。

6. URLデコード処理

次は、URLデコード処理になりますが、Transformの中にURLデコード用のパーツがありません。
Transformにない場合は、Wranglerを使うことでより柔軟な処理を実施することができます。

https://github.com/data-integrations/wrangler

URLデコード処理と次の手順のタイムゾーン処理はこのWranglerを利用します。
Directivesという関数のようなものを簡単なコードとして書くことで実行することができます。

画面左のTransformからWranglerをクリックします。

Wranglerの追加

❶先ほど作成したCSV Parserから矢印で接続し、❷Propertiesを開きます。

CSV ParserとWranglerの接続

DirectivesRecipeに以下の構文を追記します。

url-decode url

以下のようになります。

URLデコード処理設定

今回の処理は以上です。このまま次のタイムゾーン変換処理に進みます。

【参考】
URL Decode

7. タイムゾーン変換処理

先ほどのURLデコード処理と続けて、DirectivesRecipeに以下の構文を追記します。

parse-as-datetime :time_stamp "yyyy/MM/dd HH:mm:ss"
datetime-to-timestamp :time_stamp "Asia/Tokyo"

以下のようになります。

タイムゾーン変換処理設定

Output Schemaで出力するフィールドは以下のようにします。

フィールド名 データ型
user_name string
time_stamp timestamp
url string

【ポイント】
parse-as-datetimeでstring型からdatetime型に変換しています。
datetime-to-timestampでタイムゾーンを指定して、timestamp型に変換しています。
・ タイムゾーンはUTC+09:00でもOKですが、可読性向上のため今回のような指定としています。

画面右上のValidateを押して、OKとなれば完了です。

【参考】
Parse as Datetime
Datetime To Timestamp

8. データシンク指定

最後は、BQのテーブルへのロード処理になります。

Sinkにあるデフォルト14種類のデータシンクから選択します。
今回は、BigQueryをクリックします。

BigQueryの追加

❶先ほど作成したWranglerから矢印で接続し、❷Propertiesを開きます。

WranglerとBigQueryの接続

以下の設定を行います。

項目名 設定値
Label BigQuery
Reference Name output_bq_weblog (好きな名前でOK)
Dataset dataset_test
Table cdf_test

画面下部のOutput Schemaで出力するフィールドにチェックを入れます。

画面右上のValidateを押して、OKとなれば完了です。

9. パイプライン実行

作成したパイプラインに名前をつけて保存します。

画面右上のパネルでSaveボタンを押します。

Saveボタン

今回は適当にcdf-testという名前をつけて保存します。

パイプライン名をつけて保存

事前テストとしてPreviewを行います。Previewをクリック後、Runを押します。

Previewの実行

1分程度でプレビューが終わります。
無事完了するとBigQueryのPreview Dataにデータが表示されます。

Preview Dataの確認

以下のようにスキーマに格納されていれば事前テストはOKです。

事前テストOK

再度、Previewボタンを押して、プレビューモードから抜け、Deployを押します。

Deployを開始

Runを押すとデプロイが開始されます。

パイプラインの実行

StatusがProvissioningとなります。

Provissioning

Dataprocクラスタも同時に起動し始めます。

Dataprocクラスタ起動中

パイプラインが完了するとStatusはSucceededとなり、Dataprocは停止されます。

Dataprocの実行結果

10. 実施結果確認

GCPコンソールでBigQueryを開き、該当のテーブルのプレビューを開きます。

以下のようにテーブルにデータが格納されていればOKです。

テーブル格納結果

日付パーティションが効いているか、以下のクエリで確認します。
(このクエリはレガシーSQLでの実行が必要となります)

パーティション確認用クエリ
SELECT * FROM [<project>.dataset_test.cdf_test$__PARTITIONS_SUMMARY__]

以下のようにpartition_idに日付が格納されていれば、OKです。

Partition_id確認結果

長文になってしまいましたが、以上になります。

まとめ

さて、いかがでしたでしょうか?

Cloud Data Fusionについて、試してみた系のブログもいくつかありますが
エンタープライズ用途での具体的なパイプラインの作成方法にまで踏み込んだものが
あまり見当たらななったため、今回本格的に1つありそうなケースで実行してみました。

これからデータエンジニアの活躍の場は、広がる一方だと思っています。
これまでは比較的ワンショットのための環境が多かったと思いますが
今後は、継続的に運用維持していくデータ基盤が増えていくことを考えると
運用者にとって、ラーニングコストの掛かるものや高い運用スキルを求めるものより
可能なら手軽なもので済む方がメリットの大きいケースもあると思います。

本投稿が参考になることを願っています^^

脚注
  1. Logstashの公式ページ ↩︎

  2. Elastic Stack実践ガイド[Logstash/Beats編] ↩︎

  3. Fluentdの公式ページ ↩︎

  4. Embulkの公式ページ ↩︎

  5. troccoの公式ページ ↩︎

  6. Reckonerの公式ページ ↩︎

  7. Fivetranとは ↩︎

  8. Xplentyの公式ページ ↩︎

  9. Cloud Data Fusionの公式ページ ↩︎

  10. パーティション分割テーブルの概要 ↩︎