【Databricks】Delta Live Tables (DLT) 入門 - SQL編
はじめに
Delta Live Tables(DLT)は、Databricksが提供するデータパイプライン構築フレームワークです。従来のETLパイプラインでは、データの読み込み、変換、エラーハンドリング、依存関係管理などを全て手動でコーディングする必要がありました。
しかしDLTでは、「どうやって」データを処理するかではなく、「何を」実現したいかを記述するだけで、これらの複雑な処理が自動化されます。
私自身、業務でDLTを使い始めた時は「なぜテーブル作成にこんなに機能が?」と戸惑いましたが、一度慣れてしまえば記述するコード量もかなり少なく記述可能です。
この記事では、DLTの基本概念とSQLでの実装方法を、シンプルな例を使って解説します。
Delta Live Tablesとは
基本概念
DLTは、データ変換ロジックを記述するだけで、ストリーミングデータの処理、ワークロードに応じた自動スケーリング、組み込みの品質制御とエラーハンドリングを自動的に処理します。
データエンジニアは「データをどう変換したいか」に集中でき、「どのタイミングで実行するか」「エラーが起きたらどうするか」「どの順番で処理するか」といった運用面の課題から解放されます。
従来のアプローチとDLTの違いを見てみます。
従来のETL vs DLT
従来のETLアプローチ:
-- 手動でテーブル作成、エラーハンドリング、依存関係管理が必要
CREATE TABLE IF NOT EXISTS sales_clean AS
SELECT customer_id, product_id, amount, sale_date
FROM raw_sales
WHERE customer_id IS NOT NULL AND amount > 0;
-- ジョブの順序管理、エラー処理、スケジューリングは別途実装
DLTアプローチ:
-- 宣言的な定義のみ。依存関係、エラーハンドリング、品質チェックは自動
CREATE OR REFRESH LIVE TABLE sales_clean (
CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
AS SELECT customer_id, product_id, amount, sale_date
FROM LIVE.raw_sales;
DLTの3つの主要な利点
1. 運用の自動化
従来のETLでは、テーブルAの処理が完了してからテーブルBを処理する、といった依存関係を手動で管理する必要がありました。また、処理が失敗した場合のリトライ処理や、データ量に応じたリソース調整も自前で実装が必要でした。
DLTでは、これらの運用上の複雑さが完全に自動化されます。テーブル間の依存関係は自動で検出され、適切な順序で処理が実行されます。
2. データ品質保証
従来のETLでは、データ品質チェックを別途実装し、問題があった場合の処理(ログ出力、アラート送信、データ除外など)も手動で作り込む必要がありました。
DLTでは、テーブル定義に品質ルールを宣言するだけで、自動的にチェックが実行され、違反データの処理も指定した方法で自動実行されます。
3. 可視性と監視
従来のETLでは、どのテーブルがどのテーブルに依存しているか、データ品質に問題がないか、処理時間はどの程度かかっているかなどを把握するために、別途監視システムの構築が必要でした。
DLTでは、データフローの可視化、品質メトリクスの追跡、エラーの詳細が組み込まれたUIで確認できます。
基本的なテーブルタイプ
DLTでは、用途に応じて3つのテーブルタイプを使い分けます。それぞれの特徴と使い分けを理解することが重要です。
1. Streaming Table(ストリーミングテーブル)
リアルタイムでデータが流れ込むような場合に使用します。ファイルが継続的にアップロードされる場合や、Kafkaなどからデータが送信される場合が典型的な使用ケースです。
CREATE OR REFRESH STREAMING TABLE raw_events
COMMENT "リアルタイムイベントデータ"
AS SELECT *
FROM STREAM(read_files('/path/to/events/', format => 'json'));
このテーブルは、新しいファイルが追加されるたびに自動的にデータを処理し、継続的にテーブルを更新します。
2. Live Table(ライブテーブル)
バッチ処理や集計処理に使用します。元データが更新されたときに再計算されるマテリアライズドビューのような動作をします。
CREATE OR REFRESH LIVE TABLE daily_sales
COMMENT "日次売上集計"
AS SELECT
sale_date,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM LIVE.raw_events
WHERE event_type = 'purchase'
GROUP BY sale_date;
このテーブルは、パイプラインが実行されるたびに元データから再計算され、最新の状態に更新されます。
3. View(ビュー)
パイプライン内でのみ使用される中間処理用のビューです。メタストアには保存されず、他のテーブルの計算に使用されます。
CREATE OR REFRESH TEMPORARY VIEW filtered_events
AS SELECT *
FROM LIVE.raw_events
WHERE event_type IN ('click', 'view', 'purchase');
このビューは、複雑な変換処理を段階的に分けたい場合や、複数のテーブルで共通の変換処理を使いたい場合に有効です。
データ品質管理(Expectations)
DLTの最大の特徴は、テーブル定義と同時にデータ品質ルールを宣言できることです。これにより、データの整合性を保ちながら、問題のあるデータへの対処も自動化されます。
Expectationsとは
Expectationsは、データが満たすべき条件を定義する機能です。SQLのCHECK制約に似ていますが、より柔軟で強力な機能を提供します。各レコードに対してチェックが実行され、違反した場合の動作を3パターンから選択できます。
基本的な品質チェック
CREATE OR REFRESH LIVE TABLE clean_customers (
-- 必須項目チェック:customer_idがNULLの場合は記録のみ
CONSTRAINT not_null_id EXPECT (customer_id IS NOT NULL),
-- 範囲チェック:年齢が0-120歳の範囲外でも記録のみ
CONSTRAINT valid_age EXPECT (age BETWEEN 0 AND 120),
-- フォーマットチェック:メールアドレスの形式をチェック
CONSTRAINT valid_email EXPECT (email RLIKE '^[^@]+@[^@]+\\.[^@]+$')
)
AS SELECT * FROM LIVE.raw_customers;
上記の例では、すべてのExpectationsは記録のみを行います。つまり、条件を満たさないデータがあってもテーブルには保存され、品質メトリクスとして違反件数が記録されます。
品質違反時の動作パターン
DLTでは、品質ルールに違反した場合の動作を3つのパターンから選択できます:
パターン1: EXPECT(記録のみ)
CONSTRAINT check_name EXPECT (condition)
動作: 条件を満たさないデータも保存するが、違反として記録される
使用場面: データの傾向を把握したい場合、軽微な品質問題の場合
パターン2: ON VIOLATION DROP ROW(除外)
CONSTRAINT check_name EXPECT (condition) ON VIOLATION DROP ROW
動作: 条件を満たさないレコードはテーブルに保存されない
使用場面: 不正なデータを確実に除外したい場合
パターン3: ON VIOLATION FAIL UPDATE(停止)
CONSTRAINT check_name EXPECT (condition) ON VIOLATION FAIL UPDATE
動作: 条件を満たさないデータがあると、パイプライン全体が停止する
使用場面: 絶対に満たされるべき重要な条件の場合
実際の業務では、この3つを組み合わせて段階的な品質制御を実装することが多いです。
実践例:売上データパイプライン
ここでは、典型的なMedallion Architecture(Bronze→Silver→Gold)を使って、売上データを処理するパイプラインを作成してみましょう。この手法は、生データから段階的にデータを加工・精製していく一般的なパターンです。
Bronze Layer(生データ層)
Bronzeレイヤーでは、外部システムからの生データをそのまま取り込みます。最小限の変換のみを行い、元データの形を保持することが重要です。
-- ファイルからの継続的取り込み
CREATE OR REFRESH STREAMING TABLE bronze_sales
COMMENT "S3からの生の売上データ"
AS SELECT
*,
CURRENT_TIMESTAMP() as ingestion_time,
_metadata.file_path as source_file
FROM STREAM(read_files(
'/mnt/raw/sales/',
format => 'json'
));
このテーブルでは、元のJSONデータに加えて、データ取り込み時刻とソースファイルのパスを追加しています。これにより、後でデータの出所や取り込みタイミングを追跡できるようになります。
Silver Layer(クリーニング層)
Silverレイヤーでは、データ品質チェックを行い、業務で使えるレベルまでデータをクリーニングします。ここで不正なデータを除外し、データ型の統一を行います。
CREATE OR REFRESH LIVE TABLE silver_sales (
-- 必須フィールドのチェック(これらがないとビジネス処理ができない)
CONSTRAINT valid_transaction EXPECT (transaction_id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_date EXPECT (sale_date IS NOT NULL) ON VIOLATION DROP ROW,
-- ビジネスルールのチェック(売上金額は正の値でなければならない)
CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
-- 異常値の検出(警告のみ、データは保持)
CONSTRAINT reasonable_amount EXPECT (amount <= 100000) -- 10万円を超える取引は要確認
)
COMMENT "クリーニング済み売上データ"
AS SELECT
transaction_id,
customer_id,
product_id,
CAST(amount AS DECIMAL(10,2)) as amount, -- 金額の精度を統一
CAST(sale_date AS DATE) as sale_date, -- 日付型に統一
ingestion_time
FROM LIVE.bronze_sales;
ここでのポイントは、致命的な問題(必須項目の欠損、負の金額)は除外し、軽微な問題(異常に高い金額)は警告として記録することです。
Gold Layer(ビジネス層)
Goldレイヤーでは、ビジネス用途に最適化された集計やマート化されたデータを作成します。レポートやダッシュボード、機械学習で直接使用できる形にします。
CREATE OR REFRESH LIVE TABLE gold_daily_summary
COMMENT "日次売上サマリー(レポート用)"
AS SELECT
sale_date,
COUNT(*) as transaction_count,
SUM(amount) as total_sales,
AVG(amount) as avg_transaction_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM LIVE.silver_sales
GROUP BY sale_date;
CREATE OR REFRESH LIVE TABLE gold_customer_metrics
COMMENT "顧客別メトリクス(CRM用)"
AS SELECT
customer_id,
COUNT(*) as total_purchases,
SUM(amount) as total_spent,
AVG(amount) as avg_purchase_amount,
MAX(sale_date) as last_purchase_date,
-- 顧客セグメント分類
CASE
WHEN SUM(amount) >= 10000 THEN 'VIP'
WHEN SUM(amount) >= 1000 THEN 'Premium'
ELSE 'Standard'
END as customer_tier
FROM LIVE.silver_sales
GROUP BY customer_id;
この構成により、生データから段階的に価値のあるビジネス情報に変換していくことができます。
Pipeline設定と実行
DLTパイプラインを実際に動かすまでの手順を説明します。Databricksの画面操作と設定のポイントを押さえておきましょう。
1. Notebookの作成
まず、上記のSQLコードを含むnotebookを作成します。ファイル名は分かりやすく「sales_pipeline.sql」のようにしておくと良いでしょう。
2. Pipeline設定
Databricksの左サイドバーから「Delta Live Tables」を選択し、「Create Pipeline」をクリックします。
主要な設定項目:
-
Pipeline name:
sales_pipeline
(分かりやすい名前) - Source code: 作成したnotebook を指定
-
Target database:
sales_db
(データが格納されるデータベース名) -
Storage location:
/mnt/pipelines/sales/
(パイプラインの内部データ保存場所)
3. 実行モードの選択
DLTには2つの実行モードがあります:
Development Mode(開発モード)
- 用途: 開発・テスト段階
- 特徴: クラスターが起動状態を維持し、素早い反復開発が可能
- コスト: 高め(クラスターが常時稼働)
- 使用場面: コードの修正とテストを繰り返す開発フェーズ
Production Mode(本番モード)
- 用途: 本番運用
- 特徴: 処理が完了するとクラスターが自動停止
- コスト: 低め(必要な時のみクラスターが稼働)
- 使用場面: 定期実行やトリガー実行での本番運用
開発時はDevelopment Modeで素早く試行錯誤し、完成したらProduction Modeに切り替えるのが一般的です。
4. 実行とモニタリング
パイプラインを作成後、「Start」ボタンで実行開始します。実行中は以下の情報をリアルタイムで確認できます:
- データフロー図: テーブル間の依存関係の可視化
- 処理状況: 各テーブルの処理進捗
- 品質メトリクス: Expectationsの結果
- エラー情報: 問題が発生した場合の詳細
よくある問題と解決法
DLTを業務で使用する際に遭遇しやすい問題と、その具体的な解決方法を紹介します。
1. スキーマ変更エラー
問題: 既存テーブルにカラムを追加しようとした際に「スキーマの不一致」エラーが発生する
エラー例:
AnalysisException: A schema mismatch detected when writing to the Delta table
解決法: 明示的なスキーマ定義で段階的に対応
-- 解決法:明示的なスキーマ定義
CREATE OR REFRESH LIVE TABLE flexible_table (
id BIGINT,
name STRING,
amount DECIMAL(10,2),
new_field STRING -- 新しいカラムを追加
)
AS SELECT
id,
name,
amount,
CAST(NULL AS STRING) as new_field -- 既存データ用にNULLを設定
FROM LIVE.source_table;
ポイント: 新しいカラムには明示的にNULL値を設定し、段階的にデータを移行することで安全にスキーマ変更ができます。
2. 品質チェック失敗によるパイプライン停止
問題: ExpectationsでON VIOLATION FAIL UPDATE
を設定したが、データ品質の問題でパイプラインが頻繁に停止する
解決法: 段階的なアプローチで品質制御を実装
-- Step 1: まず警告レベルで問題を把握
CREATE OR REFRESH LIVE TABLE step1_warnings (
CONSTRAINT warn_high_amount EXPECT (amount <= 10000), -- 警告のみ
CONSTRAINT warn_future_date EXPECT (sale_date <= CURRENT_DATE()) -- 警告のみ
)
AS SELECT * FROM LIVE.raw_data;
-- Step 2: 明らかに不正なデータを除外
CREATE OR REFRESH LIVE TABLE step2_cleaned (
CONSTRAINT drop_negative EXPECT (amount > 0) ON VIOLATION DROP ROW, -- 除外
CONSTRAINT drop_null_id EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW -- 除外
)
AS SELECT * FROM LIVE.step1_warnings;
-- Step 3: 最重要な制約のみ停止条件とする
CREATE OR REFRESH LIVE TABLE step3_final (
CONSTRAINT critical_data_exists EXPECT (
(SELECT COUNT(*) FROM LIVE.step2_cleaned) > 0
) ON VIOLATION FAIL UPDATE -- 全データが失われた場合のみ停止
)
AS SELECT * FROM LIVE.step2_cleaned;
ポイント: 最初は全て警告で運用し、データの傾向を把握してから段階的に制約を厳しくしていきます。
3. パフォーマンス問題
問題: 大量データの処理で時間がかかりすぎる、またはメモリ不足エラーが発生する
解決法: パーティション設定と最適化オプションの活用
-- パーティション設定で検索性能を向上
CREATE OR REFRESH LIVE TABLE partitioned_sales
USING DELTA
PARTITIONED BY (sale_date) -- 日付でパーティション分割
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true', -- 書き込み最適化
'delta.autoOptimize.autoCompact' = 'true' -- 自動コンパクション
)
AS SELECT * FROM LIVE.silver_sales;
-- 大きなテーブルの場合はクラスタリングも併用
CREATE OR REFRESH LIVE TABLE clustered_customers
USING DELTA
CLUSTER BY (customer_id) -- 顧客IDでクラスタリング
AS SELECT * FROM LIVE.silver_customers;
ポイント: よくクエリされるカラム(日付、ID等)でパーティションやクラスタリングを設定し、自動最適化を有効にすることで性能問題を軽減できます。
監視とデバッグ
DLTパイプラインを本番運用する際は、継続的な監視とトラブルシューティングが重要です。DLTには強力な監視機能が組み込まれています。
Event Logの活用
DLTは全ての処理履歴を「Event Log」として記録します。これにより、データ品質の推移や処理時間の変化を追跡できます。
-- データ品質メトリクスの確認
SELECT
details:flow_definition.output_dataset as table_name,
details:flow_progress.data_quality.expectations as quality_results,
timestamp,
event_type
FROM event_log()
WHERE event_type = 'flow_progress'
AND details:flow_definition.output_dataset = 'sales_clean'
ORDER BY timestamp DESC;
活用例:
- どのテーブルでExpectations違反が多いかを特定
- 処理時間の傾向分析
- エラーの発生パターンの把握
品質ダッシュボード
日常的な監視用に、データ品質の状況を可視化するテーブルを作成できます。
CREATE OR REFRESH LIVE TABLE quality_monitoring
COMMENT "日次データ品質監視レポート"
AS SELECT
CURRENT_DATE() as check_date,
'sales' as table_name,
COUNT(*) as total_records,
-- 品質問題の集計
COUNT(CASE WHEN amount IS NULL THEN 1 END) as null_amounts,
COUNT(CASE WHEN amount <= 0 THEN 1 END) as negative_amounts,
COUNT(CASE WHEN customer_id IS NULL THEN 1 END) as null_customer_ids,
-- 品質スコアの計算
ROUND(
(COUNT(*) - COUNT(CASE WHEN amount IS NULL OR amount <= 0 OR customer_id IS NULL THEN 1 END))
* 100.0 / COUNT(*),
2
) as quality_score_percentage,
-- 統計情報
AVG(amount) as avg_amount,
MIN(ingestion_time) as earliest_ingestion,
MAX(ingestion_time) as latest_ingestion
FROM LIVE.silver_sales
WHERE DATE(ingestion_time) = CURRENT_DATE();
このテーブルをBI ツールで可視化することで、データ品質の日常監視が可能になります。
まとめ
DLTは以下の点で従来のETLを大幅に改善し、データエンジニアリングの生産性を向上させます:
🚀 開発効率の向上
- 宣言的な記述: 「何をしたいか」だけを記述すれば良い
- 依存関係の自動管理: テーブル間の処理順序を自動で解決
- 組み込みのエラーハンドリング: 失敗時の処理やリトライが自動化
データエンジニアは複雑な制御ロジックを書く必要がなく、ビジネスロジックに集中できます。
🛡️ データ品質の自動保証
- Expectationsによる自動チェック: テーブル定義と同時に品質ルールも定義
- 段階的な品質制御: 警告→除外→停止の3段階で柔軟に対応
- リアルタイムでの品質監視: Event Logによる継続的な品質追跡
従来は別途実装が必要だったデータ品質管理が、テーブル定義に含めて宣言できるようになります。
📊 運用性の向上
- 自動スケーリング: データ量に応じた適切なリソース配分
- 包括的な監視機能: データフロー、品質メトリクス、処理時間の可視化
- 簡単なデプロイメント: Notebook作成→Pipeline設定→実行の3ステップ
導入時の推奨アプローチ
- 小さなパイプラインから始める: 1-2テーブルの簡単な変換から開始
-
品質チェックは段階的に追加: 最初は
EXPECT
のみ、徐々にDROP ROW
やFAIL UPDATE
を追加 - Event Logで継続的に監視: 品質メトリクスと処理時間を定期的にチェック
- 開発→本番の環境分離: Development Modeで開発、Production Modeで運用
DLTは確かに最初の学習コストはありますが、習得すれば従来のETL開発と比べて大幅な生産性向上が期待できます。特に、データ品質管理の自動化により、データエンジニアはより戦略的で価値の高い作業に時間を割けるようになります。
Discussion