【Cloud Data Fusion】パイプライン実行エラー検知方法
はじめに
データパイプラインの開発を行っているとデータをうまく加工処理出来ずに
遅くまでデバッグ対応に追われることがあります。
またパイプライン開発は終わり、本番デプロイ後に想定していなかった形式の
データがパイプライン上を流れてきて、分析基盤への取り込みに失敗していることに
気づかずに日々を過ごしてしまう、、なんてことはあるまじきかと思います。
今回はCloud Data Fusion(以降、CDF)でどこまでエラーハンドリングできるものなのか
簡単な調査と検証してみましたので、その備忘録的な内容になっています。
今回のやりたきこと
データパイプラインの開発や運用をしていて知りたいエラーに関する情報は
以下の3点かなと思います。
- エラーが発生したことを検知したい
- どこの処理でエラーになったのか知りたい
- エラーになったデータはどれか知りたい
エラー通知方法
CDFでエラーハンドリングするには、大きく2つの設定があるようです。
設定❶
パイプライン全体を通してのエラーを検知する方法です。
パイプライン開発画面(Studio)のConfigureタブを開くとPipeline alertの設定があります。
Pipeline alert
パイプラインの失敗(Fail)を検知し、以下の通知先にアクションを実行できます。
- Send Email: メールを送信
- Run Database Query: データベースにクエリを実行
- GCSDoneFileMarker: GCSにファイルを作成
- Make HTTP Call: HTTPリクエストを発行
設定❷
パイプライン内の個別処理の中でエラーを拾って通知する方法です。
CDFでは以下の3つのtransformation pluginでエラー通知ができます。
- JavaScript Transformation Plugin
- Wrangler Transformation Plugin
- UnionSplitter Transformation Plugin
AlertとError Port
【補足】
・ Alert
とError
の詳細説明は後述します。
結論としては...
今回は、Wranglerを使って動作検証を実施しました。
Wranglerのプロパティを開くと下部にError Handlingという項目があります。
Error Handling
Send to error portがエラーの特定と通知が可能で一番良さげでした。
詳細の動作を以下の表にまとめてみました。
(1と2はWranglerとしてFailedにならないため、Pipeline alertは発動しません)
# | 設定 | ❶Status | ❷件数 | ❸ログ出力 | エラーの特定 | エラーの通知 | Pipeline alert |
---|---|---|---|---|---|---|---|
1 | Skip Error | Succeeded | In,Outに表示 | エラー出力無 | 不可 | 不可 | 通知無 |
2 | Send to error port | Succeeded | In,Out,Errorに表示 | エラー出力無 | Error Collector経由で可 | Alert Publisher経由で可 | 通知無 |
3 | Fail pipeline | Failed | In,Outに表示 | エラー出力有 | 不可 | 不可 | 通知有 |
【補足】
・ Skip Errorはエラーをスキップしてしまうため、エラー件数は分かりますが、特定や通知はできませんでした。
・ Send to error portはエラーの特定と通知は可能ですが、パイプライン全体としては成功となってしまうようです。
・ Fail pipelineはエラー件数と通知は可能ですが、エラーの特定は難しいです。
❶Statusとは
パイプライン実行後に画面上部にSucceeded
とFailed
のいずれかの状態が表示されます。
❷件数とは
下記図はWranglerの例になりますが、Error Portに出力したイベント件数が1個となっています。
SourceとTransformationのPluginは出力(Out)
やエラー(Errors)
に件数が表示されます。
❸ログ出力とは
パイプライン実行後に画面上部のLogs[1] を開きます。
Fail pipelineの場合のみ、ERRORレベルのログが記録されます。
実行環境
Product | version |
---|---|
CDAP | 6.4.0 (Developer) |
Wrangler Transformation Plugin | 4.4.0 |
ErrorCollector Plugin | 2.6.0 |
GCS Sink Plugin | 0.17.0 |
Region | asia-northeast1 |
【構成図】
パイプライン構成図
【補足】
・ CDFはプライベートインスタンスで構築しています。
・ SubnetのPrivate Google Access(PGA)はオンにしています。
・ VPC Service Controls(以降、VPC-SC)は事前設定済みです。
設定手順
本投稿では、以下の2つの設定方法を解説します。
- Error Collectorを使ったエラーの特定
- Pipeline alertを使ったエラー通知
1. Error Collectorを使ったエラーの特定
Wranglerのプロパティを開き、On ErrorでSend to error port
を選択します。
Error Collectorをクリックします。
WranglerのError
から矢印でError Collectorで接続させます。
Error CollectorからGCSにエラーになったイベントと原因が記録されたデータを出力します。
(出力先にはSink Pluginを利用します)
これで完了になります。あとはWranglerの処理で失敗するとそのイベントと原因が記録された
データがGCSバケットに格納されますので、原因を特定してパイプラインを修正します。
今回は調査しかしませんでしたが、エラーを通知する場合は
WranglerのAlert
に以下のAlert Publisherを接続することで通知できそうです。
2. Pipeline alertを使ったエラー通知
WrangleやJavaScript以外のプラグインで失敗した場合の通知やエラー内容を確認するため
Pipeline alertの動作も確認しました。
Studio画面でConfigure
タブを開き、Pipeline alertを選択します。
Set alerts for your batch pipeline
で+ボタンをクリックします。
今回はメール送信とします。
送信元アドレス、宛先アドレス、件名、本文を記載します。
Saveで保存します。
パイプライン実行エラーになるとメール通知が飛んできます。
まとめ
さて、いかがでしたでしょうか?
意外と設定や動作が複雑なので、もう少しシンプルな仕組みにして欲しいと感じました。
Send to error portでエラー通知するには、Kafkaが必要であったりと少し大掛かりに
なってしまいそうですね。
Cloud Data Fusion 関連投稿
- 本格的なパイプライン作ってみた!
- プライベートインスタンスでS3からデータ収集する方法
- インスタンス作成でどハマりした話
- GCSバケット消えない問題
- ZIPファイルの取り込み時の苦戦とその解決方法
- パイプライン失敗時のSlack通知方法
Discussion