【Cloud Data Fusion】パイプライン実行エラー検知方法

5 min read読了の目安(約5200字

はじめに

データパイプラインの開発を行っているとデータをうまく加工処理出来ずに
遅くまでデバッグ対応に追われることがあります。

またパイプライン開発は終わり、本番デプロイ後に想定していなかった形式の
データがパイプライン上を流れてきて、分析基盤への取り込みに失敗していることに
気づかずに日々を過ごしてしまう、、なんてことはあるまじきかと思います。

今回はCloud Data Fusion(以降、CDF)でどこまでエラーハンドリングできるものなのか
簡単な調査と検証してみましたので、その備忘録的な内容になっています。

今回のやりたきこと

データパイプラインの開発や運用をしていて知りたいエラーに関する情報は
以下の3点かなと思います。

  • エラーが発生したことを検知したい
  • どこの処理でエラーになったのか知りたい
  • エラーになったデータはどれか知りたい

エラー通知方法

CDFでエラーハンドリングするには、大きく2つの設定があるようです。

設定❶

パイプライン全体を通してのエラーを検知する方法です。
パイプライン開発画面(Studio)のConfigureタブを開くとPipeline alertの設定があります。

Pipeline alert

パイプラインの失敗(Fail)を検知し、以下の通知先にアクションを実行できます。

  • Send Email: メールを送信
  • Run Database Query: データベースにクエリを実行
  • GCSDoneFileMarker: GCSにファイルを作成
  • Make HTTP Call: HTTPリクエストを発行

設定❷

パイプライン内の個別処理の中でエラーを拾って通知する方法です。
CDFでは以下の3つのtransformation pluginでエラー通知ができます。

  • JavaScript Transformation Plugin
  • Wrangler Transformation Plugin
  • UnionSplitter Transformation Plugin


AlertとError Port

【補足】
AlertErrorの詳細説明は後述します。

結論としては...

今回は、Wranglerを使って動作検証を実施しました。
Wranglerのプロパティを開くと下部にError Handlingという項目があります。

Error Handling

Send to error portがエラーの特定と通知が可能で一番良さげでした。
詳細の動作を以下の表にまとめてみました。
(1と2はWranglerとしてFailedにならないため、Pipeline alertは発動しません)

# 設定 ❶Status ❷件数 ❸ログ出力 エラーの特定 エラーの通知 Pipeline alert
1 Skip Error Succeeded In,Outに表示 エラー出力無 不可 不可 通知無
2 Send to error port Succeeded In,Out,Errorに表示 エラー出力無 Error Collector経由で可 Alert Publisher経由で可 通知無
3 Fail pipeline Failed In,Outに表示 エラー出力有 不可 不可 通知有

【補足】
Skip Errorはエラーをスキップしてしまうため、エラー件数は分かりますが、特定や通知はできませんでした。
Send to error portはエラーの特定と通知は可能ですが、パイプライン全体としては成功となってしまうようです。
Fail pipelineはエラー件数と通知は可能ですが、エラーの特定は難しいです。

❶Statusとは

パイプライン実行後に画面上部にSucceededFailedのいずれかの状態が表示されます。

❷件数とは

下記図はWranglerの例になりますが、Error Portに出力したイベント件数が1個となっています。
SourceとTransformationのPluginは出力(Out)エラー(Errors)に件数が表示されます。

❸ログ出力とは

パイプライン実行後に画面上部のLogs[1] を開きます。
Fail pipelineの場合のみ、ERRORレベルのログが記録されます。

実行環境

Product version
CDAP 6.4.0 (Developer)
Wrangler Transformation Plugin 4.4.0
ErrorCollector Plugin 2.6.0
GCS Sink Plugin 0.17.0
Region asia-northeast1

【構成図】

パイプライン構成図

【補足】
・ CDFはプライベートインスタンスで構築しています。
・ SubnetのPrivate Google Access(PGA)はオンにしています。
・ VPC Service Controls(以降、VPC-SC)は事前設定済みです。

設定手順

本投稿では、以下の2つの設定方法を解説します。

  1. Error Collectorを使ったエラーの特定
  2. Pipeline alertを使ったエラー通知

1. Error Collectorを使ったエラーの特定

Wranglerのプロパティを開き、On ErrorSend to error portを選択します。

Error Collectorをクリックします。

WranglerのErrorから矢印でError Collectorで接続させます。

Error CollectorからGCSにエラーになったイベントと原因が記録されたデータを出力します。
(出力先にはSink Pluginを利用します)

これで完了になります。あとはWranglerの処理で失敗するとそのイベントと原因が記録された
データがGCSバケットに格納されますので、原因を特定してパイプラインを修正します。

今回は調査しかしませんでしたが、エラーを通知する場合は
WranglerのAlertに以下のAlert Publisherを接続することで通知できそうです。

  • Kafka Alert Publisher[2]
  • Transactional Alert Publisher[3]

2. Pipeline alertを使ったエラー通知

WrangleやJavaScript以外のプラグインで失敗した場合の通知やエラー内容を確認するため
Pipeline alertの動作も確認しました。

Studio画面でConfigureタブを開き、Pipeline alertを選択します。
Set alerts for your batch pipelineで+ボタンをクリックします。

今回はメール送信とします。

送信元アドレス、宛先アドレス、件名、本文を記載します。

Saveで保存します。

パイプライン実行エラーになるとメール通知が飛んできます。

まとめ

さて、いかがでしたでしょうか?

意外と設定や動作が複雑なので、もう少しシンプルな仕組みにして欲しいと感じました。
Send to error portでエラー通知するには、Kafkaが必要であったりと少し大掛かりに
なってしまいそうですね。

脚注
  1. ログ表示方法 ↩︎

  2. Kafka Alert Publisher ↩︎

  3. Transactional Alert Publisher ↩︎