🌱

データ基盤の設計思想:Databricksのスキーマ推論・進化

に公開

1. はじめに:なぜスキーマ推論・進化が重要なのか

データエンジニアリングの現場では、「データの形が毎回少しずつ違う」 という問題がつきまといます。
特に、IoTデバイスのログや業務アプリからのイベントデータ、API経由で取得するJSONなど、構造が頻繁に変わるデータを扱うとき、この課題は顕著です。

こうした変化に都度手作業で対応していると、パイプラインのメンテナンスコストが跳ね上がり、運用が回らなくなります。
そこで登場するのが、Databricksの「スキーマ推論(Schema Inference)」と「スキーマ進化(Schema Evolution)」 です。
これは、データの構造を自動で理解し、必要に応じて変化に対応する仕組みです。

スキーマ推論・スキーマ進化とは

「スキーマ」とは、データの列名や型、階層構造など、データの設計図のようなものです。


Databricksではデータを取り込むと列名・それぞれのデータ型を勝手に定義してくれる

ETLやデータ基盤の設計では、このスキーマをどのタイミングで決めるかが重要になります。

https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/cloud-object-storage/auto-loader/schema

ここでよく出てくるのが次の2つの考え方です。

モデル 説明
スキーマ・オン・ライト(Schema-on-Write) データを保存する前に、スキーマを明示的に定義して書き込む方式。RDBやDWHなどが代表例。
スキーマ・オン・リード(Schema-on-Read) データを読み込むときにスキーマを適用する方式。データレイクや半構造化データで多用される。

データレイクのように柔軟な構造を扱う環境では、スキーマ・オン・リードが主流です。
ただし、自由度が高い分「スキーマをどう解釈するか」という問題が発生します。
たとえば、同じキーでも日によってデータ型が違ったり、列が増えたり減ったりするケースです。
このような状況で役立つのが、スキーマ推論
ファイルの中身をサンプリングして、自動的に列名とデータ型を判断する機能です。

Databricksが「自動でスキーマを理解・進化させる」仕組みを持つ理由

Databricksは、レイクハウスアーキテクチャを支えるプラットフォームとして、
構造化・半構造化・非構造化データをすべて統一的に扱うことを目指しています。

このために、

  • Autoloader によるリアルタイム取込+自動スキーマ検出
  • Delta Lake によるスキーマ進化(ALTER TABLEを使わずに列を追加)
  • Rescue Data Column による未知フィールドの自動保存

といった仕組みが備わっています。

つまりDatabricksでは、「スキーマを固定して守る」よりも
“変化を前提に、安全に受け入れる” という考え方が基本になっています。
この設計思想こそが、データレイクハウス時代の運用負荷を大きく減らすポイントです。

2. Autoloaderによるスキーマ推論の仕組み

Databricksの Autoloader は、クラウドストレージ上の新規データを自動で検知・取り込みできる仕組みです。
特徴は、単なるバッチ取り込みではなく、「増分処理」+「スキーマ自動検出」 を同時に実現していること。
データが増えても構造が変わっても止まらないこと がAutoloaderの強みです。

Autoloaderの概要

Autoloaderは内部的にストレージイベント通知やディレクトリスキャンを用い、
「まだ読み込まれていないファイルだけ」をインクリメンタルに取り込みます。

df = (spark.readStream
      .format("cloudFiles")                     # AutoLoader(増分読み込み用フォーマット)
      .option("cloudFiles.format", "json")      # 入力データのフォーマット
      .option("cloudFiles.schemaLocation",      # スキーマ定義を保存するパス(推論結果のキャッシュ)
              "/mnt/schema/")
      .load("/mnt/input/"))                     # 入力元のディレクトリ(クラウドストレージなど)

.option("cloudFiles.schemaLocation", "/mnt/schema/") が重要です。
Autoloaderが推論したスキーマ情報を保存し、次回の読み込み時に再利用するキャッシュとして機能します。
これがないと、毎回全ファイルをスキャンして推論し直すため、コストも時間もかかります。

サンプリング方式

スキーマ推論は、最初のデータ読み込み時に「サンプルデータ」をもとに行われます。
公式ドキュメントによると、Autoloaderは次のようなサンプリング戦略を取ります。

  • 1,000ファイルまたは50GBまでを対象にサンプリング(超過した場合はそこで打ち切り)
  • ファイル形式によって推論戦略が異なる(JSONやCSVは中身をスキャン、ParquetやAvroはメタ情報から抽出)
  • 設定値は内部オプション
    (例:spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles)で調整可能

これにより、データのボリュームが大きくても効率的にスキーマを判断できます。

https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/cloud-object-storage/auto-loader/schema#how-does-auto-loader-schema-inference-work

schemaLocation によるスキーマキャッシュ

schemaLocation に指定したパスには、以下のようなメタデータが保存されます。

  • 最新のスキーマ(推論済みの構造)
  • 変更履歴(スキーマ進化時に追加された列の情報)
  • rescueDataColumnなどの例外フィールド情報

この仕組みにより、次回以降の実行では 「スキーマを再推論せずに」前回の構造を再利用 できます。

Autoloaderには、inferColumnTypes, schemaHints, cloudFiles.schemaInference など、
推論を制御するためのいくつかのオプションがあります。

オプション名 役割
cloudFiles.inferColumnTypes 推論時に数値やブール型を自動識別する(falseの場合すべてstring) "true"
cloudFiles.schemaHints 列ごとに型を事前指定できる "age INT, price DOUBLE"
cloudFiles.schemaInference.mode 推論の実行タイミング(rescue, addNewColumns, noneなど) "addNewColumns"

実装例:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")       # CSV形式を読み込む
      .option("cloudFiles.inferColumnTypes",    # 数値や日時を自動推論(falseなら全てstring)
              "true")
      .option("cloudFiles.schemaHints",         # 列単位で型を明示指定(誤推論防止)
              "id INT, created_at TIMESTAMP")
      .option("cloudFiles.schemaLocation",      # スキーマを保存するディレクトリ
              "/mnt/schema/")
      .load("/mnt/input/"))

このように設定することで、誤推論(例:数字をstring扱いするなど)を防ぎながら、柔軟なスキーマ制御が可能になります。

https://learn.microsoft.com/ja-jp/azure/databricks/ldp/from-json-schema-evolution

フォーマットごとの違い

フォーマット 推論方法 特徴
JSON / CSV ファイル内容をサンプリングして列名・型を解析 柔軟だがサンプリングコスト高
Parquet / Avro メタ情報からスキーマを直接抽出 高速かつ正確だが柔軟性は低い
Binary / Image 固定スキーマまたはメタ情報のみ ML用途などに限定される

たとえば、JSONを扱う場合は半構造化データのため推論コストが高くなります。
このため、頻繁にスキーマが変わる場合は Parquet 変換してから扱う方が安定的です。

3. スキーマ進化の対応

スキーマ推論は「いまのデータ構造を理解する」機能ですが、
スキーマ進化(Schema Evolution) は「変化を受け入れて更新する」仕組みです。
現場では、新しい列が追加されたり、型が変わったりといった変更が頻繁に起こります。
DatabricksのAutoloaderは、そうした変更を自動的に取り込む設計になっています。

addNewColumns / rescueDataColumn モードの動作

Autoloaderは、スキーマの変化を検知した際にいくつかの動作モードを選べます。

モード名 動作内容
addNewColumns 新しい列を自動でスキーマに追加し、テーブルを更新する(最も一般的)
rescue 予期しない列を _rescued_data という構造体列にまとめて保存(データ損失防止)
none / failOnNewColumns 新しい列があるとエラーを発生させ、手動対応を促す

この設定は .option("cloudFiles.schemaEvolutionMode", "addNewColumns") のように指定します。

実装例:Autoloaderのスキーマ進化設定

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", "/mnt/schema/")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.schemaEvolutionMode", # 新しい列が追加されたときの挙動を制御
              "addNewColumns")                  # 自動で新しい列を追加
      .load("/mnt/input/"))

上記設定では、新しい列が出現してもジョブが停止せず、自動的にDeltaテーブルへ追加されます。
一方で、スキーマ変更を無制限に許すと構造が不安定になるため、
開発環境では addNewColumns、本番では rescue モード のように環境別運用を推奨されています。

https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/cloud-object-storage/auto-loader/schema#evolution

Delta Lakeとの関係:スキーマ変更をどう反映させるか

Autoloaderが推論・進化させたスキーマは、通常Delta Lakeテーブルに保存されます。
このとき、Delta Lake側でもスキーマの整合性を保つために
mergeSchema オプションを設定する必要があります。

実装例:

(df.writeStream
   .option("mergeSchema", "true")               # Delta側でもスキーマ変更を許可
   .option("checkpointLocation",                # ストリーミング状態を保存(必須)
           "/mnt/checkpoints/")
   .trigger(availableNow=True)                  # すぐに1回分を処理(バッチ的な動作)
   .table("bronze.events"))                     # 書き込み先のDeltaテーブル名

これにより、Autoloaderが新しい列を追加してもDeltaテーブルに自動反映されます。
逆にこの設定がないと、スキーマ不一致エラーが発生します。

実運用での注意点

スキーマ進化を本番で使う場合、次の3点には注意が必要です。

  1. 型変更(int → stringなど)は非互換
    → 自動では変換されず、明示的なCASTや中間処理が必要。
  2. チェックポイントとの整合性
    → ストリーミングは過去のスキーマをチェックポイントに保存しているため、
    スキーマ変更後に再起動するときは再推論・再設定が必要になることがあります。
  3. statefulな処理との相性
    groupByaggregate など状態を保持する処理では、
    スキーマ進化がトリガーでジョブが再起動するケースもあります。

AutoloaderとDelta Lakeのスキーマ進化機能を組み合わせることで、
従来のETLのように「スキーマ変更=障害」ではなく、
変化を前提とした柔軟なパイプラインを構築できます。

ただし、自動化しすぎるとスキーマドリフト(想定外の構造変化)を招くため、
「ガイド付きの自動化(ヒント・契約・監視)」を組み合わせることが重要です。

https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/cloud-object-storage/auto-loader/schema#override-schema-inference-with-schema-hints

まとめ

本記事では、AutoLoader/Delta Lake を活用した Databricks における スキーマ推論(Schema Inference)スキーマ進化(Schema Evolution) の仕組みと実装を整理しました。

  • データレイク環境では、データ構造が時間とともに変化するため、手動でスキーマを固定していくと運用コスト・障害リスクが高まるという背景があります。
  • Auto Loader によるスキーマ推論機能により、初回取り込み時にサンプリングして構造を自動的に判断し、以降はスキーマキャッシュ(schemaLocation)を活用することで処理の安定化を図れます。
  • スキーマ進化にも対応しており、例えば addNewColumns モードや rescue モードを設定することで「新しい列が来ても止まらない」設計が可能です。
  • ただし、スキーマ変更には運用上の注意点があります。チェックポイント、状態管理、ストリーミング再起動など設計を誤ると障害につながります。
  • 実運用では、完全自動化ではなく「ヒントを活用する」「スキーマ契約を設ける」など、制御された自動化が鍵となります。
ヘッドウォータース

Discussion