Azure Synapse Pipeline - Data Flow でフォルダ間のデータの差分を取る方法
Azure Synapse Pipeline もしくは Azure Data Factory の Data Flow で、2つのフォルダに格納されているファイルのデータの差分抽出をしてみます。
ユースケースは以下になります。
-
old
フォルダには古いデータ、new
フォルダには最新のデータが格納されている - データの項目は
Id
、LastUpdated
、Col1
- 主キーは
Id
列で新規にデータが生成されると新しいId
が生成される - データが変更されると
LastUpdated
列が更新される
Exists 変換の基本的な動作
Data Flow には Exists
変換があり、2つのデータストリーム間の差分を抽出できますが、これはSQL文でいうところのWHERE EXISTS
もしくはWHERE NOT EXISTS
の動作になります。これはExist type
パラメータで設定します。Exists
に設定すると前者、Doesn't exist
に設定すると後者の動作になります。
比較はExists conditions
パラメータで設定します。項目同士の比較、もしくはカスタム式での条件が設定できます。
詳細は以下のURLを参照してください。
追加及び更新されたデータの抽出
最新データに対してWHERE NOT EXISTS
条件を適用します。つまり、
- 左ストリームに
new
フォルダのデータ - 右ストリームに
old
フォルダのデータ
を設定し、Exist type
として
Doesn't exist
Exists conditions
として
(左ストリームの Id) == (右ストリームの Id)
(左ストリームの LastUpdated) == (右ストリームの LastUpdated)
を設定します。これで、最新データにおいて追加、更新されたデータが抽出されます。
new
フォルダのデータをソースとしたソース変換New
のデータ例:
old
フォルダのデータをソースとしたソース変換Old
のデータ例:
exists
変換の設定例:
削除されたデータの抽出
古いデータに対してWHERE NOT EXISTS
条件を適用します。つまり、
- 左ストリームに
old
フォルダのデータ - 右ストリームに
new
フォルダのデータ
を設定し、Exist type
として
Doesn't exist
Exists conditions
として
(左ストリームの Id) == (右ストリーム Id)
を設定します。これで、最新データにおいて削除されたデータが抽出されます。
exists
変換の設定例:
結果
2つのExists
変換にて、以下のデータが抽出されました。
追加もしくは更新されたデータ
削除されたデータ
ポイント
追加・更新されたデータと削除されたデータを個別に抽出するところがポイントになります。
ただし、追加更新されたデータと削除されたデータの扱いは後段の処理で異なることが多いため、個別に抽出できるのはメリットになるかと思います。
また、Exists conditions
により抽出条件が柔軟に変えられるので、様々なユースケースに適用できそうですね。
Discussion