😎

Azure Synapse Pipeline - Data Flow でフォルダ間のデータの差分を取る方法

2021/10/21に公開

Azure Synapse Pipeline もしくは Azure Data Factory の Data Flow で、2つのフォルダに格納されているファイルのデータの差分抽出をしてみます。

ユースケースは以下になります。

  • oldフォルダには古いデータ、newフォルダには最新のデータが格納されている
  • データの項目は IdLastUpdatedCol1
  • 主キーはId列で新規にデータが生成されると新しいIdが生成される
  • データが変更されるとLastUpdated列が更新される

Exists 変換の基本的な動作

Data Flow には Exists変換があり、2つのデータストリーム間の差分を抽出できますが、これはSQL文でいうところのWHERE EXISTSもしくはWHERE NOT EXISTSの動作になります。これはExist typeパラメータで設定します。Existsに設定すると前者、Doesn't existに設定すると後者の動作になります。

比較はExists conditionsパラメータで設定します。項目同士の比較、もしくはカスタム式での条件が設定できます。

詳細は以下のURLを参照してください。

https://docs.microsoft.com/ja-jp/azure/data-factory/data-flow-exists

追加及び更新されたデータの抽出

最新データに対してWHERE NOT EXISTS条件を適用します。つまり、

  • 左ストリームにnewフォルダのデータ
  • 右ストリームにoldフォルダのデータ

を設定し、Exist typeとして

  • Doesn't exist

Exists conditionsとして

  • (左ストリームの Id) == (右ストリームの Id)
  • (左ストリームの LastUpdated) == (右ストリームの LastUpdated)

を設定します。これで、最新データにおいて追加、更新されたデータが抽出されます。

newフォルダのデータをソースとしたソース変換Newのデータ例:

oldフォルダのデータをソースとしたソース変換Oldのデータ例:

exists変換の設定例:

削除されたデータの抽出

古いデータに対してWHERE NOT EXISTS条件を適用します。つまり、

  • 左ストリームにoldフォルダのデータ
  • 右ストリームにnewフォルダのデータ

を設定し、Exist typeとして

  • Doesn't exist

Exists conditionsとして

  • (左ストリームの Id) == (右ストリーム Id)

を設定します。これで、最新データにおいて削除されたデータが抽出されます。

exists変換の設定例:

結果

2つのExists変換にて、以下のデータが抽出されました。

追加もしくは更新されたデータ

削除されたデータ

ポイント

追加・更新されたデータと削除されたデータを個別に抽出するところがポイントになります。

ただし、追加更新されたデータと削除されたデータの扱いは後段の処理で異なることが多いため、個別に抽出できるのはメリットになるかと思います。

また、Exists conditionsにより抽出条件が柔軟に変えられるので、様々なユースケースに適用できそうですね。

Discussion