Closed14

Azure Data Factory のマッピング データ フローの Delta Lake へのシンク変換の挙動メモ

nakazaxnakazax

Azure Data Factory のマッピング データ フローの Delta Lake へのシンク変換について、設定を変更した際の挙動をキャプチャ付きでメモする。試したデータ フローはソース変換とシンク変換のみの単純なもの。

環境情報

  • データ フローの構成
    • ソース変換:ADLS Gen2 の CSV ファイルを読み込む
    • シンク変換:ADLS Gen2 に Delta 形式で書き込む
  • 処理はデータ フローのデバッグ・Small (ドライバー 4 コア + エグゼキューター 4 コア) で実施

結論メモ

テーブル アクション

  • テーブル アクション = なしの場合、新規レコードが追記される (Append)
  • テーブル アクション = なし、かつアップサートを許可した場合、同一キーのレコードは更新され、そうでないレコードは追記される (Merge)
  • テーブル アクション = 上書きの場合、データは上書きされ (Overwrite)、Delta のバージョン履歴は残る
  • テーブル アクション = 切り詰めるの場合、シンクの前に実データも Delta のバージョン履歴も削除され、データが追記される

デルタ オプション

  • 自動圧縮は、Parquet ファイルの数が少ない場合は実施されない
  • 書き込みの最適化をオンにすると、出力される Parquet ファイルがある程度の大きさにまとめられる
    • ただし公式ドキュメントに記載のある通り、「書き込みプロセスが最適化されると、ETL ジョブ全体の速度は低下する」ため、この特性は理解して使うのが望ましい
nakazaxnakazax

デフォルト設定 - 1 回目

シンク変換設定

デフォルト設定のまま変更なし

  • 圧縮の種類:なし
  • バキューム:0
  • テーブル アクション:なし
  • 更新方法:挿入を許可
  • デルタ オプション - スキーマのマージ:チェックなし
  • 自動圧縮:チェックなし
  • 書き込みの最適化:チェックなし

ストレージ ブラウザー

  • 圧縮の種類がなしでも snappy 圧縮されている
  • Parquet ファイルが 4 ファイルに分かれている (処理を実行したクラスターのエグゼキューターのコア数と一致)

_delta_log フォルダ

Databricks SQL での確認

件数確認

SELECT count(*) FROM olist_orders_dataset;
count(1)
99441

テーブル メタデータ確認

DESCRIBE TABLE EXTENDED olist_orders_dataset;
col_name	data_type	comment
order_id	string	
customer_id	string	
order_status	string	
order_purchase_timestamp	timestamp	
order_approved_at	timestamp	
order_delivered_carrier_date	timestamp	
order_delivered_customer_date	timestamp	
order_estimated_delivery_date	timestamp	
		
# Partitioning		
Not partitioned		
		
# Detailed Table Information		
Catalog	hive_metastore	
Database	olist_enriched	
Table	olist_orders_dataset	
Location	abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset	
Provider	delta	
Owner	root	
External	true	
Type	EXTERNAL	
Table Properties	[delta.minReaderVersion=1,delta.minWriterVersion=2]	

テーブル履歴確認

DESCRIBE HISTORY olist_orders_dataset;
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
0 2022-09-19 13:48:20 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

デフォルト設定 - 2 回目

ストレージ ブラウザー

  • Parquet ファイルが 4 個増え、計 8 個になった

_delta_log フォルダ

  • json ファイルが 1 個増え、計 2 個になった

件数確認

count(1)
198882

テーブル メタデータ確認

初回から変化なしのため省略

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
1 2022-09-19 14:07:32 WRITE {"mode":"Append","partitionBy":"[]"} 0.0 True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
0 2022-09-19 13:48:20 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

上書き - 1 回目

シンク変換設定

  • テーブル アクションを上書きに変更

ストレージ ブラウザー

  • ファイルが 4 個増え、計 12 個に

_delta_log フォルダ

  • ファイルが 1 個増加

件数確認

  • 初回と同じ件数に戻った
count(1)
99441

テーブル メタデータ確認

初回から変化なしのため省略

テーブル履歴確認

  • モードが Append から "Overwrite" に変わった
version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
2 2022-09-19 14:14:52 WRITE {"mode":"Overwrite","partitionBy":"[]"} 1.0 False {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
1 2022-09-19 14:07:32 WRITE {"mode":"Append","partitionBy":"[]"} 0.0 True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
0 2022-09-19 13:48:20 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

上書き - 2 回目

ストレージ ブラウザー

  • ファイルが 4 個増加

_delta_log フォルダー

  • ファイルが 1 個増加

件数確認

count(1)
99441

テーブル メタデータ確認

初回から変化なしのため省略

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
3 2022-09-19 14:22:57 WRITE {"mode":"Overwrite","partitionBy":"[]"} 2.0 False {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
2 2022-09-19 14:14:52 WRITE {"mode":"Overwrite","partitionBy":"[]"} 1.0 False {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
1 2022-09-19 14:07:32 WRITE {"mode":"Append","partitionBy":"[]"} 0.0 True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
0 2022-09-19 13:48:20 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

切り詰める - 1 回目

シンク変換設定

  • [テーブル アクション] を [切り詰める] に変更

ストレージ ブラウザー

  • ファイルが 4 個に戻った

_delta_log フォルダー

  • ファイルが 1 個に戻った

件数確認

count(1)
99441

テーブル メタデータ確認

初回から変化なしのため省略

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
0 2022-09-19 14:27:21 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

切り詰める & 自動圧縮

シンク変換設定

ストレージ ブラウザー

_delta_log フォルダー

件数確認

count(1)
99441

テーブル メタデータ確認

col_name	data_type	comment
order_id	string	
customer_id	string	
order_status	string	
order_purchase_timestamp	timestamp	
order_approved_at	timestamp	
order_delivered_carrier_date	timestamp	
order_delivered_customer_date	timestamp	
order_estimated_delivery_date	timestamp	
		
# Partitioning		
Not partitioned		
		
# Detailed Table Information		
Catalog	hive_metastore	
Database	olist_enriched	
Table	olist_orders_dataset	
Location	abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset	
Provider	delta	
Owner	root	
External	true	
Type	EXTERNAL	
Table Properties	[delta.minReaderVersion=1,delta.minWriterVersion=2]	

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
0 2022-09-19 14:35:30 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
nakazaxnakazax

切り詰める & 書き込みの最適化

シンク変換設定

ストレージ ブラウザー

  • 1 ファイルにマージされた

_delta_log フォルダー

件数確認

count(1)
99441

テーブル メタデータ確認

col_name	data_type	comment
order_id	string	
customer_id	string	
order_status	string	
order_purchase_timestamp	timestamp	
order_approved_at	timestamp	
order_delivered_carrier_date	timestamp	
order_delivered_customer_date	timestamp	
order_estimated_delivery_date	timestamp	
		
# Partitioning		
Not partitioned		
		
# Detailed Table Information		
Catalog	hive_metastore	
Database	olist_enriched	
Table	olist_orders_dataset	
Location	abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset	
Provider	delta	
Owner	root	
External	true	
Type	EXTERNAL	
Table Properties	[delta.minReaderVersion=1,delta.minWriterVersion=2]	

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
0 2022-09-19 14:43:08 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"}
nakazaxnakazax

アップサート - 1 回目

(一旦ストレージ アカウント上のフォルダを削除)

シンク変換設定

行の変換設定

ストレージ ブラウザー

_delta_log フォルダー

件数確認

count(1)
99441

テーブル メタデータ確認

col_name	data_type	comment
order_id	string	
customer_id	string	
order_status	string	
order_purchase_timestamp	timestamp	
order_approved_at	timestamp	
order_delivered_carrier_date	timestamp	
order_delivered_customer_date	timestamp	
order_estimated_delivery_date	timestamp	
		
# Partitioning		
Not partitioned		
		
# Detailed Table Information		
Catalog	hive_metastore	
Database	olist_enriched	
Table	olist_orders_dataset	
Location	abfss://olist-brazilian-ecommerce@sthinadsdemodfs.dfs.core.windows.net/enriched/olist_orders_dataset	
Provider	delta	
Owner	root	
External	true	
Type	EXTERNAL	
Table Properties	[delta.minReaderVersion=1,delta.minWriterVersion=2]	

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
0 2022-09-19 15:20:36 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"}
nakazaxnakazax

アップサート - 2 回目

ストレージ ブラウザー

_delta_log フォルダー

件数確認

1 回目と同様のため省略

テーブル メタデータ確認

1 回目と同様のため省略

テーブル履歴確認

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
1 2022-09-19 15:27:39 MERGE {"deletePredicate":"(NOT ((source.rbac1902def9747a18c06c157773cee24 & 4) = 0))","insertPredicate":"((NOT ((source.rbac1902def9747a18c06c157773cee24 & 1) = 0)) OR (NOT ((source.rbac1902def9747a18c06c157773cee24 & 8) = 0)))","predicate":"(source.order_id = target.order_id)","updatePredicate":"((NOT ((source.rbac1902def9747a18c06c157773cee24 & 2) = 0)) OR (NOT ((source.rbac1902def9747a18c06c157773cee24 & 8) = 0)))"} 0.0 False {"numOutputRows":"99441","numSourceRows":"99441","numTargetFilesAdded":"1","numTargetFilesRemoved":"1","numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetRowsInserted":"0","numTargetRowsUpdated":"99441"}
0 2022-09-19 15:20:36 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"1","numOutputBytes":"10571666","numOutputRows":"99441"}
nakazaxnakazax

スキーマのマージなし

シンク変換でスキーマのマージなしに列を追加した場合の挙動を確認 > DFExecutorUserError が発生




Job failed due to reason: at Sink 'sink1': cannot resolve `target.dummy1` in UPDATE clause given columns target.`order_id`, target.`customer_id`, target.`order_status`, target.`order_purchase_timestamp`, target.`order_approved_at`, target.`order_delivered_carrier_date`, target.`order_delivered_customer_date`, target.`order_estimated_delivery_date`;
nakazaxnakazax

スキーマのマージあり

同様のエラーが発生


以下のファイルを新規に入力として与える形でも同様のエラーが発生

order_id customer_id order_status order_purchase_timestamp order_approved_at order_delivered_carrier_date order_delivered_customer_date order_estimated_delivery_date dummy1col
20220920_1 9ef432eb6251297304e76186b10a928d delivered 2017-10-02 10:56:33 2017-10-02 11:07:15 2017-10-04 19:55:00 2017-10-10 21:25:13 2017-10-18 00:00:00 dummy1val



nakazaxnakazax

アップサートを許可 & 自動圧縮

(ADLS 上のファイルを削除)

1 回目実行後のストレージ アカウント

2 回目実行後のストレージ アカウント

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend operationMetrics userMetadata engineInfo
2 2022-09-19 16:38:43 OPTIMIZE {"auto":"true","batchId":"0","predicate":"[]","zOrderBy":"[]"} 1.0 False {"maxFileSize":"10551259","minFileSize":"10551259","numAddedBytes":"10551259","numAddedFiles":"1","numRemovedBytes":"11735080","numRemovedFiles":"200","p25FileSize":"10551259","p50FileSize":"10551259","p75FileSize":"10551259"}
1 2022-09-19 16:38:34 MERGE {"deletePredicate":"(NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 4) = 0))","insertPredicate":"((NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 1) = 0)) OR (NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 8) = 0)))","predicate":"(source.order_id = target.order_id)","updatePredicate":"((NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 2) = 0)) OR (NOT ((source.r6b4f6e756a99445083c3f2ed4c36a1cc & 8) = 0)))"} 0.0 False {"numOutputRows":"99441","numSourceRows":"99441","numTargetFilesAdded":"200","numTargetFilesRemoved":"4","numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetRowsInserted":"0","numTargetRowsUpdated":"99441"}
0 2022-09-19 16:36:48 WRITE {"mode":"Append","partitionBy":"[]"} True {"numFiles":"4","numOutputBytes":"10812334","numOutputRows":"99441"}
このスクラップは2022/09/20にクローズされました