Databricks Certified Data Engineer Professionalチートシート
Databricksデータエンジニア Professional チートシート
このチートシートは、Databricksプラットフォームにおけるデータエンジニアリングの主要な概念、コマンド、ベストプラクティスをまとめたものです。Bronzeレイヤーへのデータ取り込み、ストリーミング処理、Delta Lakeの高度な機能、Slowly Changing Dimensions、CDC処理、CDF、Join戦略、ジョブオーケストレーション、テスト、権限管理、モニタリングなどをカバーしています。
Bronzeレイヤー、Auto Loader、ストリーミング、Delta Lake基本操作、SCD
1. Bronzeレイヤーへのデータ取り込みパターン
目的: ソースデータをBronzeテーブルにどのようにマッピングするか決定する。
-
Singleplex (1対1):
- 各データソース/トピックを個別のBronzeテーブルに取り込む。
- 利点: シンプル、従来のバッチ処理に適している。
- 欠点: ストリーミングソースが多い場合、同時実行ジョブ数の制限に達する可能性がある。
-
Multiplex (多対1):
- 複数のデータソース/トピックを単一のBronzeテーブルに取り込む。
- 利点: ストリーミングソースが多い場合に効率的。ジョブ数を削減。
-
実装:
- ソース: Kafka、またはAuto Loaderを使用したクラウドストレージ上のファイル (JSONなど)。
- テーブル構造:
topic
列 (ソース/トピック識別子)、value
列 (実際のデータ、JSON形式など) を含むことが多い。 - 後続処理: Silverレイヤーへの移行時に
topic
列でフィルタリングする。
2. Auto Loader
目的: クラウドストレージから増分データを効率的に取り込む。
-
設定 (PySpark):
(spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") # or "csv", "parquet", etc. .option("cloudFiles.schemaLocation", "/path/to/schema/checkpoint") # スキーマ追跡・進化のため .schema(your_schema) # スキーマ指定 (推論も可能) .load("/path/to/source/directory/") )
-
スキーマ進化:
-
cloudFiles.schemaLocation
を指定すると、スキーマの追跡と進化が有効になる。 - 新しい列が検出された場合、自動的にテーブルスキーマが更新される (書き込み時に
.option("mergeSchema", "true")
が必要)。
-
-
実行モード:
-
.trigger(availableNow=True)
: 利用可能なすべての新規ファイルを処理し、完了後にストリームを停止するバッチモード。 -
.trigger(processingTime='1 minute')
: 継続的なストリーミング処理 (指定間隔ごと)。
-
3. ストリーミング処理
-
基本:
- 読み込み:
spark.readStream...
- 書き込み:
df.writeStream...
- 読み込み:
-
JSONパース (Kafkaデータ例):
from pyspark.sql.functions import col, from_json from pyspark.sql.types import StructType, StructField, StringType, IntegerType # Define your schema kafka_schema = StructType([...]) # Kafkaのトップレベルスキーマ (topic, key, value, timestampなど) payload_schema = StructType([...]) # value列内のJSONスキーマ df = (spark.readStream.format("delta").table("bronze_multiplex") .filter(col("topic") == "orders") # Multiplexテーブルから特定トピックを抽出 .select(from_json(col("value").cast("string"), payload_schema).alias("payload")) # valueをパース .select("payload.*") # パースしたフィールドを展開 )
-- SQLでのJSONパース SELECT from_json(CAST(value AS STRING), 'schema_string').* FROM bronze_multiplex WHERE topic = 'orders';
-
ストリーミング一時ビュー:
- SQLでストリーミングデータを扱うために、DataFrameからストリーミング一時ビューを作成できる。
df.createOrReplaceTempView("streaming_temp_view")
-- streaming_temp_view を通常のテーブルのようにクエリ可能 (ストリーミングとして扱われる) SELECT * FROM streaming_temp_view WHERE amount > 100;
-
foreachBatch
:- 各マイクロバッチに対してカスタムの書き込みロジックを実行できる。
- MERGE操作 (Upsertや重複排除) など、標準の
writeStream
では直接サポートされない複雑な書き込み処理に有用。
def upsert_microbatch(microBatchDF, batchId): # MERGEロジックなどをここに記述 microBatchDF.createOrReplaceTempView("microbatch_data") # Sparkセッション取得 (Runtime 11.3+): microBatchDF.sparkSession spark = microBatchDF.sparkSession spark.sql(""" MERGE INTO target_table t USING microbatch_data s ON t.key = s.key WHEN MATCHED THEN UPDATE SET ... WHEN NOT MATCHED THEN INSERT ... """) (streamingDF .writeStream .foreachBatch(upsert_microbatch) .option("checkpointLocation", "/path/to/checkpoint") .trigger(availableNow=True) .start() )
-
注意:
foreachBatch
内でSparkセッションを取得する方法はDatabricks Runtimeのバージョンによって異なる場合がある。
4. Delta Lake テーブル操作
-
CHECK制約:
- データ品質を強制するルール。
- 追加:
ALTER TABLE table_name ADD CONSTRAINT constraint_name CHECK (condition);
- 削除:
ALTER TABLE table_name DROP CONSTRAINT constraint_name;
-
動作:
- 制約に違反するデータを含む書き込み操作は失敗する (トランザクション全体がロールバック)。
- 制約追加時、テーブル内の既存データも検証される。違反があれば追加失敗。
-
ストリーミングでの考慮事項: 制約違反でジョブを止めずに不正データを除外したい場合は、書き込み前に
WHERE
句でフィルタリングするか、別の「隔離」テーブルに書き込むなどの対策が必要。
-
ストリーミングでの重複排除:
-
dropDuplicates()
: 静的DataFrameと同様に使用可能。 -
withWatermark()
: 状態管理のオーバーヘッドを抑えるためにdropDuplicates()
と併用する。指定した時間内の遅延レコードのみを考慮して重複チェックを行う。deduplicated_df = (streamingDF .withWatermark("event_timestamp", "30 seconds") # 30秒遅延まで許容 .dropDuplicates(["unique_id"]) )
-
Insert-only Merge (
foreachBatch
を使用): マイクロバッチ内の重複だけでなく、ターゲットテーブルに既に存在するレコードとの重複も排除する確実な方法。-- foreachBatch 内で実行するMERGE文の例 MERGE INTO silver_table t USING microbatch_updates s ON t.unique_id = s.unique_id -- 結合キーで一致確認 WHEN NOT MATCHED THEN INSERT * -- 一致しない場合のみ挿入
-
5. Slowly Changing Dimensions (SCD)
目的: 時間とともに変化するディメンションデータをどのように管理するか。
-
Type 1 (上書き):
- 最新の値で既存のレコードを上書きする。履歴は保持されない。
- 実装: 通常の
UPDATE
やMERGE
のWHEN MATCHED THEN UPDATE
句を使用。
-
Type 2 (履歴保持):
- 変更が発生すると新しいレコードを追加し、古いレコードを非アクティブとしてマークする。完全な履歴を保持。
-
必要な追加列:
-
is_current
(BOOLEAN): レコードが現在有効かを示すフラグ。 -
effective_date
(TIMESTAMP/DATE): レコードが有効になった日時。 -
end_date
(TIMESTAMP/DATE): レコードが無効になった日時 (現在のレコードはNULL)。
-
-
実装 (
foreachBatch
とMERGE
):-
古いレコードを閉じる (マークする):
WHEN MATCHED AND target.is_current = true AND source.data <> target.data
(データに変更があった場合) に、target.is_current = false
,target.end_date = source.effective_date
(または現在のタイムスタンプ) を設定。 -
新しい/変更されたレコードを挿入:
WHEN NOT MATCHED
(または別のロジックで更新レコードを挿入対象として準備) で、新しいレコードをis_current = true
,end_date = NULL
として挿入。
- テキストの例では、更新レコードを意図的に
NOT MATCHED
で挿入するために、まず更新対象をMATCHED
で処理し、その後、更新レコード自体をNOT MATCHED BY SOURCE
のようなロジック(または一時ビューへの再挿入など)で挿入対象としている可能性がある点に注意。具体的なMERGE文は要確認。
-
古いレコードを閉じる (マークする):
-
現在レコードのみのビュー/テーブル: SCD Type 2 テーブルから
WHERE is_current = true
でフィルタリングすることで、常に最新のレコードセットを取得できる。これはバッチ処理で上書き作成 (CREATE OR REPLACE TABLE
) されることが多い。
-
Delta Time Travel vs SCD Type 2:
- Time Travel: 短期間のバージョン管理、偶発的な変更からの回復向け。
VACUUM
で履歴が削除される可能性。長期的な履歴分析には不向き。 - SCD Type 2: 明示的なビジネス要件に基づく長期的な履歴管理手法。
- Time Travel: 短期間のバージョン管理、偶発的な変更からの回復向け。
CDC処理、Delta CDF、Join戦略、ビューとGoldテーブル
1. Change Data Capture (CDC) フィードの処理
CDCとは: データソースでの行レベルの変更 (Insert, Update, Delete) を識別・キャプチャし、ターゲットシステムに伝播するプロセス。
Delta Lakeでの処理:
-
MERGE INTO
: CDCフィードをDeltaテーブルに適用する主要なコマンド。MERGE INTO target_table t USING cdc_source s ON t.id = s.id WHEN MATCHED AND s.operation = 'update' THEN UPDATE SET ... WHEN MATCHED AND s.operation = 'delete' THEN DELETE WHEN NOT MATCHED AND s.operation = 'insert' THEN INSERT ...
-
課題: 同一キーへの複数変更: CDCフィード内に同一キーに対する複数の変更レコードがあると、
MERGE
がエラーになる可能性。 -
解決策: 最新の変更のみ適用:
-
Window関数 (
rank()
) を使用:SELECT * EXCEPT(rank_num) FROM ( SELECT *, rank() OVER (PARTITION BY id ORDER BY update_timestamp DESC) as rank_num FROM cdc_source_with_duplicates ) WHERE rank_num = 1
-
ストリーミングでの適用 (
foreachBatch
): Window関数はストリーミングDataFrameで直接サポートされないため、foreachBatch
内で上記ロジックを適用してからMERGE
を実行する。def process_cdc_batch(microBatchDF, batchId): # Window関数で最新の変更のみを選択 ranked_updates = microBatchDF.sql(""" SELECT * EXCEPT(rank_num) FROM ( SELECT *, rank() OVER (PARTITION BY id ORDER BY row_time DESC) as rank_num FROM microbatch_view -- microBatchDFから作成した一時ビュー ) WHERE rank_num = 1 AND row_status IN ('insert', 'update') -- Deleteは別途処理する場合 """) ranked_updates.createOrReplaceTempView("ranked_updates_view") # MERGE実行 spark = microBatchDF.sparkSession spark.sql(f""" MERGE INTO target_customers c USING ranked_updates_view u ON c.customer_id = u.customer_id WHEN MATCHED THEN UPDATE SET c.email = u.email, ... WHEN NOT MATCHED THEN INSERT (customer_id, email, ...) VALUES (u.customer_id, u.email, ...) """) (cdc_stream_df .writeStream .foreachBatch(process_cdc_batch) .option("checkpointLocation", "...") .trigger(availableNow=True) .start() )
-
Window関数 (
-
ルックアップ結合:
foreachBatch
内で、静的なルックアップテーブルと結合可能。小さなテーブルにはbroadcast()
ヒントが有効。enriched_df = microBatchDF.join(broadcast(country_lookup_df), "country_code")
-
アサーション (
assert
): 開発中にデータやロジックの正当性を検証するために使用。assert spark.table("target_table").count() == expected_count, "Record count mismatch!"
2. Delta Lake Change Data Feed (CDF)
CDFとは: Deltaテーブルへの行レベルの変更 (Insert, Update, Delete) を自動的に記録するDelta Lakeの機能。
-
有効化:
CREATE TABLE ... TBLPROPERTIES (delta.enableChangeDataFeed = true)
ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true
-
CDFデータの読み取り:
- SQL:
SELECT * FROM table_changes('table_name', start_version | start_timestamp, [end_version | end_timestamp])
- PySpark (Stream):
spark.readStream.format("delta").option("readChangeData", "true").option("startingVersion", version).table("table_name")
- PySpark (Batch):
spark.read.format("delta").option("readChangeData", "true").option("startingVersion", version).table("table_name")
- SQL:
-
CDFレコードの構造:
- 元のテーブル列 +
_change_type
,_commit_version
,_commit_timestamp
-
_change_type
:insert
,delete
,update_preimage
,update_postimage
- 元のテーブル列 +
-
CDFデータの下流への伝播:
- CDFストリームをソースとして読み取り、下流のテーブルに
MERGE
する (上記CDCフィード処理と同様のロジックをforeachBatch
で適用)。 -
insert
とupdate_postimage
を処理対象とし、_commit_timestamp
でランク付けすることが多い。
cdf_stream = (spark.readStream.format("delta") .option("readChangeData", "true") .option("startingVersion", cdf_start_version) .table("source_table_with_cdf")) # foreachBatch内で、cdf_streamから受け取ったデータを処理 def process_cdf_batch(microBatchDF, batchId): updates_to_merge = microBatchDF.filter(col("_change_type").isin("insert", "update_postimage")) # rank() で最新の変更を選択し、MERGE...
- CDFストリームをソースとして読み取り、下流のテーブルに
-
Stream-Stream Join (CDF利用例): CDFストリームと別の業務データストリーム (例: Orders) を結合して、新しいSilver/Goldテーブルを作成できる。状態管理のため
withWatermark
が重要。 -
注意点:
-
VACUUM
でCDFデータも削除される。 - Update/Deleteを含む増分変更の伝播に有効。Append-onlyなら不要。
- 大量のレコードが更新されるテーブルには不向き。
-
3. Stream-Static Join
Stream-Static Joinとは: ストリーミングDataFrameと静的DataFrame (Deltaテーブルなど) の結合。
-
動作:
- ストリーミング側のデータ到着が処理をトリガー。
- 各マイクロバッチで静的テーブルの最新バージョンが読み込まれる。
- 静的テーブルの更新だけでは処理はトリガーされない。
-
実装:
streaming_df = spark.readStream.format("delta").table("streaming_orders") static_df = spark.read.format("delta").table("static_current_books") joined_stream = streaming_df.join(static_df, "book_id", "inner") # 通常のjoin構文 (joined_stream .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "...") .trigger(availableNow=True) .toTable("output_silver_table") )
-
制限事項:
- 非ステートフル: 結合時に静的側に一致するレコードがない場合、そのストリーミングレコードは結果に含まれず、失われる。後から静的側が更新されても遡って結合されない。
- 対策: 必要に応じて別途バッチジョブで補完処理を検討。
4. ビューとGoldテーブル (Materialized View)
-
Stored View (ビュー):
- 定義:
CREATE VIEW view_name AS SELECT ... FROM ...;
- 実体: SQLクエリの保存。データの実体は持たない。
- 実行: クエリ実行時に基テーブルから計算。
- キャッシュ: Delta Cachingにより同一クラスターでの再実行は高速な場合がある。
- 定義:
-
Materialized View ( ≈ Gold Table):
- Databricksでの概念: クエリ結果 (特に集計結果) を物理的に格納したDeltaテーブル (Goldレイヤーに作成)。
- 目的: 複雑なクエリや頻繁にアクセスされる集計結果のコスト/レイテンシを削減。
- 作成方法:
-
バッチ:
CREATE OR REPLACE TABLE gold_table AS SELECT ... FROM ... GROUP BY ...;
-
ストリーミング集計:
from pyspark.sql.functions import window gold_stream = (silver_stream .withWatermark("order_timestamp", "10 minutes") # 状態管理と遅延データ対応 .groupBy(window("order_timestamp", "5 minutes"), "author") # 時間窓とキーで集計 .agg(count("order_id").alias("orders_count"), avg("quantity").alias("avg_quantity")) ) (gold_stream .writeStream .format("delta") .outputMode("complete") # 集計結果全体を出力 (または update) .option("checkpointLocation", "...") .trigger(availableNow=True) .toTable("gold_author_stats") )
-
バッチ:
-
使い分け:
- View: シンプルなクエリ、最新データが必要、計算コストが許容範囲。
- Gold Table: 複雑/高コストな集計、頻繁なアクセス、レイテンシ要件が厳しい場合。
Delta Lake詳細:パーティショニング、トランザクションログ、Auto Optimize
1. Delta Lake テーブルパーティショニング
目的: 大規模テーブルに対するクエリパフォーマンスの最適化。
-
仕組み:
- テーブル作成時に
PARTITIONED BY (col1, col2, ...)
句でパーティションキー列を指定。 - 指定された列の値ごとにデータがサブディレクトリに物理的に分割されて保存される (
/path/to/table/col1=valA/col2=valB/...
)。 - データ挿入時にDatabricksが自動で適切なパーティションディレクトリに振り分ける。
- テーブル作成時に
-
利点:
-
Partition Pruning: クエリの
WHERE
句にパーティションキーが含まれる場合、条件に合致しないパーティションディレクトリ全体のスキャンをスキップし、クエリを高速化。 -
効率的なデータ操作:
OPTIMIZE
やデータ削除をパーティション単位で実行可能。 - データ管理: 時間ベースのパーティション(年、月など)を利用して、古いデータの削除やアーカイブを容易にする。
-
Partition Pruning: クエリの
-
ベストプラクティス (パーティションキー選択):
- 低カーディナリティ: 列に含まれるユニークな値が少ないこと(数百〜数千程度が目安)。
- 十分なパーティションサイズ: 各パーティションが最低でも1GB程度のデータ量になるようにする。小さすぎるパーティション(Over-partitioning)は避ける。
-
クエリフィルタ条件:
WHERE
句で頻繁に使用される列。 - 時間ベース: データが継続的に追加される場合(例: ログデータ)、日付/時刻関連の列(年、月、日など)でパーティションすると管理しやすい。
-
注意点:
-
Over-partitioning の弊害: パーティション数が多すぎると、ファイル数が増加しメタデータ管理が複雑になり、多くのクエリでパフォーマンスが低下する。パーティション境界を越えてファイルを結合できないため、
OPTIMIZE
の効果も限定的になる。 - 修正コスト高: 不適切なパーティショニングはテーブル全体の再書き込みが必要。
- 迷ったらパーティションしない: Delta Lakeのファイル統計やZ-Orderingによるデータスキッピングで十分な場合が多い。
-
Over-partitioning の弊害: パーティション数が多すぎると、ファイル数が増加しメタデータ管理が複雑になり、多くのクエリでパフォーマンスが低下する。パーティション境界を越えてファイルを結合できないため、
-
パーティション削除とストリーミング:
- パーティション単位の削除は効率的だが、テーブルをストリーミングソースとして利用できなくなる(追記のみの原則違反)。
-
対策: ストリーミング読み取り時に
.option("ignoreDeletes", "true")
を使用する。 - 物理的なデータファイル削除は
VACUUM
実行時に行われる。
_delta_log
)
2. Delta Lake トランザクションログ (目的: テーブルへの変更履歴を記録し、ACIDトランザクションとTime Travelを実現。
-
コミットログ (JSON):
- 各トランザクション (Commit) は個別のJSONファイル (
000...0.json
,000...1.json
, ...) として_delta_log
ディレクトリに記録される。 - ファイル追加 (
add
) や削除 (remove
) などのアクション情報を含む。
- 各トランザクション (Commit) は個別のJSONファイル (
-
チェックポイント (Parquet):
- 目的: テーブル状態解決の高速化。多数のJSONログを読む手間を省く。
-
生成: デフォルトで10コミットごとに自動生成 (
00...10.checkpoint.parquet
など)。 - 内容: 特定バージョン時点でのテーブル状態全体(有効なファイルリスト、メタデータなど)を集約してParquet形式で保存。
- 動作: Sparkは最新のチェックポイントを読み込み、それ以降のJSONログのみを処理して最新状態を把握する。
-
ファイル統計:
- 目的: データスキッピングによるクエリ最適化。
-
収集: ファイルがテーブルに追加される際に自動で収集され、コミットログ (JSON/Checkpoint) 内の
add
アクションに記録される。 - 内容 (ファイルごと): レコード数、最初の32列の min/max 値、nullカウント。ネストされたフィールドも列数としてカウントされる。
-
活用:
-
WHERE
句によるクエリ実行時、統計情報に基づいて不要なファイルをスキャン対象から除外 (例:WHERE val > max_value_in_file
)。 -
COUNT(*)
などの集計クエリをファイルスキャンなしで高速に実行。
-
- 注意: 統計収集は最初の32列のみ。高カーディナリティの文字列フィールド(自由記述など)には効果が薄く、収集に時間がかかる可能性あり → スキーマ定義で後ろの列に配置して回避。
-
ログ保持期間:
- デフォルト: 30日間。
-
VACUUM
はデータファイルのみ削除し、ログファイルは削除しない。 - ログファイルはチェックポイント作成時に、保持期間 (
delta.logRetentionDuration
) より古いものが自動的に削除される。 - Time Travel可能な期間はこの保持期間に依存する。
- 変更:
ALTER TABLE ... SET TBLPROPERTIES (delta.logRetentionDuration = 'interval <N> days')
3. Auto Optimize
目的: 手動 OPTIMIZE
の必要性を減らし、書き込み時にファイルサイズを自動調整。
-
構成要素:
-
Optimized Writes:
- 書き込み処理中に、パーティションごとに適切なファイルサイズ (目標: 128MB) で出力しようと試みる。
-
Auto Compaction:
- 書き込み完了後に実行されるオプション。
- 小さなファイルが存在する場合、それらをマージしてファイルサイズを調整する (目標: 128MB)。
-
注意: 標準の
OPTIMIZE
(目標1GB) とは目標サイズが異なり、Z-Orderingは実行しない。
-
Optimized Writes:
-
有効化:
-
テーブルプロパティ (推奨):
-- テーブル作成時 CREATE TABLE ... TBLPROPERTIES ( 'delta.autoOptimize.optimizeWrite' = 'true', 'delta.autoOptimize.autoCompact' = 'true' ) -- 既存テーブル ALTER TABLE ... SET TBLPROPERTIES ( 'delta.autoOptimize.optimizeWrite' = 'true', 'delta.autoOptimize.autoCompact' = 'true' )
-
Sparkセッション設定 (テーブルプロパティより優先):
SET spark.databricks.delta.optimizeWrite.enabled = true; SET spark.databricks.delta.autoCompact.enabled = true;
-
テーブルプロパティ (推奨):
-
自動チューニング:
MERGE
が多い場合など、ワークロードに応じてDatabricksが自動で目標ファイルサイズを128MBより小さく調整することがある。
Jobsオーケストレーション、トラブルシューティング、API、CLI
1. Databricks Jobs オーケストレーション
目的: 複数のタスク(ノートブック、スクリプト等)を連携させてデータパイプラインを構築・実行・管理する。
-
マルチタスクジョブ作成 (UI):
- 場所: サイドバー [Workflows] > [Jobs] > [Create Job]
-
タスク設定項目:
- Name: タスク名 (例: "Bronze", "Silver-Orders")
- Type: Notebook, Python Script, SQL, DLT Pipeline など
- Path: 実行するノートブック等のパス
-
Cluster:
- Job Cluster (推奨): [Create new job cluster] を選択。タスク/ジョブ専用で実行完了後に自動停止するためコスト効率が良い。
- All-Purpose Cluster: 既存の汎用クラスターを選択。
-
Parameters: ノートブックウィジェット (
dbutils.widgets.text(...)
など) に値を渡す。Keyにウィジェット名、Valueに値を設定。 - Depends on: 実行順序を定義。先行タスクを選択。複数選択可能。
- Retries: タスク失敗時の自動リトライポリシー (例: 回数指定、Unlimited)。
-
ジョブ全体設定 (右パネル):
-
Maximum concurrent runs: 同一ジョブを同時に実行できる最大数。Unlimited Retryと組み合わせる際は、意図しない大量並列実行を防ぐため
1
に設定推奨。
-
Maximum concurrent runs: 同一ジョブを同時に実行できる最大数。Unlimited Retryと組み合わせる際は、意図しない大量並列実行を防ぐため
-
本番用ノートブックの準備:
-
コードクリーンアップ: 開発/デバッグ用のコマンド (
display()
, 不要なクエリ、DROP TABLE
等) は削除またはコメントアウトする。 -
Auto Optimize 有効化: パイプラインの各テーブルで小さなファイルが多数生成されるのを防ぐため、ノートブックの最初などで有効化しておくことが推奨される。
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true") spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
-
コードクリーンアップ: 開発/デバッグ用のコマンド (
-
スケジューリング:
- 設定: ジョブ詳細画面の右パネル [Schedule] > [Add schedule]
-
UI:
Scheduled
を選択し、実行頻度、日時などをUIで設定。 -
Cron構文: UIで表現できない複雑なスケジュールは
Cron Syntax
を選択して直接記述 (例:0 0 1,13 * * MON-FRI
- 平日の1AMと1PM)。 -
操作: スケジュールの
Pause
(一時停止)、Resume
(再開)、Delete
(削除) が可能。
-
権限管理 (
Permissions
):-
Owner (所有者):
- ジョブに対する全権限 (表示、実行、管理、削除、権限変更)。
- ジョブ実行時の認証情報として使用 (
Run as
フィールドに表示)。 - 必須: 個人ユーザーであること (グループは不可)。
-
変更: [Edit Permissions] から変更可能。所有者を変更すると
Run as
も更新される。
- Creator (作成者): ジョブを最初に作成したユーザー。所有者が変わっても作成者は不変。
-
権限レベル:
-
Can View
: 閲覧のみ。 -
Can Manage Run
: 実行の開始/キャンセル、実行結果の閲覧。 -
Can Manage
: 編集、削除、権限変更を含む全権限。
-
- 設定: [Edit Permissions] でユーザーまたはグループに追加。
-
Owner (所有者):
2. Databricks Jobs トラブルシューティング
-
失敗時の挙動:
-
ルートタスク失敗: それに依存する全てのタスクが
Skipped
になる。 -
中間タスク失敗: そのタスクに依存する後続タスクのみ
Skipped
。並列タスクは影響を受けず実行される(成功または失敗)。 - コミット: タスクが失敗しても、失敗ポイントより前に実行された処理(Delta Lakeへの書き込みコミット等)は完了している。
-
ルートタスク失敗: それに依存する全てのタスクが
- エラー確認: 失敗したジョブ実行を開き、赤く表示された失敗タスクをクリックすると、エラーメッセージやスタックトレースが表示される。
-
リペア実行 (
Repair Run
):- 目的: 失敗したジョブ実行を効率的に修正・完了させる。
- 場所: 失敗したジョブ実行の詳細ページ右上のボタン。
- 動作: 失敗したタスクと、それに依存していたためにスキップされたタスクのみを再実行する。
- 手順: コード等の問題を修正 → 失敗したRunを開く → [Repair Run] をクリック。
3. Databricks REST API
目的: プログラム(スクリプト、アプリケーション)からDatabricksリソースを操作。
-
認証:
- パーソナルアクセストークン (PAT): [User Settings] > [Access tokens] で生成。生成時に表示されるトークン文字列をコピー&保存。
-
ヘッダー:
Authorization: Bearer <YOUR_PAT>
-
基本:
-
エンドポイント:
https://<databricks-instance>/api/<version>/<resource>
-
ツール: Postman,
curl
, Pythonrequests
など。
-
エンドポイント:
-
主な操作例 (Jobs API v2.1):
-
ジョブ作成:
POST /api/2.1/jobs/create
- Body: ジョブ定義のJSON (UIの
View JSON
で取得可能)。 - Response:
job_id
- Body: ジョブ定義のJSON (UIの
-
ジョブ実行:
POST /api/2.1/jobs/run-now
- Body:
{ "job_id": <job_id> }
- Response:
run_id
(ジョブ実行ID),number_in_job
- Body:
-
実行状態取得:
GET /api/2.1/jobs/runs/get?run_id=<run_id>
- Response: 実行ステータス (
life_cycle_state
,result_state
)、各タスクのIDなど。
- Response: 実行ステータス (
-
ジョブ作成:
4. Databricks CLI
目的: コマンドラインインターフェースからDatabricksリソースを操作。
-
セットアップ:
-
インストール:
pip install databricks-cli
(要Python) -
設定:
databricks configure --token
- Databricks Host (例:
https://adb-....azuredatabricks.net/
) と PAT を入力。 - 設定は
~/.databrickscfg
に保存 (トークンは平文)。
- Databricks Host (例:
-
インストール:
-
基本:
-
databricks -h
: ヘルプ -
databricks <resource> -h
: リソースごとのヘルプ (例:databricks fs -h
)
-
-
主な操作例:
-
クラスター (
databricks clusters ...
):-
list
: 一覧表示 -
start --cluster-id <id>
: 起動 -
get --cluster-id <id>
: 状態取得
-
-
DBFS (
databricks fs ...
):-
ls [dbfs_path]
: 一覧表示 -
cp <local_path> <dbfs_path>
: ローカルからアップロード -
cp <dbfs_path> <local_path>
: DBFSからダウンロード -
rm <dbfs_path>
: 削除
-
-
Secrets (
databricks secrets ...
):-
スコープ管理:
scopes create --scope <scope_name>
scopes list
-
シークレット管理:
-
put --scope <scope_name> --key <key_name>
(値はプロンプト入力) list --scope <scope_name>
delete --scope <scope_name> --key <key_name>
-
-
ノートブックでの利用:
value = dbutils.secrets.get(scope="<scope_name>", key="<key_name>")
-
print(value)
しても値は[REDACTED]
と表示される。
-
スコープ管理:
-
クラスター (
削除要求処理と動的ビュー
1. 削除要求 (Right to be Forgotten) の増分処理と伝播
目的: GDPR等に基づくユーザーからの個人情報 (PII) 削除要求に対応し、関連する全てのテーブルから該当データを削除・伝播させる。
-
処理フロー例:
-
削除要求の追跡:
-
ソース: CDCフィード (例: Bronzeテーブルの
customers
トピック、_change_type = 'delete'
) などから削除対象ユーザーIDを取得。 -
追跡テーブル作成: 削除要求を管理するテーブル (例:
delete_requests
) を作成。customer_id
,request_time
,deadline
,status
(例: 'requested', 'processing', 'deleted') などの列を持つ。 -
取り込み: ストリーミングクエリでソースから読み取り、追跡テーブルに
INSERT
する。-- 例: delete_requests テーブルへのストリーミング書き込み CREATE OR REPLACE TABLE delete_requests AS SELECT customer_id, timestamp AS request_time, timestamp + INTERVAL 30 DAYS AS deadline, -- 例: 30日の期限 'requested' AS status FROM bronze_table_cdc WHERE topic = 'customers' AND _change_type = 'delete'; -- 例
-
ソース: CDCフィード (例: Bronzeテーブルの
-
PIIテーブルからの削除:
-
delete_requests
テーブルを基に、PIIを含む主要テーブル (例:customers
) から該当レコードをDELETE
。DELETE FROM customers WHERE customer_id IN ( SELECT customer_id FROM delete_requests WHERE status = 'requested' );
-
-
下流テーブルへの削除伝播 (CDF利用):
-
前提: PIIテーブル (
customers
) で Change Data Feed (CDF) が有効であること (TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
)。 -
CDF読み取り:
customers
テーブルのCDFをストリーミングで読み取る。cdf_stream = (spark.readStream.format("delta") .option("readChangeData", "true") .option("startingVersion", <start_version>) # または startingTimestamp .table("customers"))
-
foreachBatch
で処理: CDFストリームの各マイクロバッチを処理する関数を定義。def propagate_deletes(microBatchDF, batchId): # 削除イベントのみフィルタリング deletes_df = microBatchDF.filter(col("_change_type") == "delete") .select("customer_id", "_commit_timestamp") # 必要な列を選択 if deletes_df.count() > 0: deletes_df.createOrReplaceTempView("deletes_microbatch") spark = deletes_df.sparkSession # 1. 下流テーブルから削除 (例: customers_orders) spark.sql(""" DELETE FROM customers_orders WHERE customer_id IN (SELECT customer_id FROM deletes_microbatch) """) # 2. delete_requests テーブルのステータスを更新 spark.sql(""" MERGE INTO delete_requests t USING deletes_microbatch s ON t.customer_id = s.customer_id WHEN MATCHED AND t.status = 'requested' THEN -- または適切なステータス条件 UPDATE SET t.status = 'deleted', t.deleted_timestamp = s._commit_timestamp """) # ストリーミングクエリで foreachBatch を実行 (cdf_stream .writeStream .foreachBatch(propagate_deletes) .option("checkpointLocation", "...") .trigger(availableNow=True) # または processingTime .start() )
-
前提: PIIテーブル (
-
削除要求の追跡:
-
完全な削除のための注意点:
-
DELETE
操作の性質: Delta LakeのDELETE
は論理削除。データは即座に物理削除されず、古いテーブルバージョンやCDFフィードには削除されたはずのデータが残っている。 -
VACUUM
の必要性: 削除されたデータへのアクセスを完全に不可能にし、物理ストレージからファイルを削除するには、関連する全てのテーブル (customers
,customers_orders
など) に対して定期的にVACUUM
コマンドを実行する必要がある。VACUUM customers RETAIN 0 HOURS; -- 即時削除の場合 (保持期間に注意) VACUUM customers_orders RETAIN 0 HOURS;
-
2. 動的ビュー (Dynamic Views) によるアクセス制御
目的: ユーザーのIDやグループメンバーシップに基づき、ビューが表示するデータを列レベルまたは行レベルで動的に制御する。
-
主要関数:
-
is_member('group_name')
: 現在のユーザーが指定グループのメンバーか判定 (Boolean)。 -
current_user()
: 現在のユーザー名を返す (String)。
-
-
列レベルアクセス制御 (マスキング):
-
CASE WHEN
とis_member()
を使用して機密性の高い列の値を出し分ける。CREATE OR REPLACE VIEW customers_redacted_view AS SELECT customer_id, CASE WHEN is_member('admins_demo') THEN email -- 特権グループは平文 ELSE 'REDACTED' -- それ以外はマスク END AS email, CASE WHEN is_member('admins_demo') THEN first_name ELSE 'REDACTED' END AS first_name, -- ... 他の列 ... country_name FROM customers;
-
-
行レベルアクセス制御:
-
WHERE
句にis_member()
やcurrent_user()
を組み込んだ条件を追加する。CREATE OR REPLACE VIEW customers_row_filtered_view AS SELECT * FROM customers_redacted_view -- 上記の列マスク済みビューをベースにする例 WHERE is_member('admins_demo') -- 特権グループは全行アクセス可 OR (country_name = 'France' AND update_timestamp > '2023-01-01') -- それ以外は特定の条件に合う行のみ -- OR email = current_user() -- 自分のデータのみアクセス可にする場合など ;
-
-
設定:
-
グループ: 事前に Databricks Admin Console や API/CLI でアクセス制御用のグループ (例:
admins_demo
) を作成し、適切なユーザーを所属させる。 -
ビュー作成: 上記のような
CREATE VIEW
文を実行する。
-
グループ: 事前に Databricks Admin Console や API/CLI でアクセス制御用のグループ (例:
- ビューのレイヤリング: 複数のビューを重ねて定義し、段階的にアクセス制御を適用することも可能。
コードモジュール化、ライブラリインストール、テスト戦略
1. コードのモジュール化とインポート (Databricks Repos)
目的: ノートブック間のコード共有と再利用性を高め、標準的なPython開発プラクティスに近づける。
-
%run <notebook_path>
(従来の方法):- 指定したノートブックを実行し、その中の変数、関数、クラスを現在のノートブックスコープに取り込む。
-
Pythonファイル (
.py
) の利用 (推奨):-
利点: 標準の
import
構文が使える、テストしやすい、IDE連携しやすい。 -
作成: Repos UIで [Create] > [File] を選択し、
.py
拡張子でファイルを作成 (Admin Consoleでの設定有効化が必要な場合あり)。 -
注意: ノートブックとして認識される
# Databricks notebook source
コメントをファイルに含めないこと。 -
インポート:
-
同じリポジトリ内のファイル:
# helpers/math_utils.py 内の関数をインポート from helpers.math_utils import add_numbers result = add_numbers(5, 3)
-
別のディレクトリのファイル: Python検索パス (
sys.path
) にモジュールのあるディレクトリを追加する必要がある。import sys import os # 例: ノートブックの親ディレクトリにある 'common_modules' をパスに追加 module_dir = os.path.abspath(os.path.join('..', 'common_modules')) if module_dir not in sys.path: sys.path.append(module_dir) from data_validation import validate_schema
-
同じリポジトリ内のファイル:
-
パスの確認・操作:
- カレントワーキングディレクトリ (CWD):
%sh pwd
(通常は実行中ノートブックのディレクトリ) - Python検索パス:
import sys; print(sys.path)
- パス追加:
sys.path.append('/path/to/add')
- カレントワーキングディレクトリ (CWD):
-
利点: 標準の
2. ノートブックスコープでのライブラリインストール
-
%pip install
マジックコマンド:- 目的: 現在のノートブックセッションにのみ有効なPythonライブラリ (PyPIやWheelファイル) をインストールする。
-
構文:
%pip install <package_name> %pip install <package_name>==<version> %pip install /path/to/your_package-0.1.0-py3-none-any.whl %pip install -r requirements.txt
-
重要事項:
-
%pip
コマンドはノートブックの一番最初に記述する。 - 実行後、Pythonインタープリターが再起動され、それ以前のセルで定義した変数は失われる。
-
-
%sh pip
との違い:%pip
はクラスターの全ノードにインストールするが、%sh pip
はドライバーノードのみ。ライブラリインストールには%pip
を使用する。
3. データパイプラインのテスト
目的: データパイプラインの信頼性と品質を確保する。
-
テストの主要カテゴリ:
-
データ品質テスト (Data Quality Tests):
- 対象: データそのもの。
- 目的: データが期待される形式、範囲、一貫性などを満たしているか検証する (例: Nullでないか、特定の値の範囲内か、ユニークであるべきか)。
-
方法: Delta Lake CHECK制約、
assert
文、専用ライブラリ (例: Great Expectations)、カスタム検証ロジック。
-
標準テスト (Standard Tests):
- 対象: パイプラインを構成するコード (関数、クラス、モジュール、パイプライン全体)。
- 目的: コードロジックが期待通りに動作するか検証する。コード変更時のデグレ防止。
-
データ品質テスト (Data Quality Tests):
-
標準テストの種類:
-
ユニットテスト (Unit Testing):
- 焦点: 個別の関数やメソッドなどの最小単位のコード。
- 目的: 特定の入力に対する出力が期待通りか、独立して検証する。
-
方法:
assert
文やpytest
,unittest
などのフレームワークを使用。# 例: helper関数のテスト from helpers.math_utils import add_numbers def test_add_numbers(): assert add_numbers(2, 3) == 5 assert add_numbers(-1, 1) == 0
-
インテグレーションテスト (Integration Testing):
- 焦点: 複数のユニット(関数、クラス、モジュール)が連携して動作する部分。
- 目的: コンポーネント間のインタラクションやデータの受け渡しが正しく行われるか検証する。
-
エンドツーエンドテスト (End-to-End Testing / E2E Testing):
- 焦点: パイプライン全体(データソースから最終的な出力/シンクまで)。
- 目的: 実際の運用に近い状況で、パイプライン全体が期待通り機能するか検証する。ビジネスロジック全体の検証。
-
ユニットテスト (Unit Testing):
クラスター権限、ログ、モニタリング
1. クラスター権限管理
目的: ユーザーやグループがクラスターをどのように利用・操作できるかを制御する。
-
権限の種類:
-
ワークスペースレベル (クラスター作成権限):
- 場所: Admin Console > [Users] タブ > 対象ユーザー選択
-
設定:
Allow unrestricted cluster creation
チェックボックス - 効果: チェックを入れると、ユーザーは自由に(ポリシー制限内で)クラスターを作成できる。
-
クラスターレベル (個別クラスターへのアクセス):
- 場所: [Compute] > 対象クラスター > [...]メニュー > [Edit Permissions]
-
レベル:
-
Can Attach To
:- ノートブックをクラスターにアタッチして実行できる。
- Spark UI、ログ、メトリクスを閲覧できる。
- クラスターの状態変更(開始/停止/再起動/編集)は不可。
-
Can Restart
:-
Can Attach To
の全権限。 - クラスターの 開始、停止、再起動 が可能。
- クラスター設定の編集は不可。
-
-
Can Manage
:-
Can Restart
の全権限。 - クラスター設定の 編集 が可能。
- クラスターの 権限設定の編集 が可能 (他のユーザー/グループへの権限付与/剥奪)。
- クラスターの削除が可能。
-
-
- 割り当て: 各レベルをユーザーまたはグループに割り当て可能。
-
ワークスペースレベル (クラスター作成権限):
2. クラスターログの確認
目的: クラスターの動作状況の追跡、トラブルシューティング、コードのデバッグを行う。
- 場所: クラスター詳細ページ内の各タブ
-
ログの種類:
-
クラスターイベントログ (Cluster Event Log):
- タブ: [Event Log]
- 内容: クラスターのライフサイクルイベント履歴 (作成、開始、停止、終了、編集、リサイズ、エラー等)。手動操作と自動イベントの両方を記録。
-
用途: クラスターがいつ、誰によって、どのように操作されたか追跡。オートスケールの挙動確認 (
RESIZING
イベントでフィルタ)。 - 機能: イベントタイプによるフィルタリング。イベント行クリック → [JSON] タブで詳細情報(例: 設定変更前後の値)を表示。
-
ドライバログ (Driver Logs):
- タブ: [Driver Logs]
- 内容: クラスターのドライバーノードで実行されたコード(ノートブック、ジョブ、ライブラリ)からの標準出力、標準エラー、Log4jログ。
-
ストリーム:
-
stdout
:print()
文などの標準出力。 -
stderr
: エラーメッセージなどの標準エラー出力。 -
Log4j output
: Log4jライブラリを使用したログ出力。
-
- 用途: コードのデバッグ、アプリケーション固有のログ確認。
- 機能: 各ログファイルのダウンロードが可能。
-
クラスターイベントログ (Cluster Event Log):
3. クラスターパフォーマンスモニタリング (Ganglia UI)
目的: クラスターのリソース使用状況をリアルタイムで監視し、パフォーマンスのボトルネックを特定する。
-
アクセス:
- 場所: クラスター詳細ページ > [Metrics] タブ > [Ganglia UI] リンク
- 条件: クラスターが実行中 であること。
-
Ganglia UIの主要メトリクス:
-
クラスター全体概要 (UI上部4グラフ):
- Cluster Load: 負荷状況(プロセス数/ノード数)。
- Memory Usage: 全ノードの合計メモリ使用量(キャッシュ/バッファ/使用中/空き)。
- CPU Usage: 全ノードの合計CPU使用率。
- Network Usage: 全ノードの合計ネットワーク送受信量。
-
ノード別詳細 (UI下部 & 左メニュー):
- ドライバーノードと各ワーカーノード個別のメトリクスを確認可能。
- ドライバーノード特定: Spark UI (クラスター詳細ページの [Spark UI] タブ) でドライバーのIPアドレスを確認し、Ganglia UIのノードリストと照合。
-
レポート選択: 左メニューから
Memory Report
,CPU Report
,Network Report
,Disk Report
などを選択し、ノードごとの詳細グラフを表示。
-
クラスター全体概要 (UI上部4グラフ):
-
分析とアクション:
-
CPU高負荷:
- ドライバーのみ高負荷 → ドライバーに処理が集中しすぎている可能性。コードの見直しや、より大きなドライバーノードを検討。
- 全ワーカー高負荷 → 全体的な処理能力不足。ワーカー数を増やすか、オートスケールを有効化/調整。
- 特定ワーカーのみ高負荷 → データスキューの可能性。パーティショニングやデータ分散処理の見直し。
-
メモリ逼迫:
- 特定ノードでメモリ使用量が高い → メモリリークや不適切なキャッシュ利用の可能性。コードの見直しや、よりメモリの大きいインスタンスタイプを検討。
- ディスクI/O高負荷 (Disk Report) → メモリ不足によるディスクスピル(データの一時退避)が発生している可能性。メモリ割り当ての増加を検討。
-
CPU高負荷:
-
履歴メトリクス (スナップショット):
- 制限: Ganglia UIのライブメトリクスはクラスターが停止/再起動すると失われる。
- 代替: DatabricksがGanglia UIのスクリーンショットを定期的に保存。
- 場所: クラスター詳細ページ > [Metrics] タブ > "History" セクション
- 内容: 特定時点でのクラスター全体の概要グラフの画像のみ。ノード別の詳細レポートやインタラクティブな分析は不可。過去のある時点での全体的な状況を確認するのに役立つ。
Discussion