データ基盤の設計思想:Databricksのスキーマ推論・進化
1. はじめに:なぜスキーマ推論・進化が重要なのか
データエンジニアリングの現場では、「データの形が毎回少しずつ違う」 という問題がつきまといます。
特に、IoTデバイスのログや業務アプリからのイベントデータ、API経由で取得するJSONなど、構造が頻繁に変わるデータを扱うとき、この課題は顕著です。
こうした変化に都度手作業で対応していると、パイプラインのメンテナンスコストが跳ね上がり、運用が回らなくなります。
そこで登場するのが、Databricksの「スキーマ推論(Schema Inference)」と「スキーマ進化(Schema Evolution)」 です。
これは、データの構造を自動で理解し、必要に応じて変化に対応する仕組みです。
スキーマ推論・スキーマ進化とは
「スキーマ」とは、データの列名や型、階層構造など、データの設計図のようなものです。

Databricksではデータを取り込むと列名・それぞれのデータ型を勝手に定義してくれる
ETLやデータ基盤の設計では、このスキーマをどのタイミングで決めるかが重要になります。
ここでよく出てくるのが次の2つの考え方です。
| モデル | 説明 |
|---|---|
| スキーマ・オン・ライト(Schema-on-Write) | データを保存する前に、スキーマを明示的に定義して書き込む方式。RDBやDWHなどが代表例。 |
| スキーマ・オン・リード(Schema-on-Read) | データを読み込むときにスキーマを適用する方式。データレイクや半構造化データで多用される。 |
データレイクのように柔軟な構造を扱う環境では、スキーマ・オン・リードが主流です。
ただし、自由度が高い分「スキーマをどう解釈するか」という問題が発生します。
たとえば、同じキーでも日によってデータ型が違ったり、列が増えたり減ったりするケースです。
このような状況で役立つのが、スキーマ推論、
ファイルの中身をサンプリングして、自動的に列名とデータ型を判断する機能です。
Databricksが「自動でスキーマを理解・進化させる」仕組みを持つ理由
Databricksは、レイクハウスアーキテクチャを支えるプラットフォームとして、
構造化・半構造化・非構造化データをすべて統一的に扱うことを目指しています。
このために、
- Autoloader によるリアルタイム取込+自動スキーマ検出
- Delta Lake によるスキーマ進化(ALTER TABLEを使わずに列を追加)
- Rescue Data Column による未知フィールドの自動保存
といった仕組みが備わっています。
つまりDatabricksでは、「スキーマを固定して守る」よりも
“変化を前提に、安全に受け入れる” という考え方が基本になっています。
この設計思想こそが、データレイクハウス時代の運用負荷を大きく減らすポイントです。
2. Autoloaderによるスキーマ推論の仕組み
Databricksの Autoloader は、クラウドストレージ上の新規データを自動で検知・取り込みできる仕組みです。
特徴は、単なるバッチ取り込みではなく、「増分処理」+「スキーマ自動検出」 を同時に実現していること。
データが増えても構造が変わっても止まらないこと がAutoloaderの強みです。
Autoloaderの概要
Autoloaderは内部的にストレージイベント通知やディレクトリスキャンを用い、
「まだ読み込まれていないファイルだけ」をインクリメンタルに取り込みます。
df = (spark.readStream
.format("cloudFiles") # AutoLoader(増分読み込み用フォーマット)
.option("cloudFiles.format", "json") # 入力データのフォーマット
.option("cloudFiles.schemaLocation", # スキーマ定義を保存するパス(推論結果のキャッシュ)
"/mnt/schema/")
.load("/mnt/input/")) # 入力元のディレクトリ(クラウドストレージなど)
.option("cloudFiles.schemaLocation", "/mnt/schema/") が重要です。
Autoloaderが推論したスキーマ情報を保存し、次回の読み込み時に再利用するキャッシュとして機能します。
これがないと、毎回全ファイルをスキャンして推論し直すため、コストも時間もかかります。
サンプリング方式
スキーマ推論は、最初のデータ読み込み時に「サンプルデータ」をもとに行われます。
公式ドキュメントによると、Autoloaderは次のようなサンプリング戦略を取ります。
- 1,000ファイルまたは50GBまでを対象にサンプリング(超過した場合はそこで打ち切り)
- ファイル形式によって推論戦略が異なる(JSONやCSVは中身をスキャン、ParquetやAvroはメタ情報から抽出)
- 設定値は内部オプション
(例:spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles)で調整可能
これにより、データのボリュームが大きくても効率的にスキーマを判断できます。
schemaLocation によるスキーマキャッシュ
schemaLocation に指定したパスには、以下のようなメタデータが保存されます。
- 最新のスキーマ(推論済みの構造)
- 変更履歴(スキーマ進化時に追加された列の情報)
- rescueDataColumnなどの例外フィールド情報
この仕組みにより、次回以降の実行では 「スキーマを再推論せずに」前回の構造を再利用 できます。
Autoloaderには、inferColumnTypes, schemaHints, cloudFiles.schemaInference など、
推論を制御するためのいくつかのオプションがあります。
| オプション名 | 役割 | 例 |
|---|---|---|
cloudFiles.inferColumnTypes |
推論時に数値やブール型を自動識別する(falseの場合すべてstring) | "true" |
cloudFiles.schemaHints |
列ごとに型を事前指定できる | "age INT, price DOUBLE" |
cloudFiles.schemaInference.mode |
推論の実行タイミング(rescue, addNewColumns, noneなど) |
"addNewColumns" |
実装例:
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv") # CSV形式を読み込む
.option("cloudFiles.inferColumnTypes", # 数値や日時を自動推論(falseなら全てstring)
"true")
.option("cloudFiles.schemaHints", # 列単位で型を明示指定(誤推論防止)
"id INT, created_at TIMESTAMP")
.option("cloudFiles.schemaLocation", # スキーマを保存するディレクトリ
"/mnt/schema/")
.load("/mnt/input/"))
このように設定することで、誤推論(例:数字をstring扱いするなど)を防ぎながら、柔軟なスキーマ制御が可能になります。
フォーマットごとの違い
| フォーマット | 推論方法 | 特徴 |
|---|---|---|
| JSON / CSV | ファイル内容をサンプリングして列名・型を解析 | 柔軟だがサンプリングコスト高 |
| Parquet / Avro | メタ情報からスキーマを直接抽出 | 高速かつ正確だが柔軟性は低い |
| Binary / Image | 固定スキーマまたはメタ情報のみ | ML用途などに限定される |
たとえば、JSONを扱う場合は半構造化データのため推論コストが高くなります。
このため、頻繁にスキーマが変わる場合は Parquet 変換してから扱う方が安定的です。
3. スキーマ進化の対応
スキーマ推論は「いまのデータ構造を理解する」機能ですが、
スキーマ進化(Schema Evolution) は「変化を受け入れて更新する」仕組みです。
現場では、新しい列が追加されたり、型が変わったりといった変更が頻繁に起こります。
DatabricksのAutoloaderは、そうした変更を自動的に取り込む設計になっています。
addNewColumns / rescueDataColumn モードの動作
Autoloaderは、スキーマの変化を検知した際にいくつかの動作モードを選べます。
| モード名 | 動作内容 |
|---|---|
addNewColumns |
新しい列を自動でスキーマに追加し、テーブルを更新する(最も一般的) |
rescue |
予期しない列を _rescued_data という構造体列にまとめて保存(データ損失防止) |
none / failOnNewColumns |
新しい列があるとエラーを発生させ、手動対応を促す |
この設定は .option("cloudFiles.schemaEvolutionMode", "addNewColumns") のように指定します。
実装例:Autoloaderのスキーマ進化設定
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/mnt/schema/")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaEvolutionMode", # 新しい列が追加されたときの挙動を制御
"addNewColumns") # 自動で新しい列を追加
.load("/mnt/input/"))
上記設定では、新しい列が出現してもジョブが停止せず、自動的にDeltaテーブルへ追加されます。
一方で、スキーマ変更を無制限に許すと構造が不安定になるため、
開発環境では addNewColumns、本番では rescue モード のように環境別運用を推奨されています。
Delta Lakeとの関係:スキーマ変更をどう反映させるか
Autoloaderが推論・進化させたスキーマは、通常Delta Lakeテーブルに保存されます。
このとき、Delta Lake側でもスキーマの整合性を保つために
mergeSchema オプションを設定する必要があります。
実装例:
(df.writeStream
.option("mergeSchema", "true") # Delta側でもスキーマ変更を許可
.option("checkpointLocation", # ストリーミング状態を保存(必須)
"/mnt/checkpoints/")
.trigger(availableNow=True) # すぐに1回分を処理(バッチ的な動作)
.table("bronze.events")) # 書き込み先のDeltaテーブル名
これにより、Autoloaderが新しい列を追加してもDeltaテーブルに自動反映されます。
逆にこの設定がないと、スキーマ不一致エラーが発生します。
実運用での注意点
スキーマ進化を本番で使う場合、次の3点には注意が必要です。
-
型変更(int → stringなど)は非互換
→ 自動では変換されず、明示的なCASTや中間処理が必要。 -
チェックポイントとの整合性
→ ストリーミングは過去のスキーマをチェックポイントに保存しているため、
スキーマ変更後に再起動するときは再推論・再設定が必要になることがあります。 -
statefulな処理との相性
→groupByやaggregateなど状態を保持する処理では、
スキーマ進化がトリガーでジョブが再起動するケースもあります。
AutoloaderとDelta Lakeのスキーマ進化機能を組み合わせることで、
従来のETLのように「スキーマ変更=障害」ではなく、
変化を前提とした柔軟なパイプラインを構築できます。
ただし、自動化しすぎるとスキーマドリフト(想定外の構造変化)を招くため、
「ガイド付きの自動化(ヒント・契約・監視)」を組み合わせることが重要です。
まとめ
本記事では、AutoLoader/Delta Lake を活用した Databricks における スキーマ推論(Schema Inference) と スキーマ進化(Schema Evolution) の仕組みと実装を整理しました。
- データレイク環境では、データ構造が時間とともに変化するため、手動でスキーマを固定していくと運用コスト・障害リスクが高まるという背景があります。
- Auto Loader によるスキーマ推論機能により、初回取り込み時にサンプリングして構造を自動的に判断し、以降はスキーマキャッシュ(
schemaLocation)を活用することで処理の安定化を図れます。 - スキーマ進化にも対応しており、例えば
addNewColumnsモードやrescueモードを設定することで「新しい列が来ても止まらない」設計が可能です。 - ただし、スキーマ変更には運用上の注意点があります。チェックポイント、状態管理、ストリーミング再起動など設計を誤ると障害につながります。
- 実運用では、完全自動化ではなく「ヒントを活用する」「スキーマ契約を設ける」など、制御された自動化が鍵となります。
Discussion