第1章 はじめに
Databricks Delta Live Tables (DLT) は、データパイプラインを宣言的に構築・運用するためのフレームワークです。本記事では、DLTパイプラインをJSON形式で定義する際の全設定項目について解説します。
DLTのJSON設定パラメータについては、以下の公式ドキュメントサイトを参照して設定することができます。
主要な公式ドキュメント
第2章 設定項目
1. 基本設定項目
DLTパイプラインの基本的な動作を制御する設定項目です。
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
id |
string |
パイプラインの一意識別子(UUID) |
自動生成 |
"123e4567-e89b-12d3-a456-426614174000" |
○ |
name |
string |
パイプライン名 |
なし |
"customer-data-pipeline" |
○ |
pipeline_type |
string |
パイプラインタイプ |
"WORKSPACE" |
"WORKSPACE" , "EXTERNAL"
|
× |
edition |
string |
DLTエディション |
"CORE" |
"CORE" , "PRO" , "ADVANCED"
|
× |
development |
boolean |
開発モード有効 |
false |
true , false
|
× |
continuous |
boolean |
連続実行モード |
false |
true , false
|
× |
channel |
string |
リリースチャンネル |
"CURRENT" |
"CURRENT" , "PREVIEW"
|
× |
photon |
boolean |
Photonエンジン使用 |
false |
true , false
|
× |
serverless |
boolean |
サーバーレス実行 |
false |
true , false
|
× |
2. クラスター設定
パイプライン実行に使用するクラスターの詳細設定です。
基本クラスター設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
label |
string |
クラスターラベル |
なし |
"default" , "maintenance"
|
○ |
node_type_id |
string |
ワーカーノードタイプ |
なし |
"i3.xlarge" , "m5.large"
|
○ |
driver_node_type_id |
string |
ドライバーノードタイプ |
node_type_id と同じ |
"i3.2xlarge" |
× |
num_workers |
integer |
固定ワーカー数 |
なし |
4 , 10
|
× |
spark_version |
string |
Sparkバージョン |
最新LTS |
"11.3.x-scala2.12" |
× |
policy_id |
string |
クラスターポリシーID |
なし |
"policy-123456" |
× |
オートスケーリング設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
min_workers |
integer |
最小ワーカー数 |
1 |
2 , 10
|
○ |
max_workers |
integer |
最大ワーカー数 |
なし |
20 , 100
|
○ |
mode |
string |
スケーリングモード |
"LEGACY" |
"ENHANCED" , "LEGACY"
|
× |
Sparkコンフィグ設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
spark_conf |
object |
Spark設定のマップ |
{} |
{"spark.sql.adaptive.enabled": "true"} |
× |
spark_env_vars |
object |
Spark環境変数 |
{} |
{"PYSPARK_PYTHON": "/databricks/python3/bin/python3"} |
× |
3. AWS/Azure/GCP固有設定
AWS設定 (aws_attributes)
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
first_on_demand |
integer |
オンデマンドインスタンス数 |
1 |
5 , 50
|
× |
availability |
string |
インスタンス利用タイプ |
"SPOT_WITH_FALLBACK" |
"ON_DEMAND" , "SPOT"
|
× |
zone_id |
string |
アベイラビリティゾーン |
"auto" |
"us-west-2a" , "auto"
|
× |
instance_profile_arn |
string |
IAMインスタンスプロファイル |
なし |
"arn:aws:iam::123456789012:instance-profile/databricks-role" |
× |
spot_bid_price_percent |
integer |
スポット入札価格(%) |
100 |
50 , 80
|
× |
ebs_volume_type |
string |
EBSボリュームタイプ |
"GENERAL_PURPOSE_SSD" |
"GENERAL_PURPOSE_SSD" , "THROUGHPUT_OPTIMIZED_HDD"
|
× |
ebs_volume_count |
integer |
EBSボリューム数 |
0 |
1 , 4
|
× |
ebs_volume_size |
integer |
EBSボリュームサイズ(GB) |
100 |
500 , 1000
|
× |
Azure設定 (azure_attributes)
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
availability |
string |
インスタンス利用タイプ |
"ON_DEMAND_AZURE" |
"ON_DEMAND_AZURE" , "SPOT_AZURE"
|
× |
first_on_demand |
integer |
オンデマンドインスタンス数 |
1 |
5 , 10
|
× |
spot_bid_max_price |
number |
スポット最大価格 |
-1 |
0.1 , 0.5
|
× |
GCP設定 (gcp_attributes)
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
use_preemptible_executors |
boolean |
プリエンプティブル使用 |
false |
true , false
|
× |
google_service_account |
string |
サービスアカウント |
なし |
"service-account@project.iam.gserviceaccount.com" |
× |
availability |
string |
インスタンス利用タイプ |
"ON_DEMAND_GCP" |
"ON_DEMAND_GCP" , "PREEMPTIBLE_GCP"
|
× |
4. パフォーマンス・Spark設定
Spark設定 (configuration)
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
用途 |
spark.sql.adaptive.enabled |
string |
適応クエリ実行 |
"true" |
"true" , "false"
|
パフォーマンス |
spark.sql.adaptive.coalescePartitions.enabled |
string |
パーティション結合 |
"true" |
"true" , "false"
|
パフォーマンス |
spark.sql.shuffle.partitions |
string |
シャッフルパーティション数 |
"200" |
"500" , "1000"
|
並列処理 |
spark.default.parallelism |
string |
デフォルト並列度 |
CPU コア数 × 2 |
"500" , "1000"
|
並列処理 |
spark.sql.streaming.metricsEnabled |
string |
ストリーミングメトリクス |
"false" |
"true" , "false"
|
監視 |
spark.databricks.delta.autoCompact.enabled |
string |
自動圧縮 |
"auto" |
"true" , "false"
|
最適化 |
spark.databricks.delta.optimizeWrite.enabled |
string |
書き込み最適化 |
"auto" |
"true" , "false"
|
最適化 |
リソース設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
用途 |
spark.executor.cores |
string |
Executorコア数 |
ノードタイプ依存 |
"4" , "8"
|
リソース |
spark.executor.memory |
string |
Executorメモリ |
ノードタイプ依存 |
"8g" , "16g"
|
リソース |
spark.driver.cores |
string |
ドライバーコア数 |
ノードタイプ依存 |
"4" , "8"
|
リソース |
spark.driver.memory |
string |
ドライバーメモリ |
ノードタイプ依存 |
"16g" , "32g"
|
リソース |
spark.executor.instances |
string |
Executor数 |
動的 |
"10" , "50"
|
リソース |
DLT固有設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
用途 |
pipelines.trigger.interval |
string |
トリガー間隔 |
"500ms" |
"10 seconds" , "1 minute"
|
ストリーミング |
pipelines.maxBytesPerTrigger |
string |
トリガーあたり最大バイト数 |
なし |
"1GB" , "500MB"
|
ストリーミング |
pipelines.reset.allowed |
string |
リセット許可 |
"true" |
"true" , "false"
|
管理 |
5. データ管理設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
catalog |
string |
Unity Catalogカタログ名 |
なし |
"production" , "development"
|
× |
target |
string |
出力先パス(レガシー) |
なし |
"dbfs:/pipelines/my-pipeline" |
× |
storage |
string |
ストレージパス |
自動生成 |
"s3://my-bucket/pipelines/" |
× |
data_sampling |
boolean |
データサンプリング有効 |
false |
true , false
|
× |
Unity Catalog設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
catalog |
string |
カタログ名 |
なし |
"main" , "dev"
|
× |
schema |
string |
スキーマ名 |
"default" |
"bronze" , "silver"
|
× |
6. セキュリティ・権限設定
アクセス制御
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
permissions |
array |
権限設定 |
[] |
後述の権限オブジェクト |
× |
権限オブジェクト構造
設定項目 |
データ型 |
説明 |
設定例 |
user_name |
string |
ユーザー名 |
"user@company.com" |
group_name |
string |
グループ名 |
"data-engineers" |
service_principal_name |
string |
サービスプリンシパル名 |
"sp-analytics" |
permission_level |
string |
権限レベル |
"CAN_VIEW" , "CAN_RUN" , "CAN_MANAGE"
|
7. 監視・ログ設定
通知設定
設定項目 |
データ型 |
説明 |
デフォルト値 |
設定例 |
必須 |
notifications |
array |
通知設定 |
[] |
後述の通知オブジェクト |
× |
通知オブジェクト構造
設定項目 |
データ型 |
説明 |
設定例 |
alerts |
array |
アラート種類 |
["on-update-success", "on-update-failure"] |
email_recipients |
array |
通知先メール |
["admin@company.com"] |
8. ライブラリ設定
設定項目 |
データ型 |
説明 |
設定例 |
必須 |
libraries |
array |
ライブラリリスト |
後述のライブラリオブジェクト |
× |
ライブラリオブジェクト構造
設定項目 |
データ型 |
説明 |
設定例 |
notebook.path |
string |
ノートブックパス |
"/Shared/etl-notebook" |
file.path |
string |
Pythonファイルパス |
"/Workspace/Shared/etl.py" |
jar |
string |
JARファイルパス |
"dbfs:/FileStore/jars/custom.jar" |
maven.coordinates |
string |
Maven座標 |
"org.apache.spark:spark-avro_2.12:3.3.0" |
pypi.package |
string |
PyPIパッケージ |
"pandas==1.5.0" |
whl |
string |
Pythonホイールパス |
"dbfs:/FileStore/wheels/custom.whl" |
9. 設定例とベストプラクティス
環境別設定例
{
"development": true,
"continuous": false,
"edition": "CORE",
"clusters": [{
"autoscale": {
"min_workers": 1,
"max_workers": 4
}
}]
}
{
"development": false,
"continuous": true,
"edition": "ADVANCED",
"clusters": [{
"autoscale": {
"min_workers": 10,
"max_workers": 50,
"mode": "ENHANCED"
}
}]
}
パフォーマンス最適化のポイント
- 適切なクラスターサイズの選択
- Sparkパラメータの調整
- データクラスタリングの活用
- 自動最適化機能の有効活用
セキュリティ強化のポイント
- 最小権限の原則
- Unity Catalogの活用
- 適切なIAMロール設定
- ネットワークセキュリティの考慮
まとめ
DLTの設定項目は非常に多岐にわたりますが、用途に応じて適切な設定を行うことで、効率的で安全なデータパイプラインを構築できます。本記事の設定項目を参考に、要件に最適なDLTパイプラインを設計してください。
第3章 DLT 強化オートスケール(Enhanced Autoscaling)
強化オートスケールとは
強化オートスケール(Enhanced Autoscaling) は、DLTで利用可能な高度なクラスター自動スケーリング機能です。従来のLEGACY
モードと比較して、より効率的で応答性の高いスケーリングを提供します。
オートスケールを使用した Lakeflow 宣言型パイプラインのクラスター利用の最適化
設定方法
JSONファイルでの設定例:
{
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 2,
"max_workers": 20,
"mode": "ENHANCED"
}
}
]
}
ENHANCEDモードの特徴
1. 高速スケールアップ
-
ワークロード予測: 処理キューの状況を監視し、事前にスケールアップを開始
-
並列起動: 複数のワーカーノードを同時に起動
-
起動時間短縮: ノード起動プロセスの最適化
2. インテリジェントスケールダウン
-
リソース使用率分析: CPU、メモリ、I/O使用率を総合的に判断
-
タスク完了予測: 実行中タスクの完了時間を予測してスケールダウン
-
段階的縮小: 急激な縮小を避け、安定性を重視
3. コスト最適化
-
スポットインスタンス活用: スポットインスタンスの中断リスクを考慮したスケーリング
-
リソース無駄削減: アイドル状態の早期検出と迅速な縮小
LEGACYモードとの比較
機能 |
ENHANCED |
LEGACY |
スケールアップ速度 |
高速(予測的) |
標準(反応的) |
スケールダウン精度 |
高精度 |
基本的 |
ワークロード予測 |
あり |
なし |
コスト効率 |
優秀 |
標準 |
安定性 |
高い |
標準 |
対応ワークロード |
ストリーミング・バッチ両方 |
主にバッチ |
具体的な動作メカニズム
1. スケールアップトリガー
CPU使用率 > 70% かつ タスクキュー > 10件
→ 即座に新しいワーカーを起動開始
2. スケールダウン判定
全ワーカーのCPU使用率 < 30% が 5分間継続
→ 段階的にワーカーを終了
3. 予測的スケーリング
過去のパターン分析 + 現在のワークロード傾向
→ 需要増加を事前に予測してスケールアップ
高スループット環境での活用
高負荷環境で強化オートスケールが活用されます:
{
"autoscale": {
"min_workers": 25,
"max_workers": 100,
"mode": "ENHANCED"
}
}
メリット
-
レイテンシ削減: トラフィック急増時の応答遅延を最小化
-
安定性向上: 大量データ処理時のクラスター安定性
-
コスト制御: 不要なリソースの迅速な解放
注意点と制限事項
1. 最小ワーカー数の考慮
"min_workers": 2,
"min_workers": 10,
"min_workers": 50
2. スケーリング間隔
-
クールダウン期間: スケールアップ後、一定期間は追加スケーリングを抑制
-
最大変化率: 一度に変更できるワーカー数に制限
3. コスト考慮
- ENHANCEDモードは頻繁なスケーリングによりコストが変動しやすい
- 予算制約がある場合は最大ワーカー数を適切に設定
最適化のベストプラクティス
1. ワークロード特性に応じた設定
ストリーミングワークロード
{
"min_workers": 5,
"max_workers": 50,
"mode": "ENHANCED"
}
バッチ処理ワークロード
{
"min_workers": 1,
"max_workers": 100,
"mode": "ENHANCED"
}
2. 監視項目
-
スケーリング頻度: 過度なスケーリングが発生していないか
-
リソース使用率: CPU、メモリ使用率の推移
-
コスト効率: 処理性能とコストのバランス
3. パフォーマンスチューニング
{
"configuration": {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.databricks.cluster.profile": "serverless"
}
}
まとめ
強化オートスケールは、特に変動の激しいワークロードや高スループット要件がある環境で威力を発揮します。適切な設定により、パフォーマンスとコスト効率の両方を最適化できる強力な機能です。
ワークロードの特性を理解し、最小・最大ワーカー数を適切に設定することで、ENHANCEDモードの恩恵を最大限に活用できます。
Discussion