Databricks Certified Data Engineer Professionalチートシート

に公開

Databricksデータエンジニア Professional チートシート

このチートシートは、Databricksプラットフォームにおけるデータエンジニアリングの主要な概念、コマンド、ベストプラクティスをまとめたものです。Bronzeレイヤーへのデータ取り込み、ストリーミング処理、Delta Lakeの高度な機能、Slowly Changing Dimensions、CDC処理、CDF、Join戦略、ジョブオーケストレーション、テスト、権限管理、モニタリングなどをカバーしています。

Bronzeレイヤー、Auto Loader、ストリーミング、Delta Lake基本操作、SCD

1. Bronzeレイヤーへのデータ取り込みパターン

目的: ソースデータをBronzeテーブルにどのようにマッピングするか決定する。

  • Singleplex (1対1):
    • 各データソース/トピックを個別のBronzeテーブルに取り込む。
    • 利点: シンプル、従来のバッチ処理に適している。
    • 欠点: ストリーミングソースが多い場合、同時実行ジョブ数の制限に達する可能性がある。
  • Multiplex (多対1):
    • 複数のデータソース/トピックを単一のBronzeテーブルに取り込む。
    • 利点: ストリーミングソースが多い場合に効率的。ジョブ数を削減。
    • 実装:
      • ソース: Kafka、またはAuto Loaderを使用したクラウドストレージ上のファイル (JSONなど)。
      • テーブル構造: topic列 (ソース/トピック識別子)、value列 (実際のデータ、JSON形式など) を含むことが多い。
      • 後続処理: Silverレイヤーへの移行時に topic 列でフィルタリングする。

2. Auto Loader

目的: クラウドストレージから増分データを効率的に取り込む。

  • 設定 (PySpark):
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json") # or "csv", "parquet", etc.
      .option("cloudFiles.schemaLocation", "/path/to/schema/checkpoint") # スキーマ追跡・進化のため
      .schema(your_schema) # スキーマ指定 (推論も可能)
      .load("/path/to/source/directory/")
    )
    
  • スキーマ進化:
    • cloudFiles.schemaLocation を指定すると、スキーマの追跡と進化が有効になる。
    • 新しい列が検出された場合、自動的にテーブルスキーマが更新される (書き込み時に .option("mergeSchema", "true") が必要)。
  • 実行モード:
    • .trigger(availableNow=True): 利用可能なすべての新規ファイルを処理し、完了後にストリームを停止するバッチモード。
    • .trigger(processingTime='1 minute'): 継続的なストリーミング処理 (指定間隔ごと)。

3. ストリーミング処理

  • 基本:
    • 読み込み: spark.readStream...
    • 書き込み: df.writeStream...
  • JSONパース (Kafkaデータ例):
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType # Define your schema
    
    kafka_schema = StructType([...]) # Kafkaのトップレベルスキーマ (topic, key, value, timestampなど)
    payload_schema = StructType([...]) # value列内のJSONスキーマ
    
    df = (spark.readStream.format("delta").table("bronze_multiplex")
          .filter(col("topic") == "orders") # Multiplexテーブルから特定トピックを抽出
          .select(from_json(col("value").cast("string"), payload_schema).alias("payload")) # valueをパース
          .select("payload.*") # パースしたフィールドを展開
         )
    
    -- SQLでのJSONパース
    SELECT from_json(CAST(value AS STRING), 'schema_string').*
    FROM bronze_multiplex
    WHERE topic = 'orders';
    
  • ストリーミング一時ビュー:
    • SQLでストリーミングデータを扱うために、DataFrameからストリーミング一時ビューを作成できる。
    df.createOrReplaceTempView("streaming_temp_view")
    
    -- streaming_temp_view を通常のテーブルのようにクエリ可能 (ストリーミングとして扱われる)
    SELECT * FROM streaming_temp_view WHERE amount > 100;
    
  • foreachBatch:
    • 各マイクロバッチに対してカスタムの書き込みロジックを実行できる。
    • MERGE操作 (Upsertや重複排除) など、標準の writeStream では直接サポートされない複雑な書き込み処理に有用。
    def upsert_microbatch(microBatchDF, batchId):
        # MERGEロジックなどをここに記述
        microBatchDF.createOrReplaceTempView("microbatch_data")
        # Sparkセッション取得 (Runtime 11.3+): microBatchDF.sparkSession
        spark = microBatchDF.sparkSession
        spark.sql("""
            MERGE INTO target_table t
            USING microbatch_data s ON t.key = s.key
            WHEN MATCHED THEN UPDATE SET ...
            WHEN NOT MATCHED THEN INSERT ...
        """)
    
    (streamingDF
      .writeStream
      .foreachBatch(upsert_microbatch)
      .option("checkpointLocation", "/path/to/checkpoint")
      .trigger(availableNow=True)
      .start()
    )
    
    • 注意: foreachBatch 内でSparkセッションを取得する方法はDatabricks Runtimeのバージョンによって異なる場合がある。

4. Delta Lake テーブル操作

  • CHECK制約:
    • データ品質を強制するルール。
    • 追加: ALTER TABLE table_name ADD CONSTRAINT constraint_name CHECK (condition);
    • 削除: ALTER TABLE table_name DROP CONSTRAINT constraint_name;
    • 動作:
      • 制約に違反するデータを含む書き込み操作は失敗する (トランザクション全体がロールバック)。
      • 制約追加時、テーブル内の既存データも検証される。違反があれば追加失敗。
    • ストリーミングでの考慮事項: 制約違反でジョブを止めずに不正データを除外したい場合は、書き込み前に WHERE 句でフィルタリングするか、別の「隔離」テーブルに書き込むなどの対策が必要。
  • ストリーミングでの重複排除:
    • dropDuplicates(): 静的DataFrameと同様に使用可能。
    • withWatermark(): 状態管理のオーバーヘッドを抑えるために dropDuplicates() と併用する。指定した時間内の遅延レコードのみを考慮して重複チェックを行う。
      deduplicated_df = (streamingDF
                         .withWatermark("event_timestamp", "30 seconds") # 30秒遅延まで許容
                         .dropDuplicates(["unique_id"])
                        )
      
    • Insert-only Merge (foreachBatchを使用): マイクロバッチ内の重複だけでなく、ターゲットテーブルに既に存在するレコードとの重複も排除する確実な方法。
      -- foreachBatch 内で実行するMERGE文の例
      MERGE INTO silver_table t
      USING microbatch_updates s ON t.unique_id = s.unique_id -- 結合キーで一致確認
      WHEN NOT MATCHED THEN INSERT * -- 一致しない場合のみ挿入
      

5. Slowly Changing Dimensions (SCD)

目的: 時間とともに変化するディメンションデータをどのように管理するか。

  • Type 1 (上書き):
    • 最新の値で既存のレコードを上書きする。履歴は保持されない。
    • 実装: 通常の UPDATEMERGEWHEN MATCHED THEN UPDATE 句を使用。
  • Type 2 (履歴保持):
    • 変更が発生すると新しいレコードを追加し、古いレコードを非アクティブとしてマークする。完全な履歴を保持。
    • 必要な追加列:
      • is_current (BOOLEAN): レコードが現在有効かを示すフラグ。
      • effective_date (TIMESTAMP/DATE): レコードが有効になった日時。
      • end_date (TIMESTAMP/DATE): レコードが無効になった日時 (現在のレコードはNULL)。
    • 実装 (foreachBatchMERGE):
      1. 古いレコードを閉じる (マークする): WHEN MATCHED AND target.is_current = true AND source.data <> target.data (データに変更があった場合) に、target.is_current = false, target.end_date = source.effective_date (または現在のタイムスタンプ) を設定。
      2. 新しい/変更されたレコードを挿入: WHEN NOT MATCHED (または別のロジックで更新レコードを挿入対象として準備) で、新しいレコードを is_current = true, end_date = NULL として挿入。
      • テキストの例では、更新レコードを意図的に NOT MATCHED で挿入するために、まず更新対象を MATCHED で処理し、その後、更新レコード自体を NOT MATCHED BY SOURCE のようなロジック(または一時ビューへの再挿入など)で挿入対象としている可能性がある点に注意。具体的なMERGE文は要確認。
    • 現在レコードのみのビュー/テーブル: SCD Type 2 テーブルから WHERE is_current = true でフィルタリングすることで、常に最新のレコードセットを取得できる。これはバッチ処理で上書き作成 (CREATE OR REPLACE TABLE) されることが多い。
  • Delta Time Travel vs SCD Type 2:
    • Time Travel: 短期間のバージョン管理、偶発的な変更からの回復向け。VACUUM で履歴が削除される可能性。長期的な履歴分析には不向き。
    • SCD Type 2: 明示的なビジネス要件に基づく長期的な履歴管理手法。

CDC処理、Delta CDF、Join戦略、ビューとGoldテーブル

1. Change Data Capture (CDC) フィードの処理

CDCとは: データソースでの行レベルの変更 (Insert, Update, Delete) を識別・キャプチャし、ターゲットシステムに伝播するプロセス。

Delta Lakeでの処理:

  • MERGE INTO: CDCフィードをDeltaテーブルに適用する主要なコマンド。
    MERGE INTO target_table t
    USING cdc_source s ON t.id = s.id
    WHEN MATCHED AND s.operation = 'update' THEN UPDATE SET ...
    WHEN MATCHED AND s.operation = 'delete' THEN DELETE
    WHEN NOT MATCHED AND s.operation = 'insert' THEN INSERT ...
    
  • 課題: 同一キーへの複数変更: CDCフィード内に同一キーに対する複数の変更レコードがあると、MERGE がエラーになる可能性。
  • 解決策: 最新の変更のみ適用:
    • Window関数 (rank()) を使用:
      SELECT * EXCEPT(rank_num) FROM (
        SELECT *, rank() OVER (PARTITION BY id ORDER BY update_timestamp DESC) as rank_num
        FROM cdc_source_with_duplicates
      ) WHERE rank_num = 1
      
    • ストリーミングでの適用 (foreachBatch): Window関数はストリーミングDataFrameで直接サポートされないため、foreachBatch 内で上記ロジックを適用してから MERGE を実行する。
      def process_cdc_batch(microBatchDF, batchId):
          # Window関数で最新の変更のみを選択
          ranked_updates = microBatchDF.sql("""
              SELECT * EXCEPT(rank_num) FROM (
                SELECT *, rank() OVER (PARTITION BY id ORDER BY row_time DESC) as rank_num
                FROM microbatch_view -- microBatchDFから作成した一時ビュー
              ) WHERE rank_num = 1 AND row_status IN ('insert', 'update') -- Deleteは別途処理する場合
          """)
          ranked_updates.createOrReplaceTempView("ranked_updates_view")
      
          # MERGE実行
          spark = microBatchDF.sparkSession
          spark.sql(f"""
              MERGE INTO target_customers c
              USING ranked_updates_view u ON c.customer_id = u.customer_id
              WHEN MATCHED THEN UPDATE SET c.email = u.email, ...
              WHEN NOT MATCHED THEN INSERT (customer_id, email, ...) VALUES (u.customer_id, u.email, ...)
          """)
      
      (cdc_stream_df
        .writeStream
        .foreachBatch(process_cdc_batch)
        .option("checkpointLocation", "...")
        .trigger(availableNow=True)
        .start()
      )
      
  • ルックアップ結合: foreachBatch 内で、静的なルックアップテーブルと結合可能。小さなテーブルには broadcast() ヒントが有効。
    enriched_df = microBatchDF.join(broadcast(country_lookup_df), "country_code")
    
  • アサーション (assert): 開発中にデータやロジックの正当性を検証するために使用。
    assert spark.table("target_table").count() == expected_count, "Record count mismatch!"
    

2. Delta Lake Change Data Feed (CDF)

CDFとは: Deltaテーブルへの行レベルの変更 (Insert, Update, Delete) を自動的に記録するDelta Lakeの機能。

  • 有効化:
    • CREATE TABLE ... TBLPROPERTIES (delta.enableChangeDataFeed = true)
    • ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    • SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true
  • CDFデータの読み取り:
    • SQL: SELECT * FROM table_changes('table_name', start_version | start_timestamp, [end_version | end_timestamp])
    • PySpark (Stream): spark.readStream.format("delta").option("readChangeData", "true").option("startingVersion", version).table("table_name")
    • PySpark (Batch): spark.read.format("delta").option("readChangeData", "true").option("startingVersion", version).table("table_name")
  • CDFレコードの構造:
    • 元のテーブル列 + _change_type, _commit_version, _commit_timestamp
    • _change_type: insert, delete, update_preimage, update_postimage
  • CDFデータの下流への伝播:
    • CDFストリームをソースとして読み取り、下流のテーブルに MERGE する (上記CDCフィード処理と同様のロジックを foreachBatch で適用)。
    • insertupdate_postimage を処理対象とし、_commit_timestamp でランク付けすることが多い。
    cdf_stream = (spark.readStream.format("delta")
                  .option("readChangeData", "true")
                  .option("startingVersion", cdf_start_version)
                  .table("source_table_with_cdf"))
    
    # foreachBatch内で、cdf_streamから受け取ったデータを処理
    def process_cdf_batch(microBatchDF, batchId):
        updates_to_merge = microBatchDF.filter(col("_change_type").isin("insert", "update_postimage"))
        # rank() で最新の変更を選択し、MERGE...
    
  • Stream-Stream Join (CDF利用例): CDFストリームと別の業務データストリーム (例: Orders) を結合して、新しいSilver/Goldテーブルを作成できる。状態管理のため withWatermark が重要。
  • 注意点:
    • VACUUM でCDFデータも削除される。
    • Update/Deleteを含む増分変更の伝播に有効。Append-onlyなら不要。
    • 大量のレコードが更新されるテーブルには不向き。

3. Stream-Static Join

Stream-Static Joinとは: ストリーミングDataFrameと静的DataFrame (Deltaテーブルなど) の結合。

  • 動作:
    • ストリーミング側のデータ到着が処理をトリガー。
    • 各マイクロバッチで静的テーブルの最新バージョンが読み込まれる。
    • 静的テーブルの更新だけでは処理はトリガーされない。
  • 実装:
    streaming_df = spark.readStream.format("delta").table("streaming_orders")
    static_df = spark.read.format("delta").table("static_current_books")
    
    joined_stream = streaming_df.join(static_df, "book_id", "inner") # 通常のjoin構文
    
    (joined_stream
      .writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "...")
      .trigger(availableNow=True)
      .toTable("output_silver_table")
    )
    
  • 制限事項:
    • 非ステートフル: 結合時に静的側に一致するレコードがない場合、そのストリーミングレコードは結果に含まれず、失われる。後から静的側が更新されても遡って結合されない。
    • 対策: 必要に応じて別途バッチジョブで補完処理を検討。

4. ビューとGoldテーブル (Materialized View)

  • Stored View (ビュー):
    • 定義: CREATE VIEW view_name AS SELECT ... FROM ...;
    • 実体: SQLクエリの保存。データの実体は持たない。
    • 実行: クエリ実行時に基テーブルから計算。
    • キャッシュ: Delta Cachingにより同一クラスターでの再実行は高速な場合がある。
  • Materialized View ( ≈ Gold Table):
    • Databricksでの概念: クエリ結果 (特に集計結果) を物理的に格納したDeltaテーブル (Goldレイヤーに作成)。
    • 目的: 複雑なクエリや頻繁にアクセスされる集計結果のコスト/レイテンシを削減。
    • 作成方法:
      • バッチ: CREATE OR REPLACE TABLE gold_table AS SELECT ... FROM ... GROUP BY ...;
      • ストリーミング集計:
        from pyspark.sql.functions import window
        
        gold_stream = (silver_stream
                       .withWatermark("order_timestamp", "10 minutes") # 状態管理と遅延データ対応
                       .groupBy(window("order_timestamp", "5 minutes"), "author") # 時間窓とキーで集計
                       .agg(count("order_id").alias("orders_count"), avg("quantity").alias("avg_quantity"))
                      )
        
        (gold_stream
          .writeStream
          .format("delta")
          .outputMode("complete") # 集計結果全体を出力 (または update)
          .option("checkpointLocation", "...")
          .trigger(availableNow=True)
          .toTable("gold_author_stats")
        )
        
    • 使い分け:
      • View: シンプルなクエリ、最新データが必要、計算コストが許容範囲。
      • Gold Table: 複雑/高コストな集計、頻繁なアクセス、レイテンシ要件が厳しい場合。

Delta Lake詳細:パーティショニング、トランザクションログ、Auto Optimize

1. Delta Lake テーブルパーティショニング

目的: 大規模テーブルに対するクエリパフォーマンスの最適化。

  • 仕組み:
    • テーブル作成時に PARTITIONED BY (col1, col2, ...) 句でパーティションキー列を指定。
    • 指定された列の値ごとにデータがサブディレクトリに物理的に分割されて保存される (/path/to/table/col1=valA/col2=valB/...)。
    • データ挿入時にDatabricksが自動で適切なパーティションディレクトリに振り分ける。
  • 利点:
    • Partition Pruning: クエリの WHERE 句にパーティションキーが含まれる場合、条件に合致しないパーティションディレクトリ全体のスキャンをスキップし、クエリを高速化。
    • 効率的なデータ操作: OPTIMIZE やデータ削除をパーティション単位で実行可能。
    • データ管理: 時間ベースのパーティション(年、月など)を利用して、古いデータの削除やアーカイブを容易にする。
  • ベストプラクティス (パーティションキー選択):
    • 低カーディナリティ: 列に含まれるユニークな値が少ないこと(数百〜数千程度が目安)。
    • 十分なパーティションサイズ: 各パーティションが最低でも1GB程度のデータ量になるようにする。小さすぎるパーティション(Over-partitioning)は避ける。
    • クエリフィルタ条件: WHERE 句で頻繁に使用される列。
    • 時間ベース: データが継続的に追加される場合(例: ログデータ)、日付/時刻関連の列(年、月、日など)でパーティションすると管理しやすい。
  • 注意点:
    • Over-partitioning の弊害: パーティション数が多すぎると、ファイル数が増加しメタデータ管理が複雑になり、多くのクエリでパフォーマンスが低下する。パーティション境界を越えてファイルを結合できないため、OPTIMIZE の効果も限定的になる。
    • 修正コスト高: 不適切なパーティショニングはテーブル全体の再書き込みが必要。
    • 迷ったらパーティションしない: Delta Lakeのファイル統計やZ-Orderingによるデータスキッピングで十分な場合が多い。
  • パーティション削除とストリーミング:
    • パーティション単位の削除は効率的だが、テーブルをストリーミングソースとして利用できなくなる(追記のみの原則違反)。
    • 対策: ストリーミング読み取り時に .option("ignoreDeletes", "true") を使用する。
    • 物理的なデータファイル削除は VACUUM 実行時に行われる。

2. Delta Lake トランザクションログ (_delta_log)

目的: テーブルへの変更履歴を記録し、ACIDトランザクションとTime Travelを実現。

  • コミットログ (JSON):
    • 各トランザクション (Commit) は個別のJSONファイル (000...0.json, 000...1.json, ...) として _delta_log ディレクトリに記録される。
    • ファイル追加 (add) や削除 (remove) などのアクション情報を含む。
  • チェックポイント (Parquet):
    • 目的: テーブル状態解決の高速化。多数のJSONログを読む手間を省く。
    • 生成: デフォルトで10コミットごとに自動生成 (00...10.checkpoint.parquet など)。
    • 内容: 特定バージョン時点でのテーブル状態全体(有効なファイルリスト、メタデータなど)を集約してParquet形式で保存。
    • 動作: Sparkは最新のチェックポイントを読み込み、それ以降のJSONログのみを処理して最新状態を把握する。
  • ファイル統計:
    • 目的: データスキッピングによるクエリ最適化。
    • 収集: ファイルがテーブルに追加される際に自動で収集され、コミットログ (JSON/Checkpoint) 内の add アクションに記録される。
    • 内容 (ファイルごと): レコード数、最初の32列の min/max 値、nullカウント。ネストされたフィールドも列数としてカウントされる。
    • 活用:
      • WHERE 句によるクエリ実行時、統計情報に基づいて不要なファイルをスキャン対象から除外 (例: WHERE val > max_value_in_file)。
      • COUNT(*) などの集計クエリをファイルスキャンなしで高速に実行。
    • 注意: 統計収集は最初の32列のみ。高カーディナリティの文字列フィールド(自由記述など)には効果が薄く、収集に時間がかかる可能性あり → スキーマ定義で後ろの列に配置して回避。
  • ログ保持期間:
    • デフォルト: 30日間。
    • VACUUM はデータファイルのみ削除し、ログファイルは削除しない。
    • ログファイルはチェックポイント作成時に、保持期間 (delta.logRetentionDuration) より古いものが自動的に削除される。
    • Time Travel可能な期間はこの保持期間に依存する。
    • 変更: ALTER TABLE ... SET TBLPROPERTIES (delta.logRetentionDuration = 'interval <N> days')

3. Auto Optimize

目的: 手動 OPTIMIZE の必要性を減らし、書き込み時にファイルサイズを自動調整。

  • 構成要素:
    • Optimized Writes:
      • 書き込み処理中に、パーティションごとに適切なファイルサイズ (目標: 128MB) で出力しようと試みる。
    • Auto Compaction:
      • 書き込み完了後に実行されるオプション。
      • 小さなファイルが存在する場合、それらをマージしてファイルサイズを調整する (目標: 128MB)。
      • 注意: 標準の OPTIMIZE (目標1GB) とは目標サイズが異なり、Z-Orderingは実行しない
  • 有効化:
    • テーブルプロパティ (推奨):
      -- テーブル作成時
      CREATE TABLE ... TBLPROPERTIES (
          'delta.autoOptimize.optimizeWrite' = 'true',
          'delta.autoOptimize.autoCompact' = 'true'
      )
      -- 既存テーブル
      ALTER TABLE ... SET TBLPROPERTIES (
          'delta.autoOptimize.optimizeWrite' = 'true',
          'delta.autoOptimize.autoCompact' = 'true'
      )
      
    • Sparkセッション設定 (テーブルプロパティより優先):
      SET spark.databricks.delta.optimizeWrite.enabled = true;
      SET spark.databricks.delta.autoCompact.enabled = true;
      
  • 自動チューニング: MERGE が多い場合など、ワークロードに応じてDatabricksが自動で目標ファイルサイズを128MBより小さく調整することがある。

Jobsオーケストレーション、トラブルシューティング、API、CLI

1. Databricks Jobs オーケストレーション

目的: 複数のタスク(ノートブック、スクリプト等)を連携させてデータパイプラインを構築・実行・管理する。

  • マルチタスクジョブ作成 (UI):
    • 場所: サイドバー [Workflows] > [Jobs] > [Create Job]
    • タスク設定項目:
      • Name: タスク名 (例: "Bronze", "Silver-Orders")
      • Type: Notebook, Python Script, SQL, DLT Pipeline など
      • Path: 実行するノートブック等のパス
      • Cluster:
        • Job Cluster (推奨): [Create new job cluster] を選択。タスク/ジョブ専用で実行完了後に自動停止するためコスト効率が良い。
        • All-Purpose Cluster: 既存の汎用クラスターを選択。
      • Parameters: ノートブックウィジェット (dbutils.widgets.text(...) など) に値を渡す。Keyにウィジェット名、Valueに値を設定。
      • Depends on: 実行順序を定義。先行タスクを選択。複数選択可能。
      • Retries: タスク失敗時の自動リトライポリシー (例: 回数指定、Unlimited)。
    • ジョブ全体設定 (右パネル):
      • Maximum concurrent runs: 同一ジョブを同時に実行できる最大数。Unlimited Retryと組み合わせる際は、意図しない大量並列実行を防ぐため 1 に設定推奨。
  • 本番用ノートブックの準備:
    • コードクリーンアップ: 開発/デバッグ用のコマンド (display(), 不要なクエリ、DROP TABLE 等) は削除またはコメントアウトする。
    • Auto Optimize 有効化: パイプラインの各テーブルで小さなファイルが多数生成されるのを防ぐため、ノートブックの最初などで有効化しておくことが推奨される。
      spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
      spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
      
  • スケジューリング:
    • 設定: ジョブ詳細画面の右パネル [Schedule] > [Add schedule]
    • UI: Scheduled を選択し、実行頻度、日時などをUIで設定。
    • Cron構文: UIで表現できない複雑なスケジュールは Cron Syntax を選択して直接記述 (例: 0 0 1,13 * * MON-FRI - 平日の1AMと1PM)。
    • 操作: スケジュールの Pause (一時停止)、Resume (再開)、Delete (削除) が可能。
  • 権限管理 (Permissions):
    • Owner (所有者):
      • ジョブに対する全権限 (表示、実行、管理、削除、権限変更)。
      • ジョブ実行時の認証情報として使用 (Run as フィールドに表示)。
      • 必須: 個人ユーザーであること (グループは不可)。
      • 変更: [Edit Permissions] から変更可能。所有者を変更すると Run as も更新される。
    • Creator (作成者): ジョブを最初に作成したユーザー。所有者が変わっても作成者は不変。
    • 権限レベル:
      • Can View: 閲覧のみ。
      • Can Manage Run: 実行の開始/キャンセル、実行結果の閲覧。
      • Can Manage: 編集、削除、権限変更を含む全権限。
    • 設定: [Edit Permissions] でユーザーまたはグループに追加。

2. Databricks Jobs トラブルシューティング

  • 失敗時の挙動:
    • ルートタスク失敗: それに依存する全てのタスクが Skipped になる。
    • 中間タスク失敗: そのタスクに依存する後続タスクのみ Skipped。並列タスクは影響を受けず実行される(成功または失敗)。
    • コミット: タスクが失敗しても、失敗ポイントよりに実行された処理(Delta Lakeへの書き込みコミット等)は完了している。
  • エラー確認: 失敗したジョブ実行を開き、赤く表示された失敗タスクをクリックすると、エラーメッセージやスタックトレースが表示される。
  • リペア実行 (Repair Run):
    • 目的: 失敗したジョブ実行を効率的に修正・完了させる。
    • 場所: 失敗したジョブ実行の詳細ページ右上のボタン。
    • 動作: 失敗したタスクと、それに依存していたためにスキップされたタスクのみを再実行する。
    • 手順: コード等の問題を修正 → 失敗したRunを開く → [Repair Run] をクリック。

3. Databricks REST API

目的: プログラム(スクリプト、アプリケーション)からDatabricksリソースを操作。

  • 認証:
    • パーソナルアクセストークン (PAT): [User Settings] > [Access tokens] で生成。生成時に表示されるトークン文字列をコピー&保存。
    • ヘッダー: Authorization: Bearer <YOUR_PAT>
  • 基本:
    • エンドポイント: https://<databricks-instance>/api/<version>/<resource>
    • ツール: Postman, curl, Python requests など。
  • 主な操作例 (Jobs API v2.1):
    • ジョブ作成: POST /api/2.1/jobs/create
      • Body: ジョブ定義のJSON (UIの View JSON で取得可能)。
      • Response: job_id
    • ジョブ実行: POST /api/2.1/jobs/run-now
      • Body: { "job_id": <job_id> }
      • Response: run_id (ジョブ実行ID), number_in_job
    • 実行状態取得: GET /api/2.1/jobs/runs/get?run_id=<run_id>
      • Response: 実行ステータス (life_cycle_state, result_state)、各タスクのIDなど。

4. Databricks CLI

目的: コマンドラインインターフェースからDatabricksリソースを操作。

  • セットアップ:
    • インストール: pip install databricks-cli (要Python)
    • 設定: databricks configure --token
      • Databricks Host (例: https://adb-....azuredatabricks.net/) と PAT を入力。
      • 設定は ~/.databrickscfg に保存 (トークンは平文)。
  • 基本:
    • databricks -h: ヘルプ
    • databricks <resource> -h: リソースごとのヘルプ (例: databricks fs -h)
  • 主な操作例:
    • クラスター (databricks clusters ...):
      • list: 一覧表示
      • start --cluster-id <id>: 起動
      • get --cluster-id <id>: 状態取得
    • DBFS (databricks fs ...):
      • ls [dbfs_path]: 一覧表示
      • cp <local_path> <dbfs_path>: ローカルからアップロード
      • cp <dbfs_path> <local_path>: DBFSからダウンロード
      • rm <dbfs_path>: 削除
    • Secrets (databricks secrets ...):
      • スコープ管理:
        • scopes create --scope <scope_name>
        • scopes list
      • シークレット管理:
        • put --scope <scope_name> --key <key_name> (値はプロンプト入力)
        • list --scope <scope_name>
        • delete --scope <scope_name> --key <key_name>
      • ノートブックでの利用:
        • value = dbutils.secrets.get(scope="<scope_name>", key="<key_name>")
        • print(value) しても値は [REDACTED] と表示される。

削除要求処理と動的ビュー

1. 削除要求 (Right to be Forgotten) の増分処理と伝播

目的: GDPR等に基づくユーザーからの個人情報 (PII) 削除要求に対応し、関連する全てのテーブルから該当データを削除・伝播させる。

  • 処理フロー例:
    1. 削除要求の追跡:
      • ソース: CDCフィード (例: Bronzeテーブルの customers トピック、_change_type = 'delete') などから削除対象ユーザーIDを取得。
      • 追跡テーブル作成: 削除要求を管理するテーブル (例: delete_requests) を作成。customer_id, request_time, deadline, status (例: 'requested', 'processing', 'deleted') などの列を持つ。
      • 取り込み: ストリーミングクエリでソースから読み取り、追跡テーブルに INSERT する。
        -- 例: delete_requests テーブルへのストリーミング書き込み
        CREATE OR REPLACE TABLE delete_requests AS
        SELECT
          customer_id,
          timestamp AS request_time,
          timestamp + INTERVAL 30 DAYS AS deadline, -- 例: 30日の期限
          'requested' AS status
        FROM bronze_table_cdc
        WHERE topic = 'customers' AND _change_type = 'delete'; -- 例
        
    2. PIIテーブルからの削除:
      • delete_requests テーブルを基に、PIIを含む主要テーブル (例: customers) から該当レコードを DELETE
        DELETE FROM customers
        WHERE customer_id IN (
          SELECT customer_id FROM delete_requests WHERE status = 'requested'
        );
        
    3. 下流テーブルへの削除伝播 (CDF利用):
      • 前提: PIIテーブル (customers) で Change Data Feed (CDF) が有効であること (TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true'))。
      • CDF読み取り: customers テーブルのCDFをストリーミングで読み取る。
        cdf_stream = (spark.readStream.format("delta")
                      .option("readChangeData", "true")
                      .option("startingVersion", <start_version>) # または startingTimestamp
                      .table("customers"))
        
      • foreachBatch で処理: CDFストリームの各マイクロバッチを処理する関数を定義。
        def propagate_deletes(microBatchDF, batchId):
            # 削除イベントのみフィルタリング
            deletes_df = microBatchDF.filter(col("_change_type") == "delete")
                                    .select("customer_id", "_commit_timestamp") # 必要な列を選択
        
            if deletes_df.count() > 0:
                deletes_df.createOrReplaceTempView("deletes_microbatch")
                spark = deletes_df.sparkSession
        
                # 1. 下流テーブルから削除 (例: customers_orders)
                spark.sql("""
                    DELETE FROM customers_orders
                    WHERE customer_id IN (SELECT customer_id FROM deletes_microbatch)
                """)
        
                # 2. delete_requests テーブルのステータスを更新
                spark.sql("""
                    MERGE INTO delete_requests t
                    USING deletes_microbatch s ON t.customer_id = s.customer_id
                    WHEN MATCHED AND t.status = 'requested' THEN -- または適切なステータス条件
                      UPDATE SET t.status = 'deleted', t.deleted_timestamp = s._commit_timestamp
                """)
        
        # ストリーミングクエリで foreachBatch を実行
        (cdf_stream
          .writeStream
          .foreachBatch(propagate_deletes)
          .option("checkpointLocation", "...")
          .trigger(availableNow=True) # または processingTime
          .start()
        )
        
  • 完全な削除のための注意点:
    • DELETE 操作の性質: Delta Lakeの DELETE は論理削除。データは即座に物理削除されず、古いテーブルバージョンやCDFフィードには削除されたはずのデータが残っている。
    • VACUUM の必要性: 削除されたデータへのアクセスを完全に不可能にし、物理ストレージからファイルを削除するには、関連する全てのテーブル (customers, customers_orders など) に対して定期的に VACUUM コマンドを実行する必要がある。
      VACUUM customers RETAIN 0 HOURS; -- 即時削除の場合 (保持期間に注意)
      VACUUM customers_orders RETAIN 0 HOURS;
      

2. 動的ビュー (Dynamic Views) によるアクセス制御

目的: ユーザーのIDやグループメンバーシップに基づき、ビューが表示するデータを列レベルまたは行レベルで動的に制御する。

  • 主要関数:
    • is_member('group_name'): 現在のユーザーが指定グループのメンバーか判定 (Boolean)。
    • current_user(): 現在のユーザー名を返す (String)。
  • 列レベルアクセス制御 (マスキング):
    • CASE WHENis_member() を使用して機密性の高い列の値を出し分ける。
      CREATE OR REPLACE VIEW customers_redacted_view AS
      SELECT
        customer_id,
        CASE
          WHEN is_member('admins_demo') THEN email -- 特権グループは平文
          ELSE 'REDACTED' -- それ以外はマスク
        END AS email,
        CASE
          WHEN is_member('admins_demo') THEN first_name
          ELSE 'REDACTED'
        END AS first_name,
        -- ... 他の列 ...
        country_name
      FROM customers;
      
  • 行レベルアクセス制御:
    • WHERE 句に is_member()current_user() を組み込んだ条件を追加する。
      CREATE OR REPLACE VIEW customers_row_filtered_view AS
      SELECT * FROM customers_redacted_view -- 上記の列マスク済みビューをベースにする例
      WHERE
        is_member('admins_demo') -- 特権グループは全行アクセス可
        OR
        (country_name = 'France' AND update_timestamp > '2023-01-01') -- それ以外は特定の条件に合う行のみ
        -- OR email = current_user() -- 自分のデータのみアクセス可にする場合など
      ;
      
  • 設定:
    • グループ: 事前に Databricks Admin Console や API/CLI でアクセス制御用のグループ (例: admins_demo) を作成し、適切なユーザーを所属させる。
    • ビュー作成: 上記のような CREATE VIEW 文を実行する。
  • ビューのレイヤリング: 複数のビューを重ねて定義し、段階的にアクセス制御を適用することも可能。

コードモジュール化、ライブラリインストール、テスト戦略

1. コードのモジュール化とインポート (Databricks Repos)

目的: ノートブック間のコード共有と再利用性を高め、標準的なPython開発プラクティスに近づける。

  • %run <notebook_path> (従来の方法):
    • 指定したノートブックを実行し、その中の変数、関数、クラスを現在のノートブックスコープに取り込む。
  • Pythonファイル (.py) の利用 (推奨):
    • 利点: 標準の import 構文が使える、テストしやすい、IDE連携しやすい。
    • 作成: Repos UIで [Create] > [File] を選択し、.py 拡張子でファイルを作成 (Admin Consoleでの設定有効化が必要な場合あり)。
    • 注意: ノートブックとして認識される # Databricks notebook source コメントをファイルに含めないこと。
    • インポート:
      • 同じリポジトリ内のファイル:
        # helpers/math_utils.py 内の関数をインポート
        from helpers.math_utils import add_numbers
        
        result = add_numbers(5, 3)
        
      • 別のディレクトリのファイル: Python検索パス (sys.path) にモジュールのあるディレクトリを追加する必要がある。
        import sys
        import os
        
        # 例: ノートブックの親ディレクトリにある 'common_modules' をパスに追加
        module_dir = os.path.abspath(os.path.join('..', 'common_modules'))
        if module_dir not in sys.path:
            sys.path.append(module_dir)
        
        from data_validation import validate_schema
        
    • パスの確認・操作:
      • カレントワーキングディレクトリ (CWD): %sh pwd (通常は実行中ノートブックのディレクトリ)
      • Python検索パス: import sys; print(sys.path)
      • パス追加: sys.path.append('/path/to/add')

2. ノートブックスコープでのライブラリインストール

  • %pip install マジックコマンド:
    • 目的: 現在のノートブックセッションにのみ有効なPythonライブラリ (PyPIやWheelファイル) をインストールする。
    • 構文:
      %pip install <package_name>
      %pip install <package_name>==<version>
      %pip install /path/to/your_package-0.1.0-py3-none-any.whl
      %pip install -r requirements.txt
      
    • 重要事項:
      • %pip コマンドはノートブックの一番最初に記述する。
      • 実行後、Pythonインタープリターが再起動され、それ以前のセルで定義した変数は失われる。
    • %sh pip との違い: %pip はクラスターの全ノードにインストールするが、%sh pipドライバーノードのみ。ライブラリインストールには %pip を使用する。

3. データパイプラインのテスト

目的: データパイプラインの信頼性と品質を確保する。

  • テストの主要カテゴリ:
    • データ品質テスト (Data Quality Tests):
      • 対象: データそのもの。
      • 目的: データが期待される形式、範囲、一貫性などを満たしているか検証する (例: Nullでないか、特定の値の範囲内か、ユニークであるべきか)。
      • 方法: Delta Lake CHECK制約、assert 文、専用ライブラリ (例: Great Expectations)、カスタム検証ロジック。
    • 標準テスト (Standard Tests):
      • 対象: パイプラインを構成するコード (関数、クラス、モジュール、パイプライン全体)。
      • 目的: コードロジックが期待通りに動作するか検証する。コード変更時のデグレ防止。
  • 標準テストの種類:
    • ユニットテスト (Unit Testing):
      • 焦点: 個別の関数やメソッドなどの最小単位のコード。
      • 目的: 特定の入力に対する出力が期待通りか、独立して検証する。
      • 方法: assert 文や pytest, unittest などのフレームワークを使用。
        # 例: helper関数のテスト
        from helpers.math_utils import add_numbers
        
        def test_add_numbers():
            assert add_numbers(2, 3) == 5
            assert add_numbers(-1, 1) == 0
        
    • インテグレーションテスト (Integration Testing):
      • 焦点: 複数のユニット(関数、クラス、モジュール)が連携して動作する部分。
      • 目的: コンポーネント間のインタラクションやデータの受け渡しが正しく行われるか検証する。
    • エンドツーエンドテスト (End-to-End Testing / E2E Testing):
      • 焦点: パイプライン全体(データソースから最終的な出力/シンクまで)。
      • 目的: 実際の運用に近い状況で、パイプライン全体が期待通り機能するか検証する。ビジネスロジック全体の検証。

クラスター権限、ログ、モニタリング

1. クラスター権限管理

目的: ユーザーやグループがクラスターをどのように利用・操作できるかを制御する。

  • 権限の種類:
    • ワークスペースレベル (クラスター作成権限):
      • 場所: Admin Console > [Users] タブ > 対象ユーザー選択
      • 設定: Allow unrestricted cluster creation チェックボックス
      • 効果: チェックを入れると、ユーザーは自由に(ポリシー制限内で)クラスターを作成できる。
    • クラスターレベル (個別クラスターへのアクセス):
      • 場所: [Compute] > 対象クラスター > [...]メニュー > [Edit Permissions]
      • レベル:
        • Can Attach To:
          • ノートブックをクラスターにアタッチして実行できる。
          • Spark UI、ログ、メトリクスを閲覧できる。
          • クラスターの状態変更(開始/停止/再起動/編集)は不可。
        • Can Restart:
          • Can Attach To の全権限。
          • クラスターの 開始、停止、再起動 が可能。
          • クラスター設定の編集は不可。
        • Can Manage:
          • Can Restart の全権限。
          • クラスター設定の 編集 が可能。
          • クラスターの 権限設定の編集 が可能 (他のユーザー/グループへの権限付与/剥奪)。
          • クラスターの削除が可能。
      • 割り当て: 各レベルをユーザーまたはグループに割り当て可能。

2. クラスターログの確認

目的: クラスターの動作状況の追跡、トラブルシューティング、コードのデバッグを行う。

  • 場所: クラスター詳細ページ内の各タブ
  • ログの種類:
    • クラスターイベントログ (Cluster Event Log):
      • タブ: [Event Log]
      • 内容: クラスターのライフサイクルイベント履歴 (作成、開始、停止、終了、編集、リサイズ、エラー等)。手動操作と自動イベントの両方を記録。
      • 用途: クラスターがいつ、誰によって、どのように操作されたか追跡。オートスケールの挙動確認 (RESIZING イベントでフィルタ)。
      • 機能: イベントタイプによるフィルタリング。イベント行クリック → [JSON] タブで詳細情報(例: 設定変更前後の値)を表示。
    • ドライバログ (Driver Logs):
      • タブ: [Driver Logs]
      • 内容: クラスターのドライバーノードで実行されたコード(ノートブック、ジョブ、ライブラリ)からの標準出力、標準エラー、Log4jログ。
      • ストリーム:
        • stdout: print() 文などの標準出力。
        • stderr: エラーメッセージなどの標準エラー出力。
        • Log4j output: Log4jライブラリを使用したログ出力。
      • 用途: コードのデバッグ、アプリケーション固有のログ確認。
      • 機能: 各ログファイルのダウンロードが可能。

3. クラスターパフォーマンスモニタリング (Ganglia UI)

目的: クラスターのリソース使用状況をリアルタイムで監視し、パフォーマンスのボトルネックを特定する。

  • アクセス:
    • 場所: クラスター詳細ページ > [Metrics] タブ > [Ganglia UI] リンク
    • 条件: クラスターが実行中 であること。
  • Ganglia UIの主要メトリクス:
    • クラスター全体概要 (UI上部4グラフ):
      • Cluster Load: 負荷状況(プロセス数/ノード数)。
      • Memory Usage: 全ノードの合計メモリ使用量(キャッシュ/バッファ/使用中/空き)。
      • CPU Usage: 全ノードの合計CPU使用率。
      • Network Usage: 全ノードの合計ネットワーク送受信量。
    • ノード別詳細 (UI下部 & 左メニュー):
      • ドライバーノードと各ワーカーノード個別のメトリクスを確認可能。
      • ドライバーノード特定: Spark UI (クラスター詳細ページの [Spark UI] タブ) でドライバーのIPアドレスを確認し、Ganglia UIのノードリストと照合。
      • レポート選択: 左メニューから Memory Report, CPU Report, Network Report, Disk Report などを選択し、ノードごとの詳細グラフを表示。
  • 分析とアクション:
    • CPU高負荷:
      • ドライバーのみ高負荷 → ドライバーに処理が集中しすぎている可能性。コードの見直しや、より大きなドライバーノードを検討。
      • 全ワーカー高負荷 → 全体的な処理能力不足。ワーカー数を増やすか、オートスケールを有効化/調整。
      • 特定ワーカーのみ高負荷 → データスキューの可能性。パーティショニングやデータ分散処理の見直し。
    • メモリ逼迫:
      • 特定ノードでメモリ使用量が高い → メモリリークや不適切なキャッシュ利用の可能性。コードの見直しや、よりメモリの大きいインスタンスタイプを検討。
      • ディスクI/O高負荷 (Disk Report) → メモリ不足によるディスクスピル(データの一時退避)が発生している可能性。メモリ割り当ての増加を検討。
  • 履歴メトリクス (スナップショット):
    • 制限: Ganglia UIのライブメトリクスはクラスターが停止/再起動すると失われる。
    • 代替: DatabricksがGanglia UIのスクリーンショットを定期的に保存。
    • 場所: クラスター詳細ページ > [Metrics] タブ > "History" セクション
    • 内容: 特定時点でのクラスター全体の概要グラフの画像のみ。ノード別の詳細レポートやインタラクティブな分析は不可。過去のある時点での全体的な状況を確認するのに役立つ。

Discussion