Azure Synapse Pipeline - Data Flow でフォルダ間のデータの差分を取る方法
Azure Synapse Pipeline もしくは Azure Data Factory の Data Flow で、2つのフォルダに格納されているファイルのデータの差分抽出をしてみます。
ユースケースは以下になります。
-
oldフォルダには古いデータ、newフォルダには最新のデータが格納されている - データの項目は
Id、LastUpdated、Col1 - 主キーは
Id列で新規にデータが生成されると新しいIdが生成される - データが変更されると
LastUpdated列が更新される
Exists 変換の基本的な動作
Data Flow には Exists変換があり、2つのデータストリーム間の差分を抽出できますが、これはSQL文でいうところのWHERE EXISTSもしくはWHERE NOT EXISTSの動作になります。これはExist typeパラメータで設定します。Existsに設定すると前者、Doesn't existに設定すると後者の動作になります。
比較はExists conditionsパラメータで設定します。項目同士の比較、もしくはカスタム式での条件が設定できます。
詳細は以下のURLを参照してください。
追加及び更新されたデータの抽出
最新データに対してWHERE NOT EXISTS条件を適用します。つまり、
- 左ストリームに
newフォルダのデータ - 右ストリームに
oldフォルダのデータ
を設定し、Exist typeとして
Doesn't exist
Exists conditionsとして
(左ストリームの Id) == (右ストリームの Id)(左ストリームの LastUpdated) == (右ストリームの LastUpdated)
を設定します。これで、最新データにおいて追加、更新されたデータが抽出されます。
newフォルダのデータをソースとしたソース変換Newのデータ例:

oldフォルダのデータをソースとしたソース変換Oldのデータ例:

exists変換の設定例:

削除されたデータの抽出
古いデータに対してWHERE NOT EXISTS条件を適用します。つまり、
- 左ストリームに
oldフォルダのデータ - 右ストリームに
newフォルダのデータ
を設定し、Exist typeとして
Doesn't exist
Exists conditionsとして
(左ストリームの Id) == (右ストリーム Id)
を設定します。これで、最新データにおいて削除されたデータが抽出されます。
exists変換の設定例:

結果
2つのExists変換にて、以下のデータが抽出されました。
追加もしくは更新されたデータ

削除されたデータ

ポイント
追加・更新されたデータと削除されたデータを個別に抽出するところがポイントになります。
ただし、追加更新されたデータと削除されたデータの扱いは後段の処理で異なることが多いため、個別に抽出できるのはメリットになるかと思います。
また、Exists conditionsにより抽出条件が柔軟に変えられるので、様々なユースケースに適用できそうですね。
Discussion