Closed14
Azure Data Factory のマッピング データ フローの Delta Lake へのシンク変換の挙動メモ

Azure Data Factory のマッピング データ フローの Delta Lake へのシンク変換について、設定を変更した際の挙動をキャプチャ付きでメモする。試したデータ フローはソース変換とシンク変換のみの単純なもの。
環境情報
- データ フローの構成
- ソース変換:ADLS Gen2 の CSV ファイルを読み込む
- シンク変換:ADLS Gen2 に Delta 形式で書き込む
- 処理はデータ フローのデバッグ・Small (ドライバー 4 コア + エグゼキューター 4 コア) で実施
結論メモ
テーブル アクション
- テーブル アクション = なしの場合、新規レコードが追記される (Append)
- テーブル アクション = なし、かつアップサートを許可した場合、同一キーのレコードは更新され、そうでないレコードは追記される (Merge)
- テーブル アクション = 上書きの場合、データは上書きされ (Overwrite)、Delta のバージョン履歴は残る
- テーブル アクション = 切り詰めるの場合、シンクの前に実データも Delta のバージョン履歴も削除され、データが追記される
デルタ オプション
- 自動圧縮は、Parquet ファイルの数が少ない場合は実施されない
- 公式ドキュメントにも「自動圧縮は、少なくとも 50 個のファイルがある場合にのみ開始されます」との記述がある
- https://learn.microsoft.com/ja-jp/azure/data-factory/format-delta#delta-sink-optimization-options
- 書き込みの最適化をオンにすると、出力される Parquet ファイルがある程度の大きさにまとめられる
- ただし公式ドキュメントに記載のある通り、「書き込みプロセスが最適化されると、ETL ジョブ全体の速度は低下する」ため、この特性は理解して使うのが望ましい

デフォルト設定 - 1 回目
シンク変換設定
デフォルト設定のまま変更なし
- 圧縮の種類:なし
- バキューム:0
- テーブル アクション:なし
- 更新方法:挿入を許可
- デルタ オプション - スキーマのマージ:チェックなし
- 自動圧縮:チェックなし
- 書き込みの最適化:チェックなし
ストレージ ブラウザー
- 圧縮の種類がなしでも snappy 圧縮されている
- Parquet ファイルが 4 ファイルに分かれている (処理を実行したクラスターのエグゼキューターのコア数と一致)
_delta_log フォルダ
Databricks SQL での確認
件数確認
SELECT count(*) FROM olist_orders_dataset;
count(1) |
---|
99441 |
テーブル メタデータ確認
DESCRIBE TABLE EXTENDED olist_orders_dataset;
col_name data_type comment
order_id string
customer_id string
order_status string
order_purchase_timestamp timestamp
order_approved_at timestamp
order_delivered_carrier_date timestamp
order_delivered_customer_date timestamp
order_estimated_delivery_date timestamp
# Partitioning
Not partitioned
# Detailed Table Information
Catalog hive_metastore
Database olist_enriched
Table olist_orders_dataset
Location abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset
Provider delta
Owner root
External true
Type EXTERNAL
Table Properties [delta.minReaderVersion=1,delta.minWriterVersion=2]
テーブル履歴確認
DESCRIBE HISTORY olist_orders_dataset;
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2022-09-19 13:48:20 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

デフォルト設定 - 2 回目
ストレージ ブラウザー
- Parquet ファイルが 4 個増え、計 8 個になった
_delta_log フォルダ
- json ファイルが 1 個増え、計 2 個になった
件数確認
count(1) |
---|
198882 |
テーブル メタデータ確認
初回から変化なしのため省略
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 2022-09-19 14:07:32 | WRITE | {"mode":"Append","partitionBy":"[]"} | 0.0 | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
0 | 2022-09-19 13:48:20 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

上書き - 1 回目
シンク変換設定
- テーブル アクションを上書きに変更
ストレージ ブラウザー
- ファイルが 4 個増え、計 12 個に
_delta_log フォルダ
- ファイルが 1 個増加
件数確認
- 初回と同じ件数に戻った
count(1) |
---|
99441 |
テーブル メタデータ確認
初回から変化なしのため省略
テーブル履歴確認
- モードが
Append
から "Overwrite" に変わった
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2 | 2022-09-19 14:14:52 | WRITE | {"mode":"Overwrite","partitionBy":"[]"} | 1.0 | False | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
1 | 2022-09-19 14:07:32 | WRITE | {"mode":"Append","partitionBy":"[]"} | 0.0 | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
0 | 2022-09-19 13:48:20 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

上書き - 2 回目
ストレージ ブラウザー
- ファイルが 4 個増加
_delta_log フォルダー
- ファイルが 1 個増加
件数確認
count(1) |
---|
99441 |
テーブル メタデータ確認
初回から変化なしのため省略
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
3 | 2022-09-19 14:22:57 | WRITE | {"mode":"Overwrite","partitionBy":"[]"} | 2.0 | False | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
2 | 2022-09-19 14:14:52 | WRITE | {"mode":"Overwrite","partitionBy":"[]"} | 1.0 | False | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
1 | 2022-09-19 14:07:32 | WRITE | {"mode":"Append","partitionBy":"[]"} | 0.0 | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} | ||||||||
0 | 2022-09-19 13:48:20 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

切り詰める - 1 回目
シンク変換設定
- [テーブル アクション] を [切り詰める] に変更
ストレージ ブラウザー
- ファイルが 4 個に戻った
_delta_log フォルダー
- ファイルが 1 個に戻った
件数確認
count(1) |
---|
99441 |
テーブル メタデータ確認
初回から変化なしのため省略
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2022-09-19 14:27:21 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

切り詰める & 自動圧縮
シンク変換設定
ストレージ ブラウザー
_delta_log フォルダー
件数確認
count(1) |
---|
99441 |
テーブル メタデータ確認
col_name data_type comment
order_id string
customer_id string
order_status string
order_purchase_timestamp timestamp
order_approved_at timestamp
order_delivered_carrier_date timestamp
order_delivered_customer_date timestamp
order_estimated_delivery_date timestamp
# Partitioning
Not partitioned
# Detailed Table Information
Catalog hive_metastore
Database olist_enriched
Table olist_orders_dataset
Location abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset
Provider delta
Owner root
External true
Type EXTERNAL
Table Properties [delta.minReaderVersion=1,delta.minWriterVersion=2]
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2022-09-19 14:35:30 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |

切り詰める & 書き込みの最適化
シンク変換設定
ストレージ ブラウザー
- 1 ファイルにマージされた
_delta_log フォルダー
件数確認
count(1) |
---|
99441 |
テーブル メタデータ確認
col_name data_type comment
order_id string
customer_id string
order_status string
order_purchase_timestamp timestamp
order_approved_at timestamp
order_delivered_carrier_date timestamp
order_delivered_customer_date timestamp
order_estimated_delivery_date timestamp
# Partitioning
Not partitioned
# Detailed Table Information
Catalog hive_metastore
Database olist_enriched
Table olist_orders_dataset
Location abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset
Provider delta
Owner root
External true
Type EXTERNAL
Table Properties [delta.minReaderVersion=1,delta.minWriterVersion=2]
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2022-09-19 14:43:08 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"} |

アップサート - 1 回目
(一旦ストレージ アカウント上のフォルダを削除)
シンク変換設定
行の変換設定
ストレージ ブラウザー
_delta_log フォルダー
件数確認
count(1) |
---|
99441 |
テーブル メタデータ確認
col_name data_type comment
order_id string
customer_id string
order_status string
order_purchase_timestamp timestamp
order_approved_at timestamp
order_delivered_carrier_date timestamp
order_delivered_customer_date timestamp
order_estimated_delivery_date timestamp
# Partitioning
Not partitioned
# Detailed Table Information
Catalog hive_metastore
Database olist_enriched
Table olist_orders_dataset
Location abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset
Provider delta
Owner root
External true
Type EXTERNAL
Table Properties [delta.minReaderVersion=1,delta.minWriterVersion=2]
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2022-09-19 15:20:36 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"} |

アップサート - 2 回目
ストレージ ブラウザー
_delta_log フォルダー
件数確認
1 回目と同様のため省略
テーブル メタデータ確認
1 回目と同様のため省略
テーブル履歴確認
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 2022-09-19 15:27:39 | MERGE | {"deletePredicate":"(NOT ((source.rbac1902def9747a18c06c157773cee24 & 4) = 0))","insertPredicate":"((NOT ((source.rbac1902def9747a18c06c157773cee24 & 1) = 0)) OR (NOT ((source.rbac1902def9747a18c06c157773cee24 & 8) = 0)))","predicate":"(source.order_id = target.order_id )","updatePredicate":"((NOT ((source.rbac1902def9747a18c06c157773cee24 & 2) = 0)) OR (NOT ((source.rbac1902def9747a18c06c157773cee24 & 8) = 0)))"} |
0.0 | False | {"numOutputRows":"99441","numSourceRows":"99441","numTargetFilesAdded":"1","numTargetFilesRemoved":"1","numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetRowsInserted":"0","numTargetRowsUpdated":"99441"} | ||||||||
0 | 2022-09-19 15:20:36 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"} |

スキーマのマージなし
シンク変換でスキーマのマージなしに列を追加した場合の挙動を確認 > DFExecutorUserError が発生
Job failed due to reason: at Sink 'sink1': cannot resolve `target.dummy1` in UPDATE clause given columns target.`order_id`, target.`customer_id`, target.`order_status`, target.`order_purchase_timestamp`, target.`order_approved_at`, target.`order_delivered_carrier_date`, target.`order_delivered_customer_date`, target.`order_estimated_delivery_date`;

スキーマのマージあり
同様のエラーが発生
以下のファイルを新規に入力として与える形でも同様のエラーが発生
order_id | customer_id | order_status | order_purchase_timestamp | order_approved_at | order_delivered_carrier_date | order_delivered_customer_date | order_estimated_delivery_date | dummy1col |
---|---|---|---|---|---|---|---|---|
20220920_1 | 9ef432eb6251297304e76186b10a928d | delivered | 2017-10-02 10:56:33 | 2017-10-02 11:07:15 | 2017-10-04 19:55:00 | 2017-10-10 21:25:13 | 2017-10-18 00:00:00 | dummy1val |

アップサートを許可 & 自動圧縮
(ADLS 上のファイルを削除)
1 回目実行後のストレージ アカウント
2 回目実行後のストレージ アカウント
version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2 | 2022-09-19 16:38:43 | OPTIMIZE | {"auto":"true","batchId":"0","predicate":"[]","zOrderBy":"[]"} | 1.0 | False | {"maxFileSize":"10551259","minFileSize":"10551259","numAddedBytes":"10551259","numAddedFiles":"1","numRemovedBytes":"11735080","numRemovedFiles":"200","p25FileSize":"10551259","p50FileSize":"10551259","p75FileSize":"10551259"} | ||||||||
1 | 2022-09-19 16:38:34 | MERGE | {"deletePredicate":"(NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 4) = 0))","insertPredicate":"((NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 1) = 0)) OR (NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 8) = 0)))","predicate":"(source.order_id = target.order_id )","updatePredicate":"((NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 2) = 0)) OR (NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 8) = 0)))"} |
0.0 | False | {"numOutputRows":"99441","numSourceRows":"99441","numTargetFilesAdded":"200","numTargetFilesRemoved":"4","numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetRowsInserted":"0","numTargetRowsUpdated":"99441"} | ||||||||
0 | 2022-09-19 16:36:48 | WRITE | {"mode":"Append","partitionBy":"[]"} | True | {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"} |
このスクラップは2022/09/20にクローズされました