📌

DatabricksとS3連携の基本:外部テーブルによるデータロードとDelta Lake化

に公開

1. 事前準備

AWS側でS3バケットが作成済みであることが必要です。(今回は s3://databricks-external-table-test/ を使用)

2. 目的

Databricksプラットフォームを利用して、外部クラウドストレージ(AWS S3)に格納された生データ(CSV)を活用するための一連のプロセスを試し、その手順を確認することを目的とします。
具体的には、以下の項目を試しました。

  • DatabricksからS3バケットへのセキュアな接続設定。
  • S3上のCSVデータに対する**外部テーブル(External Table)**の作成とデータアクセス。
  • 外部テーブルから、より高機能で信頼性の高いDelta Lakeテーブルへのデータ変換およびロード。
  • データソースに新しいファイルが追加された際の、テーブルへのデータ追加・更新プロセス。

3. 実行環境

  • プラットフォーム: Databricks Free Edition
  • 外部ストレージ: AWS S3 (ユーザー管理バケット: s3://databricks-external-table-test/)

4. 試してみた

以降のステップで登場するSQLクエリは、すべてDatabricksのSQL Editor上で実行しました。

ステップ1: DatabricksとS3の連携設定(外部ロケーション作成)

まず、DatabricksがS3バケットに安全にアクセスするための設定として「外部ロケーション(External Location)」を作成しました。この操作はDatabricksのGUIから行いました。

実行手順の概要:

  1. DatabricksのUIから カタログ > 外部データ > 外部ロケーションを作成 を選択。
  2. クイックスタート オプションを選択し、対象のS3バケット名を入力。
  3. 画面の指示に従い、Databricksが生成した情報を用いてAWSコンソール上でCloudFormationスタックを作成。
    • このプロセスにより、Databricksが必要とするIAMロールなどがAWSアカウント内に自動でプロビジョニングされます。

この設定により、Databricksワークスペースが指定のS3バケットに対してデータの読み書きを行う権限を獲得しました。

ステップ2: S3上のデータに対する外部テーブルの作成

S3バケットの raw_data/ フォルダに配置した商品データCSV (sample_products.csv) をDatabricksから直接クエリできるようにするため、外部テーブル my_raw_products_table を作成しました。

実行クエリ:

-- S3上のCSVファイルを指し示す外部テーブルを作成
CREATE TABLE my_raw_products_table (
  product_id INT,
  product_name STRING,
  category STRING,
  price DOUBLE,
  sales_date DATE,
  is_available BOOLEAN
)
USING CSV
OPTIONS (
  header = "true",
  delimiter = ","
)
LOCATION 's3://databricks-external-table-test/raw_data/';

ステップ3: テーブルタイプの確認

作成したテーブルが意図通り「外部テーブル」として認識されているかを確認しました。

実行クエリ:

-- テーブルの詳細情報を表示
DESCRIBE EXTENDED my_raw_products_table;

結果:

  • 出力結果の Type 項目が EXTERNAL となっており、外部テーブルとして正しく作成されたことを確認しました。
  • Location 項目には、ステップ2で指定したS3パスが表示されました。

ステップ4: Delta Lakeテーブルへのデータ変換とロード

CSVベースの外部テーブルからデータを読み込み、Delta Lake形式の新しいテーブル my_delta_table を作成しました。

実行クエリ:

-- my_raw_products_tableのデータを元に、新しいDelta Lakeテーブルを作成
CREATE OR REPLACE TABLE my_delta_table
USING DELTA
LOCATION 's3://databricks-external-table-test/delta_tables/my_products_delta/'
AS SELECT * FROM my_raw_products_table;

ステップ5: データの追加とテーブルの更新 (外部テーブル経由)

データソースに新しいCSVファイル (products_add.csv) が追加された場合の更新プロセスを試しました。

5-1. 外部テーブルへの変更反映

S3の raw_data/ フォルダに新しいCSVファイルをアップロードした後、REFRESH TABLE コマンドで外部テーブルにその変更を認識させました。

実行クエリ:

-- S3フォルダの最新の状態をテーブルに反映
REFRESH TABLE my_raw_products_table;

5-2. 更新結果の確認

テーブルの行数を確認し、新しいファイルのデータが正しく追加されたことを検証しました。

実行クエリ:

-- 行数を確認
SELECT COUNT(*) FROM my_raw_products_table;

ステップ6: COPY INTOによるDelta Lakeテーブルへの直接データロード

ステップ5とは別のアプローチとして、COPY INTO コマンドを使い、外部テーブルを経由せずにS3上の新しいデータを直接Delta Lakeテーブルにロードする方法を試します。この方法は、インクリメンタルなデータロードをより効率的かつ冪等(べきとう)に(=何回実行しても結果が同じになるように)行うのに適しています。

6-1. 新しいデータファイルの準備

S3の raw_data/ フォルダに、さらに新しいデータファイル (products_add_2.csvなど、まだ読み込んでいないファイル) をアップロードします。

6-2. COPY INTOの実行とスキーマ不一致エラーへの対処

COPY INTO をそのまま実行すると、[DELTA_FAILED_TO_MERGE_FIELDS] のようなエラーが発生することがあります。これは、コマンドのデフォルト動作ではCSVのすべての列を文字列(STRING)として読み込もうとするため、ロード先のDeltaテーブルで定義されている数値型(INT)や日付型(DATE)と型が一致しないことが原因です。

この問題を解決する最も簡単な方法は、FORMAT_OPTIONS'inferSchema' = 'true' を追加することです。このオプションを指定すると、DatabricksがCSVファイルの内容をスキャンし、各列のデータ型を自動的に推測(infer)してくれます。これにより、スキーマの不一致が解消され、スムーズにデータをロードできます。

実行クエリ:

-- S3の指定フォルダからDelta Lakeテーブルにデータをコピー
COPY INTO my_delta_table
FROM 's3://databricks-external-table-test/raw_data/'
FILEFORMAT = CSV
FORMAT_OPTIONS (
  'header' = 'true',
  'inferSchema' = 'true' -- CSVのスキーマを自動推論させる
);

ポイント: 'inferSchema' = 'true' を追加することで、Databricksはproduct_id列を数値として、sales_date列を日付として認識し、Deltaテーブルのスキーマと正しく一致させることができます。また、COPY INTOはロード済みのファイルを記憶しているため、このコマンドを再度実行しても同じファイルが二重にロードされることはありません。

6-3. 更新結果の確認

my_delta_table の行数を確認し、products_add_2.csv のデータが正しく追加されたことを検証します。

実行クエリ:

-- ロード後の行数を確認
SELECT COUNT(*) FROM my_delta_table;

この方法では、REFRESH TABLE を実行する必要がなく、1つのコマンドでS3からDelta Lakeテーブルへのデータロードが完結します。

5. 最後に

今回試した一連のプロセスを通じて、DatabricksとS3を連携させたデータレイクハウスの基本的な構築・運用フローを実践することができました。

  • 外部テーブルは、データレイク上の生データへの手軽なアクセス手段として有効です。
  • Delta Lakeは、生データを信頼性とパフォーマンスに優れた形式に変換し、MERGE による高度な差分更新や、今回試した COPY INTO による効率的な追加ロードを可能にする、データパイプラインの中核技術となります。
  • COPY INTO を利用する際は inferSchema オプションを活用することで、スキーマの不一致に起因するエラーを簡単に回避し、堅牢なデータロード処理を実現できます。
  • データを「生データ」と「加工済みデータ」で物理的に分離して管理するアプローチは、DatabricksではSQLの LOCATION 句で容易に実現できます。

データ更新の各ステップが手動実行であるため、より高い頻度でのデータ更新や自動化が求められる場合は、次のステップとして**ストリーミングテーブル(Delta Live Tables)**のような自動化フレームワークの導入が有効となります。


6. 参考資料

  1. Databricks Free Editionでユーザー管理のS3に対する外部テーブル作成・Delta Lakeとしてのデータロードを試してみた
  2. DatabricksでS3に対する外部ロケーションを作成する手順(YouTube動画)

Discussion