Databricks Certified Data Engineer Associateチートシート
Databricks Certified Data Engineer Associate チートシート
このチートシートは、Databricks Certified Data Engineer Associate の準備に役立つ主要概念と技術をまとめたものです。
1. 基本概念
データレイクハウス (Data Lakehouse)
- データレイクとデータウェアハウスの長所を組み合わせた統合分析プラットフォーム
- データレイクの利点: オープン性、柔軟性、ML サポート
- データウェアハウスの利点: 信頼性、強力なガバナンス、パフォーマンス
- 単一プラットフォームでデータエンジニアリング、分析、AI ワークロードをサポート
Databricks アーキテクチャ
- クラウドサービス層: マルチクラウド対応 (Azure, AWS, GCP)
- ランタイム層 (Databricks Runtime): Apache Spark, Delta Lake, システムライブラリ等のコアコンポーネント
- ワークスペース層 (Workspace): データエンジニアリング、分析、AI ワークロードを対話的に実装・実行するための UI
2. デプロイメントモデル
Databricksのアーキテクチャは、大きく分けてコントロールプレーン (Control Plane) と データプレーン (Data Plane) に分かれます。
コントロールプレーン (Control Plane)
- 場所: Databricks自身が管理するインフラストラクチャ上で動作します。
-
主なコンポーネント:
- Databricks Web アプリケーション (Web UI)
- ノートブック (Notebooks): 定義やメタデータ
- リポジトリ (Repos): メタデータや管理
- ワークフロー (Workflows): ジョブのスケジューリングや定義
- クラスター管理 (Cluster Management): クラスターの起動、監視、管理API
- その他、ワークスペース管理、メタストアなどのバックエンドサービス
データプレーン (Data Plane)
- 場所: 顧客のクラウドアカウント (Customer's Cloud Account: AWS, Azure, GCP) 内で動作します。
-
主なコンポーネント:
-
クラスター (Cluster virtual machines):
- 実際にデータ処理を行うコンピューティングリソース (VM)。ドライバーノードとワーカーノードが含まれます。
- 顧客のアカウント内で起動され、管理されます。
-
(オプション) 顧客管理のデータストレージ:
- DBFSのルートストレージや、外部ロケーションとして接続されたクラウドストレージ (S3, ADLS Gen2, GCS)。
- 処理対象のデータやDelta Lakeテーブル、ライブラリ、設定ファイルなどが格納されます。
-
クラスター (Cluster virtual machines):
- 重要: 実際のデータ処理 (コンピューティング) と主要なデータの保管場所は、常に顧客のクラウド環境内にあります。これにより、データのセキュリティとコントロールを顧客が維持できます。
3. コア技術と機能
Apache Spark
- Databricks の基盤。Spark 開発者によって設立
- 分散インメモリ処理
- バッチ処理とストリーム処理をサポート
- DataFrame API と SQL インターフェースを提供
対応言語
- Scala, Python, SQL, R, Java
対応データ
- 構造化、半構造化、非構造化 (画像、動画など)
DBFS (Databricks File System)
- 分散ファイルシステムの抽象化レイヤー
- 実データはデータプレーン内の基盤となるクラウドストレージ (Azure Blob/ADLS Gen2, S3 等) に永続化
- クラスターが終了してもデータは安全に保持
- クラスター作成時に自動的にマウントされる
4. クラスター (Compute)
定義
- 複数のノード (VM) が単一エンティティとして動作するセット
- ドライバーノード (Driver Node): マスターノード。ワーカーノードの調整とタスクの並列実行を管理
- ワーカーノード (Worker Node): 実際の処理を実行するノード
主な設定項目
- クラスター名: 任意の名前
-
ポリシー (Policy): 設定可能な範囲を制限 (例:
Unrestricted
) -
クラスターモード:
-
Multi Node
: ドライバー + 複数のワーカー -
Single Node
: ワーカーなし。テストや小規模開発向き
-
-
アクセスモード (Access Mode):
-
Single User
: 単一ユーザー専用。全言語サポート -
Shared
: 複数ユーザーで共有。SQL と Python のみサポート (Unity Catalog 対応)
-
- Databricks Runtime バージョン: Spark, Scala 等のライブラリバージョンを含むイメージ
- Photon アクセラレーション: Spark パフォーマンスを向上させる C++ 製ベクトル化クエリエンジン (任意)
- ワーカータイプ/ドライバータイプ: VM サイズ (コア数、メモリ等)
-
ワーカー数:
-
Enable autoscaling
: ワーカー数の最小・最大範囲を指定し自動増減 -
Disable autoscaling
: 固定数のワーカーを指定
-
- 自動終了 (Auto Termination): 指定時間アイドル状態が続くとクラスターを自動停止
クラスターの用途と種類
Databricks で利用するクラスターは、主にその用途によって以下の種類に分けられます。
-
ジョブクラスター (Job Cluster):
- 用途: Databricks ジョブの実行専用に作成されるクラスター。
- ライフサイクル: ジョブが開始されると自動的に起動し、ジョブが完了すると**自動的に終了 (Terminate)**します。
- 利点: この一時的な性質により、リソースが不要なときにコストが発生するのを防ぎます。そのため、本番環境の自動化されたスケジュールジョブには最もコスト効率が良く、強く推奨される選択肢です。
- 設定: Databricks Jobs の設定画面内で、ジョブ実行用の新しいクラスターとして定義します。
-
汎用クラスター (All-Purpose Cluster):
- 用途: インタラクティブな分析、データ探索、共同開発に使用されるクラスター。ノートブックからのアドホックなコード実行などに適しています。
- ライフサイクル: ユーザーが手動で起動し、手動で終了するか、設定された自動終了時間 (Auto Termination) に達するまで稼働し続けます。
- 利点: 複数のユーザーや複数のタスク (ノートブックセッションなど) で共有できます。デバッグや開発サイクルに適しています。
- 欠点: 常に稼働している場合や、アイドル時間が長い場合、ジョブクラスターよりもコストが高くなる可能性があります。本番の自動化ジョブには通常推奨されません。
- 設定: Compute UI から直接作成・管理します。
-
その他の用語:
- 本番クラスター (Production Cluster): これは Databricks の正式なクラスタータイプ名ではありません。本番環境のワークロード(通常はジョブ)を実行するために設定・使用されるクラスター(通常はジョブクラスター)を指す一般的な用語です。
- オンプレミスクラスター (On-premises Cluster): Databricks は基本的にクラウドベースのプラットフォームであり、標準的な構成ではオンプレミスでクラスターをホストしません。特定のハイブリッド構成は可能ですが、一般的な選択肢ではありません。
- サーバーレスコンピュート (Serverless Compute): 主に Databricks SQL Warehouse で利用可能で、クラスターの管理(起動、スケーリング、パッチ適用など)を Databricks が完全に管理するモデルです。一部のジョブタスク(特に SQL タスク)でも利用可能になる場合がありますが、ノートブックや Python スクリプトなど、より汎用的な本番ジョブにおいては、依然としてコスト効率とリソースの専用性からジョブクラスターが推奨されることが多いです。
DBU (Databricks Unit)
- 1 時間あたりの処理能力の単位
- VM 構成と実行時間に基づいて課金
5. ノートブック (Notebooks)
基本操作
-
作成:
Workspace
>Create
>Notebook
。名前、デフォルト言語 (Python, SQL, Scala, R) を選択 - クラスターへのアタッチ: コード実行前に、実行中のクラスターにアタッチする必要がある
- セル実行: セル単位で実行 (▶ ボタン or Shift + Enter)
言語切り替え
- ノートブック全体のデフォルト言語は変更可能
- セル単位で言語マジックコマンド (
%python
,%sql
,%scala
,%r
) を使用して切り替え可能
マジックコマンド (%)
-
%md
: Markdown セル -
%run
: 別のノートブックを実行 -
%fs
: ファイルシステム操作を実行 (例:%fs ls /databricks-datasets
)
dbutils (Databricks Utilities)
- 環境との対話や設定を行うためのユーティリティ群
-
dbutils.fs
: ファイルシステム操作 -
dbutils.secrets
: シークレット管理 -
dbutils.widgets
: パラメータ化されたノートブック作成
display() 関数
-
dbutils.fs.ls()
の結果や DataFrame を整形されたテーブル形式で表示 (最大 1000 行) - CSV ダウンロードや簡易グラフ描画機能も提供
DataFrame 操作の基本 (Python)
-
テーブルからの読み込み (
spark.table
): 既存のテーブル (Hive Metastore または Unity Catalog で管理) を DataFrame として読み込みます。# テーブル名を指定して読み込む (3レベルネームスペースを推奨) df_table = spark.table("my_catalog.my_schema.my_table") display(df_table)
-
SQL クエリの実行 (
spark.sql
): SQL クエリを実行し、その結果を DataFrame として取得します。# SQL クエリを実行して DataFrame を作成 query = "SELECT name, age FROM my_catalog.my_schema.users WHERE age > 30 ORDER BY age DESC" df_sql = spark.sql(query) display(df_sql)
6. Databricks Repos
目的
- Git プロバイダー (GitHub, Azure DevOps, GitLab, Bitbucket 等) との連携による、プロジェクトのソースコード管理 (ノートブック、スクリプト、設定ファイルなど)
設定 (Git Integration)
-
User Settings
>Git Integration
タブから Git プロバイダーを選択し設定
基本操作
- リポジトリのクローン (Clone)
- ブランチ作成・切り替え
- コミット & プッシュ
- プル
- マージ (通常 Git プロバイダー側でプルリクエストを作成・マージ)
7. Delta Lake
定義
- データレイクに信頼性 (ACID トランザクションなど) をもたらすオープンソースのストレージレイヤー
基盤技術
- データファイルは Parquet 形式で保存 (デフォルト)
- トランザクションログ (Transaction Log / Delta Log):
- テーブルに対する全ての変更操作 (トランザクション) の記録
- テーブルディレクトリ内の
_delta_log
ディレクトリに JSON ファイル (およびチェックポイント Parquet ファイル) として保存 - Single Source of Truth (信頼できる唯一の情報源) として機能
主な利点
- ACID トランザクション: データレイク上での一貫性のある操作を保証
- スケーラブルなメタデータ処理: 大量のメタデータを効率的に処理
- タイムトラベル (Time Travel): 過去のバージョンのデータをクエリ可能
- スキーマエンフォースメント & エボリューション: スキーマの変更を管理
- 更新/削除/マージ (Upsert/Merge): UPDATE, DELETE, MERGE INTO 操作をサポート
- バッチ/ストリーミング統合: 同じテーブルに対して両処理をシームレスに実行可能
- 監査履歴 (Audit History):
DESCRIBE HISTORY
でテーブルへの変更履歴を確認可能
8. Delta Lake テーブル操作 (SQL)
テーブル作成
CREATE TABLE [IF NOT EXISTS] <table_name> (
col_name1 col_type1,
col_name2 col_type2,
...
)
[USING DELTA] -- デフォルトだが明示的に指定可能
[PARTITIONED BY (col_name, ...)]
[LOCATION '<path>'] -- External Table を作成する場合
[COMMENT '<table_comment>']
[TBLPROPERTIES ('<key>'='<value>', ...)]; -- delta.autoOptimize.optimizeWrite = true など
データ操作
-- 挿入 (Insert)
INSERT INTO <table_name> VALUES (value1, value2, ...);
INSERT INTO <table_name> SELECT ... FROM <source>;
-- 上書き (Overwrite)
-- テーブル全体の上書き
INSERT OVERWRITE <table_name> SELECT ... FROM <source>;
-- 特定のパーティションの上書き (静的パーティション上書き)
INSERT OVERWRITE <table_name> PARTITION (partition_col='value') SELECT ... FROM <source> WHERE partition_col='value';
-- 動的パーティション上書き (Dynamic Partition Overwrite - 推奨)
-- 事前に SET spark.sql.sources.partitionOverwriteMode=dynamic; を実行
INSERT OVERWRITE <table_name> SELECT ... FROM <source_containing_partition_columns>;
-- 更新 (Update)
UPDATE <table_name> SET col_name1 = value1 WHERE <condition>;
-- 削除 (Delete)
DELETE FROM <table_name> WHERE <condition>;
-- マージ (Merge / Upsert)
MERGE INTO <target_table> AS target
USING <source_table_or_query> AS source
ON <merge_condition> -- 例: target.id = source.id
WHEN MATCHED [AND <condition>] THEN UPDATE SET col1 = source.col1, ... -- 一致した場合の更新
WHEN MATCHED [AND <condition>] THEN DELETE -- 一致した場合の削除
WHEN NOT MATCHED [AND <condition>] THEN INSERT (col1, ...) VALUES (source.col1, ...); -- 一致しなかった場合の挿入
Delta Lake での Overwrite と履歴:
INSERT OVERWRITE
はテーブル(または指定されたパーティション)の既存データを新しいデータセットで論理的に置き換えます。しかし、Delta Lake のトランザクションログにより、この操作は新しいバージョンとして記録されます。物理的なデータファイルはすぐには削除されません(VACUUM
されるまで保持されます)。
そのため、INSERT OVERWRITE
後でも:
-
Time Travel (タイムトラベル):
VERSION AS OF
やTIMESTAMP AS OF
を使って、上書き前のテーブル状態にアクセスできます。 -
Audit History (監査履歴):
DESCRIBE HISTORY <table_name>
を実行すると、INSERT OVERWRITE
操作を含むテーブルへの変更履歴を確認できます。
メタデータ確認
-
DESCRIBE DETAIL <table_name>
: テーブルの場所、フォーマット、サイズ、現在のバージョンに含まれるファイル数などを表示 -
DESCRIBE HISTORY <table_name>
: テーブルのバージョン履歴(操作内容、タイムスタンプ、バージョン番号、実行ユーザーなど)を表示
9. Delta Lake の高度な機能
タイムトラベル (Time Travel)
-- バージョン番号で指定: 指定したバージョン番号時点のテーブルデータを読み取る
SELECT * FROM <table_name> VERSION AS OF <version_number>;
-- 短縮構文: 上記と同じ
SELECT * FROM <table_name>@v<version_number>;
-- タイムスタンプで指定: 指定したタイムスタンプ以前の最新のテーブルデータを読み取る
SELECT * FROM <table_name> TIMESTAMP AS OF '<timestamp_string>'; -- 例: '2023-10-26 10:00:00'
# DataFrame API (Python)
df = spark.read.option("versionAsOf", <version_number>).table("<table_name>")
df = spark.read.option("timestampAsOf", "<timestamp_string>").table("<table_name>")
テーブルリストア (Rollback)
-- バージョン番号で指定: テーブルを指定したバージョン番号の状態に戻す (新しいバージョンが作成される)
RESTORE TABLE <table_name> TO VERSION AS OF <version_number>;
-- タイムスタンプで指定: テーブルを指定したタイムスタンプ以前の最新の状態に戻す (新しいバージョンが作成される)
RESTORE TABLE <table_name> TO TIMESTAMP AS OF '<timestamp_string>';
最適化 (OPTIMIZE)
-- ファイル圧縮 (Compaction): テーブル内の小さなファイルを結合して大きなファイルにし、クエリパフォーマンスを向上させる
OPTIMIZE <table_name> [WHERE <partition_filter>]
-- Z-Ordering: 指定した列に基づいてデータの物理的な配置を最適化し、フィルタリング時のデータスキップを効率化する (カラムナストレージの利点を活かす)。高カーディナリティ列に有効。
OPTIMIZE <table_name> [WHERE <partition_filter>] ZORDER BY (col1, col2, ...)
バキューム (VACUUM)
-- 古い未使用ファイルの削除: テーブルのトランザクションログによって参照されなくなったデータファイル(未使用ファイル)のうち、指定された保持期間 (retention threshold) よりも古いものを物理的に削除します。
VACUUM <table_name> [RETAIN <N> HOURS] -- RETAIN句で保持期間を変更可能 (デフォルト7日 = 168時間)
-- ドライラン: 削除対象のファイルリストを確認 (実際の削除は行わない)
VACUUM <table_name> DRY RUN
デフォルト動作と保持期間 (Retention Threshold): デフォルトでは、VACUUM
は7日間 (168時間) よりも古い未使用ファイルのみを削除します。この保持期間は、以下のような理由で重要な安全機能として機能します。
- 長時間クエリの保護: 実行中の長時間クエリが、削除対象となりうる古いバージョンのファイルを参照している可能性があります。保持期間があることで、これらのクエリが完了するまでファイルが保護されます。
-
タイムトラベルの保証: ユーザーが
TIMESTAMP AS OF
やVERSION AS OF
を使用して過去のテーブル状態にアクセスできるように、一定期間データファイルが保持されます。
ファイルの削除基準:
- 削除されるファイル: トランザクションログから参照されなくなり(未使用)、かつ設定された保持期間よりも古いファイル。
- 削除されないファイル: トランザクションログから参照されなくなった(未使用)が、保持期間よりも新しいファイル。これらは誤削除を防ぐために保持されます。
注意: VACUUM
の保持期間 (RETAIN N HOURS
) は、タイムトラベルでアクセスしたい期間や、最も長く実行される可能性のあるクエリの時間を考慮して設定する必要があります。デフォルト(7日)より短く設定すると、意図しないデータ損失やクエリエラーのリスクがあります (デフォルトでは、安全のため0時間保持は許可されていません)。
10. リレーショナルエンティティ
データベース (Database) / スキーマ (Schema)
- Databricks では同義 (
CREATE DATABASE
=CREATE SCHEMA
) - テーブルやビューなどのオブジェクトの論理的なグループ化
- メタストア (Hive Metastore または Unity Catalog Metastore) 内に定義が保存
-
LOCATION
句を指定せずにCREATE DATABASE
(またはCREATE SCHEMA
) を実行した場合、そのデータベース (スキーマ) はデフォルトの場所に紐づけられます。Hive Metastore の場合は通常dbfs:/user/hive/warehouse/<database_name>.db
です。Unity Catalog の場合は、カタログに設定されたデフォルトのマネージドストレージロケーション内になります。
テーブルの種類
-
Managed テーブル:
- データとメタデータの両方をメタストアが管理する場所に格納(デフォルトはDBFS内の特定のパス、またはUCのマネージドストレージロケーション)。
-
DROP TABLE
を実行すると、メタデータとデータファイルの両方が削除される。
-
External テーブル (Unmanaged テーブル):
- データは
LOCATION
句で指定された外部パス(例: S3, ADLS Gen2)に格納。メタデータのみメタストアで管理。 -
DROP TABLE
を実行すると、メタデータのみが削除され、データファイルは指定された場所に残る。
- データは
メタデータ確認コマンド (例)
データベース、テーブル、ビューなどのメタデータを確認するための基本的な SQL コマンドです。
-- データベース/スキーマ情報の表示:
-- 基本情報 (コメント、場所など)
DESCRIBE DATABASE sales_db;
-- または (スキーマはデータベースと同義)
-- DESCRIBE SCHEMA sales_db;
実行結果サンプル (DESCRIBE DATABASE sales_db;
):
+-------------------------+----------------------------------------------------------+---------+
| database_description_item| database_description_value | comment |
+-------------------------+----------------------------------------------------------+---------+
| Database Name | sales_db | |
| Description | Sales department operational data | |
| Location | dbfs:/user/hive/warehouse/sales_db.db | |
| Owner | <owner_principal> | |
+-------------------------+----------------------------------------------------------+---------+
(注: Unity Catalog 環境の場合、Location
は通常 Unity Catalog のマネージドストレージパスになります)
-- 詳細情報 (プロパティなどを含む)
-- ユーザー指定の通り、データベース/スキーマに対する正しい構文
DESCRIBE DATABASE EXTENDED sales_db;
-- または (スキーマはデータベースと同義)
-- DESCRIBE SCHEMA EXTENDED sales_db;
実行結果サンプル (DESCRIBE DATABASE EXTENDED sales_db;
):
+-------------------------+----------------------------------------------------------+---------+
| database_description_item| database_description_value | comment |
+-------------------------+----------------------------------------------------------+---------+
| Database Name | sales_db | |
| Description | Sales department operational data | |
| Location | dbfs:/user/hive/warehouse/sales_db.db | |
| Owner | <owner_principal> | |
| Properties | ((key1=value1), (key2=value2)) | |
+-------------------------+----------------------------------------------------------+---------+
-- テーブル/ビュー情報の表示:
-- 基本的な列情報 (列名、データ型、コメント)
DESCRIBE orders;
実行結果サンプル (DESCRIBE orders;
):
+------------------+-----------+---------------------+
| col_name | data_type | comment |
+------------------+-----------+---------------------+
| order_id | bigint | Unique order ID |
| customer_id | string | Customer identifier |
| order_timestamp | timestamp | Time of the order |
| total_amount | double | Total order amount |
| order_status | string | Status of the order |
+------------------+-----------+---------------------+
-- 詳細情報 (列情報に加え、テーブルプロパティ、場所、オーナー、作成時間など)
-- こちらはテーブルやビューに対する構文
DESCRIBE EXTENDED orders;
実行結果サンプル (DESCRIBE EXTENDED orders;
):
+----------------------------+----------------------------------------------------------+-----------------------+
| col_name | data_type | comment |
+----------------------------+----------------------------------------------------------+-----------------------+
| order_id | bigint | Unique order ID |
| customer_id | string | Customer identifier |
| order_timestamp | timestamp | Time of the order |
| total_amount | double | Total order amount |
| order_status | string | Status of the order |
| | | |
| # Detailed Table Information | | |
| Database | sales_db | |
| Table | orders | |
| Owner | <owner_principal> | |
| Created Time | Mon Apr 21 10:15:30 UTC 2025 | |
| Last Access | UNKNOWN | |
| Created By | Databricks Runtime Version X.Y | |
| Type | MANAGED | |
| Provider | delta | |
| Table Properties | [delta.minReaderVersion=1,delta.minWriterVersion=2] | |
| Location | dbfs:/user/hive/warehouse/sales_db.db/orders | |
| Serde Library | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
| InputFormat | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
| OutputFormat | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
+----------------------------+----------------------------------------------------------+-----------------------+
-- ビューの基本情報 (列名、データ型、コメント)
DESCRIBE high_value_orders_view;
実行結果サンプル (DESCRIBE high_value_orders_view;
):
+------------------+-----------+---------------------+
| col_name | data_type | comment |
+------------------+-----------+---------------------+
| order_id | bigint | Unique order ID |
| customer_id | string | Customer identifier |
| total_amount | double | Total order amount |
+------------------+-----------+---------------------+
-- ビューの詳細情報 (列情報に加え、ビュー定義など)
DESCRIBE EXTENDED high_value_orders_view;
実行結果サンプル (DESCRIBE EXTENDED high_value_orders_view;
):
+----------------------------+----------------------------------------------------------+---------------------+
| col_name | data_type | comment |
+----------------------------+----------------------------------------------------------+---------------------+
| order_id | bigint | Unique order ID |
| customer_id | string | Customer identifier |
| total_amount | double | Total order amount |
| | | |
| # Detailed View Information | | |
| View Catalog | hive_metastore | |
| View Database | sales_db | |
| View Name | high_value_orders_view | |
| View Text | SELECT order_id, customer_id, total_amount FROM orders WHERE total_amount > 1000 | |
| View Original Text | SELECT order_id, customer_id, total_amount FROM orders WHERE total_amount > 1000 | |
| Owner | <owner_principal> | |
| Table Properties | [] | |
+----------------------------+----------------------------------------------------------+---------------------+
-- Delta テーブル固有のメタデータ確認 (セクション 8, 9 で詳述)
-- DESCRIBE DETAIL <delta_table_name>;
-- DESCRIBE HISTORY <delta_table_name>;
11. テーブル作成とビュー
CTAS (Create Table As Select)
CREATE [OR REPLACE] TABLE <table_name>
[COMMENT '<comment>']
[PARTITIONED BY (col_name, ...)]
[LOCATION '<path>'] -- External Table を作成する場合
AS SELECT ... FROM <source_table> WHERE ...;
テーブルのクローン (Clone)
Delta テーブルの特定バージョンのコピーを作成します。メタデータとデータのコピー方法が異なります。
-- ディープクローン (Deep Clone): メタデータとデータファイルの両方を完全にコピー。ソーステーブルとは独立。
CREATE TABLE <target_table_name> DEEP CLONE <source_table_name> [VERSION AS OF <version>] [LOCATION '<path>']
-- シャロークローン (Shallow Clone): メタデータのみをコピーし、データファイルはソーステーブルを参照。高速だがソーステーブルのVACUUMに影響される。
CREATE TABLE <target_table_name> SHALLOW CLONE <source_table_name> [VERSION AS OF <version>] [LOCATION '<path>']
ビュー (Views)
保存されたクエリ。実行時に基となるテーブルからデータを取得します。データ自体は保持しません。
-
標準ビュー (View):
CREATE VIEW <view_name> AS SELECT ...
- メタストアに永続化され、権限があれば複数セッションからアクセス可能。
-
一時ビュー (Temporary View):
CREATE TEMPORARY VIEW <view_name> AS SELECT ...
- 現在の Spark セッション内でのみ有効。セッション終了時に破棄される。
-
グローバル一時ビュー (Global Temporary View):
CREATE GLOBAL TEMPORARY VIEW <view_name> AS SELECT ...
- 現在の Spark アプリケーション (クラスター) 内で有効。スキーマ
global_temp
を通じてアクセス (SELECT * FROM global_temp.<view_name>
)。
- 現在の Spark アプリケーション (クラスター) 内で有効。スキーマ
12. ファイルへの直接クエリ
基本構文
SELECT * FROM <file_format>.`<path_to_files>`
-
<file_format>
:json
,parquet
,csv
,text
,delta
,avro
,orc
,binaryFile
など - パスはバックティック (
`
) で囲む。ワイルドカード (*
) も使用可能。 - 例:
SELECT * FROM json.\
/mnt/mydata/users/*.json``
ファイルからのデータロード (Delta テーブルへ)
-- CTAS を使用 (シンプルなケース)
CREATE TABLE <delta_table_name>
AS SELECT * FROM <file_format>.`<path_to_files>`;
-- オプション指定が必要な場合 (推奨: 一時ビュー経由)
-- Step 1: ファイルから一時ビューを作成 (スキーマ推論やオプション指定)
CREATE OR REPLACE TEMPORARY VIEW <temp_view_name>
USING <file_format>
OPTIONS (
path '<path_to_files>',
header 'true', -- CSV の場合など
inferSchema 'true', -- スキーマ推論する場合
delimiter ',' -- CSV の場合など
-- その他フォーマット固有オプション
);
-- Step 2: 一時ビューから Delta テーブルを作成
CREATE OR REPLACE TABLE <delta_table_name>
AS SELECT * FROM <temp_view_name>;
13. 高度な SQL 変換
JSON / 構造化データ操作
-
ネストされたフィールドへのアクセス:
- Struct型: ドット (
.
) を使用 (例:address.city
) - JSON文字列 (StringType): コロン (
:
) を使用して抽出 (例:json_col:address.city
)。get_json_object()
関数も使用可能。
- Struct型: ドット (
-
配列 (Array) の操作:
-
explode()
: 配列の各要素を個別の行に展開 -
posexplode()
: 配列の各要素とそのインデックス (位置) を個別の行に展開 - 要素アクセス:
array_col[index]
(0ベースインデックス) -
array_contains()
,size()
,slice()
,concat()
など多数の関数あり
-
-
マップ (Map) の操作:
- 要素アクセス:
map_col[key]
-
map_keys()
,map_values()
など
- 要素アクセス:
-
Struct の作成:
struct(col1, col2, ...)
-
JSON 文字列への変換:
to_json()
-
JSON 文字列からのパース:
from_json()
(スキーマ指定が必要) -
高階関数 (Higher-Order Functions): 配列やマップに対するラムダ関数を用いた複雑な操作を可能にします。
-
TRANSFORM(array, function)
: 配列の各要素に関数を適用し、新しい配列を返します。-- 例1: 配列の各要素を2倍にする SELECT array_col, TRANSFORM(array_col, x -> x * 2) AS doubled_array FROM VALUES (array(1, 2, 3)), (array(4, 5)) AS data(array_col);
実行結果サンプル (例1):
+-----------+-------------+ | array_col | doubled_array| +-----------+-------------+ | [1, 2, 3] | [2, 4, 6] | | [4, 5] | [8, 10] | +-----------+-------------+
-- 例2: Struct 配列から特定のフィールドを抽出する SELECT items_array, TRANSFORM(items_array, item -> item.name) AS item_names FROM VALUES (array(struct('apple' AS name, 50 AS price), struct('banana' AS name, 30 AS price))), (array(struct('orange' AS name, 80 AS price))) AS orders(items_array);
実行結果サンプル (例2):
+-------------------------------------------------+--------------------+ | items_array | item_names | +-------------------------------------------------+--------------------+ | [{"name":"apple","price":50}, {"name":"banana","price":30}] | ["apple", "banana"]| | [{"name":"orange","price":80}] | ["orange"] | +-------------------------------------------------+--------------------+
-
FILTER(array, function)
: 配列の要素のうち、関数がtrue
を返すものだけで構成される新しい配列を返します。-- 例1: 配列から正の数のみを抽出する SELECT numbers_array, FILTER(numbers_array, x -> x > 0) AS positive_numbers FROM VALUES (array(-1, 0, 1, 2, -3)), (array(-5, -8)) AS data(numbers_array);
実行結果サンプル (例1):
+------------------+------------------+ | numbers_array | positive_numbers | +------------------+------------------+ | [-1, 0, 1, 2, -3] | [1, 2] | | [-5, -8] | [] | +------------------+------------------+
-- 例2: Struct 配列から特定の条件に合う要素をフィルタリング SELECT items_array, FILTER(items_array, item -> item.price > 100) AS expensive_items FROM VALUES (array(struct('laptop' AS name, 1200 AS price), struct('mouse' AS name, 25 AS price))), (array(struct('keyboard' AS name, 75 AS price))) AS orders(items_array);
実行結果サンプル (例2):
+---------------------------------------------------+--------------------------------+ | items_array | expensive_items | +---------------------------------------------------+--------------------------------+ | [{"name":"laptop","price":1200}, {"name":"mouse","price":25}] | [{"name":"laptop","price":1200}] | | [{"name":"keyboard","price":75}] | [] | +---------------------------------------------------+--------------------------------+
-
その他:
EXISTS
(配列に関数を満たす要素が存在するか),REDUCE
(配列要素を単一値に集約),MAP_FILTER
,MAP_ZIP_WITH
など。
-
PIVOT
行を列に変換します。集計関数と組み合わせる必要があります。
SELECT * FROM (
SELECT <grouping_col>, <pivot_col>, <aggregate_col> FROM <source_table>
)
PIVOT (
<aggregation_function>(<aggregate_col>) -- 例: SUM(amount)
FOR <pivot_col> -- どの列の値で新しい列を作成するか (例: year)
IN (<value1>, <value2>, ...) -- 新しい列名となる値 (例: 2022, 2023)
);
ユーザー定義関数 (UDF)
SQL 式で表現できないカスタムロジックを実装します。パフォーマンス影響に注意。
-
SQL UDF:
CREATE FUNCTION [IF NOT EXISTS] <function_name> (param1 type1, ...) RETURNS return_type LANGUAGE SQL -- デフォルト [COMMENT '<comment>'] RETURN <sql_expression_or_logic>; -- SQL 式で定義
-
Python UDF: (ノートブックセッション内、または Unity Catalog に登録)
# セッション内UDF def my_py_udf(col1, col2): # Python ロジック return result spark.udf.register("my_registered_udf", my_py_udf, ReturnType()) # SQL から呼び出し: SELECT my_registered_udf(col_a, col_b) FROM table
-- UC SQL UDF (Python) CREATE FUNCTION square (x INT) RETURNS INT LANGUAGE PYTHON AS $$ def square(x): return x * x return square(x) $$;
14. Spark Structured Streaming
基本概念
- データストリームを無限に続くテーブル (Unbounded Table) として扱う。
- マイクロバッチ処理 (Micro-batch Processing) または 継続的処理 (Continuous Processing) で動作。
- ステートフル処理 (Stateful Processing) と ステートレス処理 (Stateless Processing) をサポート。
- 耐障害性 (Fault Tolerance) と Exactly-Once 処理セマンティクス (Delta Lake シンク使用時など)。
spark.readStream
)
ストリームの読み取り (# Delta テーブルをストリームとして読み取る
stream_df = spark.readStream.format("delta").table("my_delta_table")
# ファイルソース (例: JSON)
stream_df = (spark.readStream
.format("json")
.schema(my_schema) # スキーマ指定推奨
.load("/path/to/streaming/files"))
# Kafka など他のソースもサポート
df.writeStream
)
ストリームの書き込み (query = (stream_df # または変換後の DataFrame (transformed_df)
.writeStream
.format("delta") # シンクのフォーマット (Delta Lake 推奨)
.outputMode("append") # append, complete, update
.trigger(processingTime='1 minute') # 処理トリガー (例: 1分ごと)
.option("checkpointLocation", "/path/to/checkpoint") # 必須: 状態管理用
.table("my_target_delta_table")) # Delta テーブルに書き込む場合
# .start("/path/to/output") # ファイルに書き込む場合
# query.awaitTermination() # フォアグラウンドで実行する場合
重要な設定
-
outputMode
:-
"append"
(デフォルト): 新しく追加された行のみをシンクに追加 (集計なしの場合)。 -
"complete"
: トリガーごとに結果テーブル全体が再計算され上書き (集計ありの場合に使用)。 -
"update"
: 変更された行のみがシンクに書き込まれる (集計あり、completeが不可能な場合)。
-
-
trigger
:-
processingTime='interval'
(例:'5 minutes'
,'1 hour'
): 指定間隔でマイクロバッチ処理。 デフォルトでは、トリガー間隔を指定しない場合、データは半秒 (500ミリ秒) ごとに処理されます。これはtrigger(processingTime="500ms")
と同等です。 -
once=True
: 現在利用可能なデータを単一バッチで処理して停止。手動実行やジョブでのバッチ処理に利用。 -
availableNow=True
: (Databricks 独自) 現在利用可能なデータを複数マイクロバッチで処理し、完了後に停止。once=True
よりスケーラブル。 -
continuous='interval'
(例:'1 second'
): 低レイテンシの継続的処理 (実験的、制約あり)。
-
-
checkpointLocation
: ストリームの状態 (オフセット、集計状態など) を保存する場所。耐障害性と Exactly-Once のために必須。
15. インクリメンタルデータインジェスチョン
新しいデータファイルのみを効率的にテーブルに取り込む方法。
COPY INTO (SQL)
COPY INTO <target_delta_table>
FROM <source_location> -- 例: '/mnt/landing/data', 's3://bucket/path'
FILEFORMAT = <format> -- CSV, JSON, PARQUET, AVRO, ORC, TEXT, BINARYFILE
[PATTERN = '<glob_pattern>'] -- 例: '*.csv', 'year=*/month=*/day=*/*.json'
[FORMAT_OPTIONS ('<option>' = '<value>', ...)] -- 例: header 'true', inferSchema 'true'
[COPY_OPTIONS ('<option>' = '<value>', ...)]; -- 例: mergeSchema 'true', force 'true' (冪等性を無視)
-
特徴:
- 冪等 (Idempotent): 同じコマンドを再実行しても、既にロードされたファイルは自動的にスキップされる。
- SQL ベースでシンプル。
- トランザクション内で実行される。
- 数千ファイル程度の比較的小〜中規模のロードに適している。
Auto Loader (Structured Streaming)
クラウドストレージ上の新しいデータファイルを自動的かつ効率的に検出・処理する Structured Streaming ソース。
readStream
で format("cloudFiles")
を指定することにより、Auto Loader が有効になります。
# Python API
stream_df = (spark.readStream
.format("cloudFiles") # Auto Loader 指定
.option("cloudFiles.format", "<file_format>") # json, csv, parquet など
.option("cloudFiles.schemaLocation", "<schema_location_path>") # スキーマ情報と処理済みファイルのインデックスを保存 (必須)
.option("cloudFiles.inferColumnTypes", "true") # 型推論 (任意)
.option("cloudFiles.schemaHints", "col1 type1, col2 type2") # スキーマヒント (任意)
# その他フォーマット固有オプション (例: .option("header", "true"))
.load("<source_directory_path>"))
query = (stream_df.writeStream
.format("delta")
.option("checkpointLocation", "<checkpoint_location_path>") # ストリーミングクエリのチェックポイント
.option("mergeSchema", "true") # スキーマ進化を許可
.trigger(availableNow=True) # バッチ的なインクリメンタル処理に推奨
.table("<target_delta_table>"))
-- SQL (DLT での使用例、直接 SQL でも可能)
CREATE OR REFRESH STREAMING LIVE TABLE bronze_table
AS SELECT * FROM cloud_files(
"<source_path>",
"<file_format>",
map(
"cloudFiles.schemaLocation", "<schema_location_path>",
"cloudFiles.inferColumnTypes", "true",
"<option_key>", "<option_value>"
-- 例: "header", "true"
)
);
-
特徴:
- 非常にスケーラブル (数百万ファイル以上に対応)。
- クラウドネイティブなファイル検出方法を使用 (効率的)。
- スキーマ推論とスキーマ進化 (
cloudFiles.schemaLocation
で管理) をサポート。 -
checkpointLocation
により Exactly-Once 保証。 - 大規模または継続的なファイルインジェスチョンのための Databricks ベストプラクティス。
16. マルチホップアーキテクチャ (メダリオン)
データ処理パイプラインを段階的に構築するための設計パターン。データの品質と構造を徐々に向上させる。
レイヤー
-
ブロンズ (Bronze / Raw):
- ソースシステムから取り込んだ生データをそのまま、または最小限の変換 (メタデータ追加など) で格納。
- Delta Lake 形式で保存し、履歴保持 (タイムトラベル) を可能にする。
- スキーマはソースに近い形。
-
シルバー (Silver / Cleansed / Enriched):
- ブロンズデータをクレンジング (欠損値処理、重複排除)、変換、結合、フィルター、エンリッチメント (他のデータソースとの結合) を行ったテーブル。
- 検証済みで信頼性が高く、分析しやすい構造になっている。ビジネスロジックの一部が適用される。
- アドホック分析や特徴量エンジニアリングの入力として利用。
-
ゴールド (Gold / Aggregated / Curated):
- 特定のビジネス要件やユースケース (BIレポート、ダッシュボード、MLアプリケーションなど) に合わせて集計・最適化されたテーブル。
- 非正規化され、特定の分析クエリに対して高性能。
- ビジネスユーザーやアプリケーションが直接利用する層。
利点
- シンプルで理解しやすい構造。
- インクリメンタル ETL: 各層へのデータ移動を増分処理 (例: Structured Streaming, DLT) で効率化。
- 再処理容易性: 問題発生時に、ブロンズ層やシルバー層から下流のテーブルを再構築可能。
- データ品質の段階的向上とガバナンス強化。
- バッチ/ストリーミング統合: 各ホップを柔軟に構成可能。
17. Delta Live Tables (DLT)
宣言的な ETL パイプラインを簡単に構築、管理、運用するためのフレームワーク。
定義と目的
- SQL または Python を使用して、データ変換ロジックを宣言的に定義。
- パイプライン内のテーブル間の依存関係を自動的に解決し、実行順序を管理。
- データ品質制約 (Expectations) を定義し、監視・適用。
- インフラ管理 (クラスターのプロビジョニング、スケーリング、障害回復) を自動化。
- インクリメンタル処理とバッチ処理の両方をサポート。
- スコープと他のツールとの関係: DLT は、ETL パイプライン内部のデータフローと依存関係の管理、品質チェック、自動化に特化したフレームワークです。DLT 自体は、パイプライン実行の前後に別の種類のタスク(例: 特定のノートブックやスクリプトの実行)をオーケストレーションする機能は持ちません。そのような広範なワークフロー管理のためには Databricks Jobs が使用され、DLT パイプラインの実行自体が Jobs の一つのタスクとしてスケジュール・実行されるのが一般的な構成です。
DLT テーブル/ビューの宣言 (SQL)
-- ストリーミングLIVEテーブル (インクリメンタル処理)
CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
[COMMENT '<comment>']
[TBLPROPERTIES (<key> = <value>, ...)]
AS SELECT ... FROM STREAM(LIVE.<upstream_table>); -- 上流がストリーミングの場合
-- LIVEテーブル (バッチ処理、トリガーごとに全再計算)
CREATE OR REFRESH LIVE TABLE <table_name>
[COMMENT '<comment>']
[TBLPROPERTIES (<key> = <value>, ...)]
AS SELECT ... FROM LIVE.<upstream_table>;
-- LIVEビュー (中間的な変換、データは永続化されない)
CREATE LIVE VIEW <view_name>
[COMMENT '<comment>']
AS SELECT ... FROM LIVE.<upstream_table_or_view>;
-
LIVE.
プレフィックスで同じDLTパイプライン内の他のテーブル/ビューを参照。 -
STREAM()
関数で上流テーブルからの増分読み取りを指定。 -
重要: DLT パイプライン内で定義されたテーブルやビュー (例:
bronze_table
) を、同じパイプライン内の別の DLT クエリ (例: シルバーテーブルの定義) から参照する場合、必ずLIVE.
スキーマ修飾子を使用する必要があります (例:SELECT * FROM LIVE.bronze_table
)。これにより、DLT はパイプライン内の依存関係を正しく解決します。
Auto Loader との連携 (SQL)
cloud_files()
関数を使用して DLT パイプラインのソースとして Auto Loader を簡単に利用可能。
CREATE OR REFRESH STREAMING LIVE TABLE bronze_landing_data
COMMENT "Raw data loaded from cloud storage"
AS SELECT * FROM cloud_files(
"<source_path>",
"<file_format>",
map(
"cloudFiles.schemaLocation", "<schema_location_path_for_this_source>",
"cloudFiles.inferColumnTypes", "true",
"header", "true" -- 例: CSVオプション
-- その他 Auto Loader オプション
)
);
データ品質 (Expectations / Constraints)
データセットに対して品質ルールを定義し、違反時のアクションを指定。
-- テーブル定義内で制約を定義
CREATE OR REFRESH LIVE TABLE validated_data (
CONSTRAINT <constraint_name> EXPECT (<condition>) [ON VIOLATION <action>] -- 例: id IS NOT NULL
-- 複数の制約を定義可能
)
[COMMENT '<comment>']
AS SELECT ... FROM LIVE.source_data;
-- または SELECT 文にインラインで定義 (Python)
-- @dlt.expect_or_drop("valid_id", "id IS NOT NULL")
-- def python_dlt_function(): ...
-
ON VIOLATION
アクション:-
DROP ROW
: 制約違反レコードをターゲットテーブルから破棄。 -
FAIL UPDATE
: パイプライン更新を失敗させる。 -
EXPECT OR WARN
(デフォルト): レコードは保持されるが、メトリクスで違反が記録される。
-
DLT パイプライン実行トリガー
DLT パイプラインの更新方法を制御します。パイプライン設定で選択します。
-
Continuous Pipeline mode (継続実行モード):
- 動作: パイプラインがアクティブな間、クラスターは常に実行され、データソースからの新しい入力データを継続的に処理します。更新が完了するとすぐに新しいデータの処理を開始します。
- ユースケース: データ到着後、可能な限り低いレイテンシで結果を利用可能にする必要がある場合。ストリーミング処理に適しています。
- コスト: クラスターが常時稼働するため、コストが高くなる可能性があります。
-
Triggered Pipeline mode (トリガー実行モード):
- 動作: パイプラインはスケジュールまたは手動トリガーに基づいて1回実行されます。新しいデータを処理するためにクラスターを起動し、処理完了後にパイプラインはシャットダウンします。
- ユースケース: 定期的なバッチ更新(例: 1時間ごと、毎日)。データ鮮度よりもコスト効率が重視される場合。
- コスト: 実行中のみクラスターが稼働するため、コスト効率が良いです。
DLT パイプライン開発/本番モード
DLT パイプラインを開発フェーズと本番運用フェーズで最適化するためのモードです。パイプライン設定で選択します。
-
Development Mode (開発モード):
- 目的: パイプラインの反復的な開発とテスト。
- 動作: クラスター設定の再利用、自動終了無効化、リトライ処理の簡略化により、迅速なデバッグと変更が可能。インフラの更新は行われません。
- 注意: 本番環境の継続的なジョブには非推奨。
-
Production Mode (本番モード):
- 目的: 本番環境での信頼性の高い継続的なデータ処理。
- 動作: パイプライン実行ごとに最適化されたクラスターを起動・管理(自動終了含む)。堅牢なエラーリトライ機能を提供。インフラの更新も自動で行われます。
- 特徴: スケーラビリティ、信頼性、リソース効率に優れる。
18. Change Data Capture (CDC) と Delta Live Tables (DLT)
Change Data Capture (CDC) とは
Change Data Capture (CDC) は、データベースや他のデータソースで発生したデータの変更(挿入、更新、削除)を識別し、捕捉(キャプチャ)するプロセスです。これにより、ソースシステム全体をコピーするのではなく、変更されたデータのみを下流のシステム(例: データウェアハウス、データレイクハウス)に効率的に反映させることができます。
典型的な CDC レコードには、変更されたデータ本体に加えて、以下のメタデータが含まれることが多いです。
- 変更操作の種類 (例:
INSERT
,UPDATE
,DELETE
) - 変更が発生した順序を示す情報 (例: タイムスタンプ、ログシーケンス番号)
APPLY CHANGES INTO
DLT における CDC 処理: Delta Live Tables (DLT) は、この CDC データをターゲットとなる Delta Lake テーブルに適用するための強力な宣言的機能 APPLY CHANGES INTO
を提供します。これにより、複雑なマージロジックや履歴管理 (SCD Type 1, SCD Type 2) を SQL でシンプルに記述できます。
APPLY CHANGES INTO
は、CDC ソースデータストリームを受け取り、指定されたキーに基づいてターゲットテーブルにレコードをマージします。変更の順序を考慮し、削除操作も適切に処理します。
APPLY CHANGES INTO
構文の詳細
-- まず、ターゲットとなる Delta Live Table を定義します。
-- このテーブルは CDC 変更を受け取る最終的な状態または履歴を保持します。
CREATE OR REFRESH STREAMING LIVE TABLE <target_table_name>
TBLPROPERTIES ("key" = "value", ...) -- オプション: テーブルプロパティ
AS SELECT ... ; -- オプション: 初期ロードやスキーマ定義のため
-- 次に、ソースとなる CDC データを含むストリーミングテーブル (例: ブロンズ層) を定義します。
-- このテーブルには通常、操作タイプやシーケンス情報が含まれます。
CREATE OR REFRESH STREAMING LIVE TABLE <cdc_source_table_name>
COMMENT "Raw CDC feed from source system";
-- AS SELECT ... FROM cloud_files(...) などでソースからロード
-- APPLY CHANGES INTO を使って、CDCソースからターゲットテーブルに変更を適用します。
APPLY CHANGES INTO LIVE.<target_table_name> -- 変更を適用するターゲットテーブル
FROM STREAM(LIVE.<cdc_source_table_name>) -- 変更データを含むソースストリーム
KEYS (<primary_key_col1>, <primary_key_col2>, ...) -- レコードを一意に識別するキー(複数指定可)
[APPLY AS DELETE WHEN <operation_col> = 'DELETE_INDICATOR'] -- ソース内のどの条件が削除操作を示すか指定 (例: operation = 'D')
[APPLY AS TRUNCATE WHEN <operation_col> = 'TRUNCATE_INDICATOR'] -- (稀) ソース内のどの条件がTruncate操作を示すか
SEQUENCE BY <sequence_col> -- 変更の順序を決定する列 (タイムスタンプやバージョン番号など)。値が大きい方が新しい変更。
[COLUMNS { * EXCEPT (<metadata_col1>, ...) | <col1>, <col2>, ... }] -- ターゲットに含める列。デフォルトは * EXCEPT (キー列, sequence列, operation列)。
[STORED AS { SCD TYPE 1 | SCD TYPE 2 }] -- 変更履歴の保存方法 (デフォルト: SCD TYPE 1)
[TRACK HISTORY ON (<col1>, <col2>, ...)]; -- SCD TYPE 2 の場合、履歴を追跡する列を指定 (デフォルト: キー以外の全列)
主要な句の説明:
-
KEYS (<primary_key_cols>)
: (必須) ソースとターゲットのレコードをマッチングさせるための主キー列。複合キーも可能です。 -
SEQUENCE BY <sequence_col>
: (必須) 同じキーに対して複数の変更が発生した場合、どの変更が最新かを判断するための列。通常、イベントタイムスタンプやバージョン番号が使われます。DLT はこの列の値が最も大きいレコードを最新の変更として扱います。 -
APPLY AS DELETE WHEN <condition>
: (オプション) ソースレコードがターゲットテーブルからの削除を表す条件を指定します。例えば、CDC フィードにoperation
列があり、値 'D' が削除を示す場合、APPLY AS DELETE WHEN operation = 'D'
と記述します。 -
STORED AS SCD TYPE 1 | SCD TYPE 2
: (オプション) Slowly Changing Dimension (SCD) のタイプを指定します。-
SCD TYPE 1
(デフォルト): ターゲットテーブルには常に最新のレコード状態のみが保持されます。更新は既存のレコードを上書きし、削除はレコードを物理的に削除します。履歴は保持されません。 -
SCD TYPE 2
: ターゲットテーブルに変更履歴が保持されます。更新が発生すると、古いレコードは「非アクティブ」とマークされ(通常、終了日タイムスタンプが付与される)、新しいバージョンのレコードが「アクティブ」として挿入されます。削除もレコードを「非アクティブ」としてマークします。DLT はこれを実現するために__START_AT
,__END_AT
,__CURRENT
といった管理列を自動的にターゲットテーブルに追加・管理します。
-
-
TRACK HISTORY ON (<cols>)
: (STORED AS SCD TYPE 2
の場合のみオプション) どの列の変更が新しい履歴レコードの作成(バージョニング)を引き起こすかを指定します。デフォルトでは、キー列とシーケンス列以外のすべての列の変更が履歴追跡の対象となります。特定の列(例: 最終更新日など)の変更では新しいバージョンを作成したくない場合に指定します。 -
COLUMNS { * EXCEPT (<cols>) | <cols_list> }
: (オプション) ソースストリームからターゲットテーブルに含める列を指定します。デフォルト (* EXCEPT (...)
) では、キー列、シーケンス列、APPLY AS DELETE/TRUNCATE
で使用される列を除いたすべての列がターゲットに含まれます。
具体的なクエリサンプル
シナリオ:
Databricks の外部に存在するデータベースの customers
テーブルからキャプチャされた CDC データが、bronze_cdc_customers
という DLT テーブルにロードされているとします。このテーブルには以下の列があると仮定します。
-
customer_id
(INT): 顧客ID (主キー) -
name
(STRING): 顧客名 -
email
(STRING): メールアドレス -
city
(STRING): 都市 -
operation
(STRING): 操作タイプ ('I' = Insert, 'U' = Update, 'D' = Delete) -
event_timestamp
(TIMESTAMP): 変更イベントのタイムスタンプ (シーケンス用)
例1: SCD Type 1 (最新状態のみ保持)
ターゲットテーブル silver_customers_scd1
を作成し、常に最新の顧客情報を保持します。
-- ターゲットテーブル (最初は空でも良い)
CREATE OR REFRESH STREAMING LIVE TABLE silver_customers_scd1 (
customer_id INT,
name STRING,
email STRING,
city STRING
)
COMMENT "Latest customer information (SCD Type 1)";
-- ソースCDCテーブル (cloud_filesなどでロードされていると仮定)
CREATE OR REFRESH STREAMING LIVE TABLE bronze_cdc_customers
COMMENT "Raw CDC feed for customers";
-- AS SELECT ... FROM cloud_files(...)
-- APPLY CHANGES を使って SCD Type 1 で変更を適用
APPLY CHANGES INTO LIVE.silver_customers_scd1
FROM STREAM(LIVE.bronze_cdc_customers)
KEYS (customer_id) -- 主キーを指定
APPLY AS DELETE WHEN operation = 'D' -- 'D' 操作で削除
SEQUENCE BY event_timestamp -- タイムスタンプで順序付け
STORED AS SCD TYPE 1; -- 明示的に Type 1 を指定 (デフォルトだが明確化のため)
-- COLUMNS 句を省略しているので、キー(customer_id), シーケンス(event_timestamp), 操作(operation) 以外の列 (name, email, city) がターゲットに含まれる
結果: silver_customers_scd1
テーブルには、各 customer_id
について最新の name
, email
, city
が格納されます。削除された顧客のレコードはテーブルから物理的に削除されます。
例2: SCD Type 2 (変更履歴を保持)
ターゲットテーブル silver_customers_scd2
を作成し、顧客情報の変更履歴を保持します。email
の変更のみ履歴を追跡し、city
の変更は最新情報で上書きする (履歴追跡対象外とする) ようにします。
-- ターゲットテーブル (DLTが管理列を追加するので、基本列のみ定義)
CREATE OR REFRESH STREAMING LIVE TABLE silver_customers_scd2 (
customer_id INT,
name STRING,
email STRING,
city STRING
-- DLTが __START_AT, __END_AT, __CURRENT 列を自動追加
)
COMMENT "Customer history (SCD Type 2), tracking email changes";
-- ソースCDCテーブル (例1と同様)
CREATE OR REFRESH STREAMING LIVE TABLE bronze_cdc_customers
COMMENT "Raw CDC feed for customers";
-- AS SELECT ... FROM cloud_files(...)
-- APPLY CHANGES を使って SCD Type 2 で変更を適用
APPLY CHANGES INTO LIVE.silver_customers_scd2
FROM STREAM(LIVE.bronze_cdc_customers)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = 'D'
SEQUENCE BY event_timestamp
STORED AS SCD TYPE 2 -- Type 2 を指定
TRACK HISTORY ON (name, email); -- name または email の変更があった場合に新しい履歴レコードを作成 (cityの変更は現在のレコードを上書き)
-- COLUMNS 句は省略
結果: silver_customers_scd2
テーブルには、顧客ごとの変更履歴が格納されます。
- 新しい顧客が追加される (
operation = 'I'
) と、__CURRENT
が true のレコードが挿入されます。 - 顧客の
name
またはemail
が更新される (operation = 'U'
) と、既存のレコードの__CURRENT
が false になり__END_AT
が設定され、新しい__CURRENT
が true のレコードが挿入されます。 - 顧客の
city
のみが更新された場合、TRACK HISTORY ON
で指定されていないため、現在の__CURRENT = true
のレコードのcity
フィールドが直接上書きされ、新しい履歴レコードは作成されません。 - 顧客が削除される (
operation = 'D'
) と、既存のレコードの__CURRENT
が false になり__END_AT
が設定されます。
APPLY CHANGES INTO
を使用することで、開発者は CDC データの複雑なマージロジックや履歴管理ロジックを低レベルで実装する必要がなくなり、宣言的にデータパイプラインを構築できます。
19. Databricks ジョブ (Workflows)
目的
- Databricks プラットフォームにおける主要なオーケストレーションサービス。
- 複数のタスク (ノートブック, Python スクリプト, JAR, SQL クエリ, Delta Live Tables パイプライン, Python Wheel, dbt タスクなど) を順序付けて実行 (依存関係定義)。
- これらのタスク間の依存関係を定義し、複雑なワークフローを構築・管理。
- 並列実行による時間短縮。
- スケジュール実行 (定期的、またはイベントトリガー)。
- パラメータ渡しによる動的な実行。
- 実行監視、アラート通知、エラーハンドリング。
タスク設定
- タスク名
- タスクタイプ:
Notebook
,Python Script
,JAR
,SQL
,DLT Pipeline
,Python Wheel
,dbt
, etc. - ソースコードの場所 (Workspace, Repos, DBFS)
- 実行するクラスター (Job Cluster or All-Purpose Cluster)
- パラメータ (Widgets や Task Values)
- 依存関係 (Depends On): タスク間の実行順序を定義 (DAG: 有向非巡回グラフ)。
- リトライポリシー、タイムアウト設定。
スケジュール
- ジョブを定期的に実行するように設定可能 (例: 毎時、毎日、毎週)。
- Cron 構文も使用可能で、柔軟なスケジュール設定ができる。
- ファイル到着などのイベントによるトリガー実行も可能 (File Arrival Trigger)。
修復実行 (Repair Run / Run failed tasks)
ジョブ実行が一部のタスクで失敗した場合に、失敗したジョブ実行を効率的に完了させるための機能です。
- 失敗したタスクとその下流にある依存タスクのみを再実行します(UIでは "Run failed tasks" として表示されることもあります)。
- 正常に完了したタスクは再実行されず、その結果が再利用されます。
- これにより、ジョブ全体を最初から再実行する場合と比較して、時間とコストを大幅に節約できます。
例:
複数のタスクから成るジョブで、最後のタスクだけが失敗した場合、「Repair Run」を実行すると、その失敗した最後のタスクのみが再実行され、それ以前の正常に完了したタスク(例: 数時間かかった処理)を繰り返す必要がなく、最小限の実行時間でジョブを完了させることができます。
20. Databricks SQL (DBSQL)
SQL アナリスト向けのデータウェアハウジングおよび SQL 分析環境。データレイクハウス上のデータに対する BI やアドホック分析に最適化。
概要
- 使い慣れた SQL インターフェースを提供。
- 高性能な SQL 実行エンジン (Photon 利用の SQL Warehouse)。
- データカタログ (Unity Catalog) との統合。
- BI ツール (Tableau, Power BI など) との接続性。
主要コンポーネント
-
SQL Warehouse (旧 SQL Endpoint):
- DBSQL クエリを実行するための専用コンピューティングリソース。
- 高い並列性と低いレイテンシを実現するよう最適化されている (Photon デフォルト有効)。
- サイズ (Tシャツサイズ:
2X-Small
〜4X-Large
) とクラスタリング、スケーリング設定を選択可能。 - Serverless オプションも利用可能 (一部リージョン)。
-
SQL Editor:
- SQL クエリの作成、実行、結果確認、視覚化を行う Web ベースのエディタ。
- スキーマブラウザ、オートコンプリート、クエリ履歴機能。
-
Queries:
- 作成した SQL クエリを保存、管理、共有。
-
リフレッシュスケジュールの設定: 保存したクエリは、定期的に自動再実行するようにスケジュールを設定できます。
- 設定方法: Databricks SQL の UI で対象のクエリを開き、エディタ画面の上部または横にある「スケジュール (Schedule)」または類似のオプションを選択します。ここで、クエリを自動的に再実行する頻度(例: 毎時、毎日、毎週など)と時刻を設定できます。
- 目的: 設定されたスケジュールに従ってクエリがバックグラウンドで実行され、クエリの結果(テーブル表示や関連する視覚化)が自動的に更新されます。これは、ダッシュボードなどで利用されるクエリのデータを常に最新に保つために役立ちます。
- パラメータ化クエリもサポート。
-
Dashboards:
- 複数のクエリ結果 (テーブル、視覚化) を組み合わせてインタラクティブなレポートを作成・共有。
- 自動更新設定、パラメータ化ダッシュボード。
-
Alerts:
- クエリ結果が特定の閾値を満たしたときに通知 (Email, Slack, Webhook など) を送信する設定。
21. データオブジェクト権限 (Unity Catalog 以前 / Hive Metastore)
Databricks プラットフォーム上の様々なオブジェクトに対するアクセス制御 (主に Hive Metastore 環境)。Unity Catalog が有効な場合は、UC の権限モデル (セクション 22) が優先されます。
対象データオブジェクト (Hive Metastore)
-
CATALOG
(通常はhive_metastore
のみ) -
DATABASE
/SCHEMA
TABLE
VIEW
-
FUNCTION
(UDF) -
ANY FILE
(DBFS への直接アクセス) -
ANONYMOUS FUNCTION
(一時関数)
権限の種類 (Hive Metastore)
-
SELECT
: テーブル/ビューの読み取り、DESCRIBE
-
MODIFY
: テーブルへのデータの追加、削除、変更 (INSERT
,UPDATE
,DELETE
,MERGE
,TRUNCATE
,INSERT OVERWRITE
) -
USAGE
: カタログやデータベース/スキーマの使用 (その中のオブジェクトにアクセスするために必要) -
READ_METADATA
: オブジェクトとそのメタデータの表示 (DESCRIBE
) -
CREATE
: データベース/スキーマ内のテーブル/ビュー作成、カタログ内のデータベース/スキーマ作成 -
EXECUTE
: UDF の実行 -
ALL PRIVILEGES
: 上記全ての権限
主な SQL コマンド (Hive Metastore)
GRANT <privilege | ALL PRIVILEGES> ON <object_type> <object_name> TO <principal>; -- principal は user email or group name
DENY <privilege> ON <object_type> <object_name> TO <principal>; -- GRANT より優先
REVOKE <privilege> ON <object_type> <object_name> FROM <principal>;
SHOW GRANTS [ON <object_type> <object_name>];
SHOW GRANTS <principal> [ON <object_type> <object_name>];
注意: テーブル ACL (アクセス制御リスト) を有効にする必要があります。
22. Unity Catalog (UC)
Databricks 上のデータと AI アセットのための一元化されたガバナンスソリューション。
アーキテクチャ
-
メタストア (Metastore):
- UC の最上位コンテナ。アカウント内のデータアセット (テーブル、ファイル、関数、モデルなど) のメタデータを保持。
- リージョンごとに作成し、同じリージョン内の複数ワークスペースにアタッチ可能。
-
カタログ (Catalog):
- メタストア内のトップレベルの名前空間。データ資産の論理的なグループ化 (例: 環境別、部門別)。デフォルトは
main
。 -
hive_metastore
カタログは、ワークスペースローカルな Hive Metastore へのアクセスを提供 (UC管理外)。
- メタストア内のトップレベルの名前空間。データ資産の論理的なグループ化 (例: 環境別、部門別)。デフォルトは
-
スキーマ (Schema / Database):
- カタログ内のセカンドレベルの名前空間。テーブル、ビュー、関数などを格納。
-
管理オブジェクト:
- 外部ロケーション (External Location): クラウドストレージパス (S3 バケット、ADLS Gen2 コンテナなど) へのアクセスを許可するセキュアなオブジェクト。External Table のデータ格納場所として使用。
- ストレージ認証情報 (Storage Credential): クラウドストレージへのアクセスに使用される認証情報 (例: Azure Managed Identity, AWS IAM Role, GCP Service Account)。外部ロケーションと関連付ける。
-
3 レベル名前空間:
<catalog_name>.<schema_name>.<object_name>
(テーブル、ビュー、関数、モデルなど) を使用してデータアセットにアクセス。
セキュリティモデル
- プリンシパル (Principals): 権限を付与される対象 (Users, Service Principals, Groups)。アカウントレベルで管理。
-
オーナーシップ (Ownership):
- オブジェクト作成者がデフォルトオーナー。オーナーはオブジェクトに対する全権限を持ち、他のプリンシパルへの権限付与/剥奪が可能。
- オーナーシップは、SQL コマンド (
ALTER <object_type> <object_name> OWNER TO <principal>;
) または Databricks UI (Data Explorer) を通じて移譲可能です。
-
UI での所有権変更手順 (例: テーブル):
- Data Explorer に移動: Databricks ワークスペースの左側のナビゲーションメニューから「データ (Data)」または「カタログ (Catalog)」を選択して Data Explorer を開きます。
- テーブルを選択: 所有権を変更したいテーブルをカタログ、スキーマを通じて見つけ、クリックしてテーブルの詳細ページを開きます。
- Owner フィールドを見つける: テーブルの詳細ページ(通常は「詳細 (Details)」タブや概要セクション)に、現在の所有者を示す「Owner」フィールドがあります。
- 所有者を編集: 適切な権限(通常は現在の所有者、またはメタストア/カタログ/スキーマの管理者など、オブジェクトタイプによる)があれば、「Owner」フィールドの横にある編集アイコン(例: 鉛筆アイコン)をクリックできます。
- 新しい所有者を選択: 新しい所有者となるユーザーまたはグループを検索・選択し、変更を保存します。
- 権限継承 (Privilege Inheritance): 上位オブジェクト (カタログ、スキーマ) で付与された権限は、下位オブジェクト (スキーマ、テーブル) にデフォルトで継承される。
-
主な権限:
- メタストアレベル:
CREATE CATALOG
,CREATE EXTERNAL LOCATION
,CREATE STORAGE CREDENTIAL
など - カタログレベル:
USE CATALOG
(カタログへのアクセス),CREATE SCHEMA
- スキーマレベル:
USE SCHEMA
(スキーマへのアクセス),CREATE TABLE
,CREATE VIEW
,CREATE FUNCTION
,CREATE MODEL
- テーブル/ビューレベル:
SELECT
(読み取り),MODIFY
(書き込み:INSERT
,UPDATE
,DELETE
,MERGE
,INSERT OVERWRITE
,TRUNCATE
) - 関数/モデルレベル:
EXECUTE
- 外部ロケーションレベル:
READ FILES
,WRITE FILES
,CREATE EXTERNAL TABLE
- ストレージ認証情報レベル:
CREATE EXTERNAL LOCATION
(その認証情報を使う権限)
- メタストアレベル:
-
主な SQL コマンド: (Hive Metastore と類似だが
DENY
は非推奨)GRANT <privilege> ON <securable_object> TO <principal>; REVOKE <privilege> ON <securable_object> FROM <principal>; SHOW GRANTS ON <securable_object>; SHOW GRANTS TO <principal>; ALTER <securable_object> OWNER TO <principal>; -- オーナー変更 (SQLコマンド)
主な利点
- 一元化されたガバナンス: 複数ワークスペース/クラウド間でデータアクセス制御を一元管理。
- 標準 SQL ベースの権限管理: 使い慣れた
GRANT
/REVOKE
構文。 - 詳細なアクセス制御: テーブル、ビュー、行、列レベルのアクセス制御 (行/列レベルは Databricks SQL または Photon 対応クラスターが必要)。
- データリネージ: テーブルおよび列レベルで、データがどのように生成され、どこで使用されているかを自動追跡 (クエリ実行時にキャプチャ)。
- データ共有 (Delta Sharing): オープンスタンダードに基づき、組織内外とのセキュアなライブデータ共有。
- 監査ログ: データアクセスに関する詳細な監査ログを一元的に記録。
- データディスカバリ: カタログエクスプローラーや検索機能による容易なデータ検索と理解。
Unity Catalog における役割と責任
役割 | 主なスコープ | 主な責任 | キーポイント |
---|---|---|---|
Cloud Admin | クラウドプラットフォーム | クラウドストレージ、IAMロール/サービスプリンシパル等の作成・管理 | Databricks外部のインフラ担当。UCの前提条件を提供。 |
Account Admin | Databricksアカウント全体 | メタストア作成、アカウントレベル管理、ユーザー管理、ワークスペース管理、請求管理 | Databricksの最高管理者。UCの初期設定を行う。 |
Metastore Admin | 特定のUnity Catalogメタストア | カタログ管理、メタストア全体の権限管理、所有権移転、Storage Credential/External Location管理、Delta Sharing | 特定メタストア内の構造と全体アクセス制御を担当。 |
Data Owner | 特定のデータオブジェクト | 所有オブジェクトの管理、所有オブジェクトへの権限付与 | 特定データ資産の責任者。そのデータへのアクセスを制御。 |
Discussion