【Cloud Data Fusion】本格的なパイプライン作ってみた!
はじめに
これまでElastic社のETLツールであるLogstash[1]を多用して
データ分析に必要なデータ加工処理をしていました。
どハマりすぎて1冊本[2]を書いてしまう程にのめり込んでいました^^;
5年くらい前は、ETLツールと言えば、Fluentd[3]とLogstash
バッチ処理であれば、Embulk[4]がよく比較された上で採用されていました。
今でももちろん多くの商用環境で利用されています。
昨今はデータ基盤をパブリッククラウドに構築するケースが増えており
それに伴い、ETL(ELT含む)処理もサーバレスでマネージドなものにする動きがあります。
私がよく耳にするものでは、下記のようなクラウドETLサービスがあります。
クラウドETLサービス
と、前置きはこのくらいにして、今回はクラウドETLツールの1つである
Cloud Data Fusion[9]でセキュアに構築するデータパイプラインについて紹介します。
Cloud Data Fusionのパイプライン作成画面
今回実施したかったこと
オンプレの基幹システム(RDBMS)で保持しているデータをCSV抽出して
クラウドのデータ基盤で分析するユースケースを想定して、下記項目を要件としています。
- GUIベースのローコードETLツールでデータパイプラインを作成したい
- VPC Service Controlsの保護下でセキュアなパイプラインを作成したい
- Cloud Storage上のファイルパスを正規表現指定で収集してBigqueryに格納したい
- ZIPファイルを解凍し、CSVをパース(フィールド抽出)したい
- URLが格納されたフィールドのURLをデコードしたい
- 日付が格納されたフィールドの時刻のタイムゾーンを日本時間からUTCに変換したい
- BigQueryの日付パーティションが効いた状態でデータを格納したい
【補足】
・ Cloud Data Fusion(以降、CDF)はGCPのサービスなので、GCPで基盤構築しています。
利用環境
| Product | version |
|---|---|
| Cloud Data Fusion | 2021年5月16日時点 |
| VPC Service Controls | 2021年5月16日時点 |
| Cloud Strorage | 2021年5月16日時点 |
| BigQuery | 2021年5月16日時点 |
| CDAP | 6.4.0 |
| Region | asia-northeast1 |
【全体構成】
全体構成
【パイプライン構成】
パイプライン構成
【補足】
・ SubnetのPrivate Google Access(PGA)はオンにしています。
・ VPC Service Controls(以降、VPC-SC)は事前設定済みです。
・ Cloud Storage(以降、GCS)のバケット作成、データ格納は事前実施済みです。
・ BigQuery(以降、BQ)のデータセット、テーブルは事前作成済みです。
事前設定したパラメータ
以下、事前設定されたパラメータ値になります。
1. VPC-SCの設定値
GCSとBQをVPC-SC境界内に保護することで機密データを保護します。
| 項目名 | 設定値 |
|---|---|
| 境界名 | CloudDataFusionTest |
| 境界のタイプ | 標準境界(デフォルト) |
| 構成のタイプ | 適用(デフォルト) |
| 保護するプロジェクト | <保護対象のGCPプロジェクトを指定> |
| 制限付きサービス | BigQuery API、Google Cloud Storage API |
| VPCのアクセス可能なサービス | すべてのサービス |
| アクセスレベル | - |
| 内向きポリシー | - |
| 下り(外向きポリシー) | - |
【補足】
・ 今回はパブリックにCDFでデータパイプライン設定するため、CDFを保護していません。
・ 接続元IPをAccess Context Managerで許可し、アクセスレベルで設定することも可能です。
2. IAMアカウントの設定値
以下、パイプラインを実行するDataprocが利用するサービスアカウントのロールになります。
| 項目名 | 設定値 |
|---|---|
| サービスアカウント名 | cdf-test@<project>.iam.gserviceaccount.com |
| IAMロール名 | 編集者, BigQuery編集者, Storageオブジェクト閲覧者 |
IAMロールの割り当て
【補足】
・ BQにデータを格納するため、BigQuery編集者を付与しています。
・ GCSからデータを取得するため、Storageオブジェクト閲覧者を付与しています。
・ CDFインスタンス作成時にCloud Data Fusion実行者が自動付与されます。
3. GCSの設定値
以下、ZIPファイルを格納するGCSバケットの設定になります。
| 項目名 | 設定値 |
|---|---|
| バケット名 | cdf-test (グローバルに一意な名前) |
| ロケーションタイプ | Region |
| ロケーション | asia-northeast-1 |
| ストレージクラス | Nearline (なんでもOK) |
| アクセス制御 | 均一 |
| 暗号化 (オプション) | Googleが管理する鍵 (デフォルト) |
| 保持ポリシー (オプション) | 未設定 (デフォルト) |
| ラベル (オプション) | 未設定 (デフォルト) |
【補足】
・ 上記のバケットにdata_fusion_testフォルダを作成しています。
・ 利用するファイルはweblog_20210516000000_20210516235959.csv.zipです。
"test-user", "2021/05/16 10:11:13", "https://www.amazon.co.jp/Elastic-Stack%E5%AE%9F%E8%B7%B5%E3%82%AC%E3%82%A4%E3%83%89-Logstash-Beats%E7%B7%A8-impress/dp/4295009989"
4. BQの設定値
以下、データ格納先となるBQのデータセットとテーブルの情報になります。
| 項目名 | 設定値 |
|---|---|
| データセット名 | dataset_test |
| テーブル名 | cdf_test |
【スキーマ情報】
| フィールド名 | データ型 | 説明 |
|---|---|---|
| user_name | string | ユーザ名 |
| time_stamp | timestamp | アクセス日時 |
| url | string | URL |
【補足】
・ time_stampフィールドを日付パーティション[10]に利用しています。
実施手順
以下の順番に実施します。
- インスタンス作成
- CDFへのアクセス
- データソース指定
- ZIP解凍処理
- CSVパース処理
- URLデコード処理
- タイムゾーン変換処理
- データシンク指定
- パイプライン実行
- 実施結果確認
1. インスタンス作成
Cloud Data Fusion(以降、CDF)のインスタンスの作成時のオプションで
プライベートIPを有効化にチェックを入れるとCDFとDataprocのインスタンスが
プライベートIPで作成されます。
【補足】
・ 事前にデプロイ先のSubnetで限定公開のGoogleアクセスをオンにしておく必要があります。
(VPCネットワークで該当リーションのサブネットをクリックします)
では、GCPコンソールでData Fusionを開き、インスタンスを作成をクリックします。
下記のパラメータでインスタンスを作成します。
| 項目名 | 設定値 |
|---|---|
| インスタンス名 | hibino1 (好きな名前でOK) |
| リージョン | asia-northeast1 |
| バージョン | 6.4.0 (2021年5月時点の最新版) |
| エディション | Developer |
| Dataprocサービスアカウント | cdf-test@<project>.iam.gserviceaccount.com |
| ゾーン | asia-northeast1-c (どこでもOK) |
| プライベートIPを有効化 | ☑︎ |
| Stackdriver Loggingサービスを有効にする | ☑︎ (どちらでもOK) |
| Stackdriver Monitoringサービスを有効にする | ☑︎ (どちらでもOK) |
【補足】
・ DataprocサービスアカウントにIAMロール付与をする上で必要な設定は
インスタンス作成でどハマりした話を参照ください。
・ エディションは、Developer、Basic、Enterpriseの3種類あります。
機能の違いはなく、可用性と性能の要件に応じて、利用するエディションを選択ください。
プライベートインスタンスを作成すると自動的にVPCピアリングが張られます。
15〜20分くらいでインスタンスが作成されます。
2. CDFへのアクセス
作成したインスタンスにおいて、インスタンスを表示をクリックします。
以下の画面が表示されれば、アクセスはOKです。次の手順からパイプラインを作成していきます!
3. データソース指定
画面左メニューを開き、Pipeline > Stadioをクリックします。
Sourceにあるデフォルト15種類のデータソースから選択します。
今回は、❶GCSをクリックします。すると、GCPのパネルが画面にプロットされます。
Source GCP追加
❷GCPのPropertiesをクリックして、以下の設定を行います。
| 項目名 | 設定値 |
|---|---|
| Label | GCS |
| Reference Name | input_gcs_weblog (好きな名前でOK) |
| Path | gs://cdf-test/data_fusion_test/ |
| Format | blob |
| Regex Path Filter | /data_fusion_test/weblog_.*zip |
Output Schemaは以下としています。
| 項目名 | データ型 |
|---|---|
| body | bytes |
【ポイント】
・ Pathには、オブジェクト参照範囲のGCSのバケットとフォルダパスを指定します。
・ ログファイル名には、タイムスタンプが入ることが多いため、正規表現で指定しています。
バケット配下のKeyに対して、正規表現で対象オブジェクトのみに収集対象を限定できます。
・ ZIPファイルを一旦バイナリファイルで収集するため、Formatをblobとしています。
ZIPファイルの解凍は次の手順で実施します。
・ 出力データはバイナリがボコっとbodyに入ります。データ型はbytesとなります。
作成画面(参考)
【補足】
・ 上記のパラメータ以外はデフォルト値としています。
設定後、画面右上のValidateを押して、No errors found.となればOKです。
Validate OK画面
4. ZIP解凍処理
次は、ZIPファイルの解凍処理になります。
画面左のTransformからField Decompressorをクリックします。
Field Decompressor追加
❶先ほど作成したGCSから矢印で接続し、❷Propertiesを開きます。
GCSとField Decompressorの接続
❸Decompress FieldsをbodyフィールドでZIPを指定します。
❹Output Schemaで出力するbodyフィールドのデータ型を文字列のstringにします。
ZIP解凍設定
今回も画面右上のValidateを押して、OKとなれば完了です。
5. CSVパース処理
次は、CSVパース処理になります。
画面左のTransformからCSV Parserをクリックします。
CSV Parserの追加
❶先ほど作成したField Decompressorから矢印で接続し、❷Propertiesを開きます。
Field DecompressorとCSV Parserの接続
❸Input field to Parseにbodyと入力します。
CSVパース設定
❹Output Schemaで出力するフィールドは以下のようにします。
| フィールド名 | データ型 |
|---|---|
| user_name | string |
| time_stamp | string |
| url | string |
【ポイント】
・ time_stampフィールドは、後でタイムゾーン変換処理をします。
その時にtimestamp型に変換するので、今は文字列型として出力します。
画面右上のValidateを押して、OKとなれば完了です。
6. URLデコード処理
次は、URLデコード処理になりますが、Transformの中にURLデコード用のパーツがありません。
Transformにない場合は、Wranglerを使うことでより柔軟な処理を実施することができます。
URLデコード処理と次の手順のタイムゾーン処理はこのWranglerを利用します。
Directivesという関数のようなものを簡単なコードとして書くことで実行することができます。
画面左のTransformからWranglerをクリックします。
Wranglerの追加
❶先ほど作成したCSV Parserから矢印で接続し、❷Propertiesを開きます。
CSV ParserとWranglerの接続
DirectivesのRecipeに以下の構文を追記します。
url-decode url
以下のようになります。
URLデコード処理設定
今回の処理は以上です。このまま次のタイムゾーン変換処理に進みます。
【参考】
・ URL Decode
7. タイムゾーン変換処理
先ほどのURLデコード処理と続けて、DirectivesのRecipeに以下の構文を追記します。
parse-as-datetime :time_stamp "yyyy/MM/dd HH:mm:ss"
datetime-to-timestamp :time_stamp "Asia/Tokyo"
以下のようになります。
タイムゾーン変換処理設定
Output Schemaで出力するフィールドは以下のようにします。
| フィールド名 | データ型 |
|---|---|
| user_name | string |
| time_stamp | timestamp |
| url | string |
【ポイント】
・ parse-as-datetimeでstring型からdatetime型に変換しています。
・ datetime-to-timestampでタイムゾーンを指定して、timestamp型に変換しています。
・ タイムゾーンはUTC+09:00でもOKですが、可読性向上のため今回のような指定としています。
画面右上のValidateを押して、OKとなれば完了です。
【参考】
・ Parse as Datetime
・ Datetime To Timestamp
8. データシンク指定
最後は、BQのテーブルへのロード処理になります。
Sinkにあるデフォルト14種類のデータシンクから選択します。
今回は、BigQueryをクリックします。
BigQueryの追加
❶先ほど作成したWranglerから矢印で接続し、❷Propertiesを開きます。
WranglerとBigQueryの接続
以下の設定を行います。
| 項目名 | 設定値 |
|---|---|
| Label | BigQuery |
| Reference Name | output_bq_weblog (好きな名前でOK) |
| Dataset | dataset_test |
| Table | cdf_test |
画面下部のOutput Schemaで出力するフィールドにチェックを入れます。
画面右上のValidateを押して、OKとなれば完了です。
9. パイプライン実行
作成したパイプラインに名前をつけて保存します。
画面右上のパネルでSaveボタンを押します。
Saveボタン
今回は適当にcdf-testという名前をつけて保存します。
パイプライン名をつけて保存
事前テストとしてPreviewを行います。Previewをクリック後、Runを押します。
Previewの実行
1分程度でプレビューが終わります。
無事完了するとBigQueryのPreview Dataにデータが表示されます。
Preview Dataの確認
以下のようにスキーマに格納されていれば事前テストはOKです。
事前テストOK
再度、Previewボタンを押して、プレビューモードから抜け、Deployを押します。
Deployを開始
Runを押すとデプロイが開始されます。
パイプラインの実行
StatusがProvissioningとなります。
Provissioning
Dataprocクラスタも同時に起動し始めます。
Dataprocクラスタ起動中
パイプラインが完了するとStatusはSucceededとなり、Dataprocは停止されます。
Dataprocの実行結果
10. 実施結果確認
GCPコンソールでBigQueryを開き、該当のテーブルのプレビューを開きます。
以下のようにテーブルにデータが格納されていればOKです。
テーブル格納結果
日付パーティションが効いているか、以下のクエリで確認します。
(このクエリはレガシーSQLでの実行が必要となります)
SELECT * FROM [<project>.dataset_test.cdf_test$__PARTITIONS_SUMMARY__]
以下のようにpartition_idに日付が格納されていれば、OKです。
Partition_id確認結果
長文になってしまいましたが、以上になります。
まとめ
さて、いかがでしたでしょうか?
Cloud Data Fusionについて、試してみた系のブログもいくつかありますが
エンタープライズ用途での具体的なパイプラインの作成方法にまで踏み込んだものが
あまり見当たらななったため、今回本格的に1つありそうなケースで実行してみました。
これからデータエンジニアの活躍の場は、広がる一方だと思っています。
これまでは比較的ワンショットのための環境が多かったと思いますが
今後は、継続的に運用維持していくデータ基盤が増えていくことを考えると
運用者にとって、ラーニングコストの掛かるものや高い運用スキルを求めるものより
可能なら手軽なもので済む方がメリットの大きいケースもあると思います。
本投稿が参考になることを願っています^^
Cloud Data Fusion 関連投稿
- インスタンス作成でどハマりした話
- GCSバケット消えない問題
- プライベートインスタンスでS3からデータ収集する方法
- パイプライン実行エラー検知方法
- ZIPファイルの取り込み時の苦戦とその解決方法
- パイプライン失敗時のSlack通知方法
Discussion