DatabricksとS3連携の基本:外部テーブルによるデータロードとDelta Lake化
1. 事前準備
AWS側でS3バケットが作成済みであることが必要です。(今回は s3://databricks-external-table-test/ を使用)
2. 目的
Databricksプラットフォームを利用して、外部クラウドストレージ(AWS S3)に格納された生データ(CSV)を活用するための一連のプロセスを試し、その手順を確認することを目的とします。
具体的には、以下の項目を試しました。
- DatabricksからS3バケットへのセキュアな接続設定。
- S3上のCSVデータに対する**外部テーブル(External Table)**の作成とデータアクセス。
- 外部テーブルから、より高機能で信頼性の高いDelta Lakeテーブルへのデータ変換およびロード。
- データソースに新しいファイルが追加された際の、テーブルへのデータ追加・更新プロセス。
3. 実行環境
- プラットフォーム: Databricks Free Edition
-
外部ストレージ: AWS S3 (ユーザー管理バケット:
s3://databricks-external-table-test/)
4. 試してみた
以降のステップで登場するSQLクエリは、すべてDatabricksのSQL Editor上で実行しました。
ステップ1: DatabricksとS3の連携設定(外部ロケーション作成)
まず、DatabricksがS3バケットに安全にアクセスするための設定として「外部ロケーション(External Location)」を作成しました。この操作はDatabricksのGUIから行いました。
実行手順の概要:
- DatabricksのUIから
カタログ>外部データ>外部ロケーションを作成を選択。 -
クイックスタートオプションを選択し、対象のS3バケット名を入力。 - 画面の指示に従い、Databricksが生成した情報を用いてAWSコンソール上でCloudFormationスタックを作成。
- このプロセスにより、Databricksが必要とするIAMロールなどがAWSアカウント内に自動でプロビジョニングされます。
この設定により、Databricksワークスペースが指定のS3バケットに対してデータの読み書きを行う権限を獲得しました。
ステップ2: S3上のデータに対する外部テーブルの作成
S3バケットの raw_data/ フォルダに配置した商品データCSV (sample_products.csv) をDatabricksから直接クエリできるようにするため、外部テーブル my_raw_products_table を作成しました。
実行クエリ:
-- S3上のCSVファイルを指し示す外部テーブルを作成
CREATE TABLE my_raw_products_table (
product_id INT,
product_name STRING,
category STRING,
price DOUBLE,
sales_date DATE,
is_available BOOLEAN
)
USING CSV
OPTIONS (
header = "true",
delimiter = ","
)
LOCATION 's3://databricks-external-table-test/raw_data/';
ステップ3: テーブルタイプの確認
作成したテーブルが意図通り「外部テーブル」として認識されているかを確認しました。
実行クエリ:
-- テーブルの詳細情報を表示
DESCRIBE EXTENDED my_raw_products_table;
結果:
- 出力結果の
Type項目がEXTERNALとなっており、外部テーブルとして正しく作成されたことを確認しました。 -
Location項目には、ステップ2で指定したS3パスが表示されました。
ステップ4: Delta Lakeテーブルへのデータ変換とロード
CSVベースの外部テーブルからデータを読み込み、Delta Lake形式の新しいテーブル my_delta_table を作成しました。
実行クエリ:
-- my_raw_products_tableのデータを元に、新しいDelta Lakeテーブルを作成
CREATE OR REPLACE TABLE my_delta_table
USING DELTA
LOCATION 's3://databricks-external-table-test/delta_tables/my_products_delta/'
AS SELECT * FROM my_raw_products_table;
ステップ5: データの追加とテーブルの更新 (外部テーブル経由)
データソースに新しいCSVファイル (products_add.csv) が追加された場合の更新プロセスを試しました。
5-1. 外部テーブルへの変更反映
S3の raw_data/ フォルダに新しいCSVファイルをアップロードした後、REFRESH TABLE コマンドで外部テーブルにその変更を認識させました。
実行クエリ:
-- S3フォルダの最新の状態をテーブルに反映
REFRESH TABLE my_raw_products_table;
5-2. 更新結果の確認
テーブルの行数を確認し、新しいファイルのデータが正しく追加されたことを検証しました。
実行クエリ:
-- 行数を確認
SELECT COUNT(*) FROM my_raw_products_table;
ステップ6: COPY INTOによるDelta Lakeテーブルへの直接データロード
ステップ5とは別のアプローチとして、COPY INTO コマンドを使い、外部テーブルを経由せずにS3上の新しいデータを直接Delta Lakeテーブルにロードする方法を試します。この方法は、インクリメンタルなデータロードをより効率的かつ冪等(べきとう)に(=何回実行しても結果が同じになるように)行うのに適しています。
6-1. 新しいデータファイルの準備
S3の raw_data/ フォルダに、さらに新しいデータファイル (products_add_2.csvなど、まだ読み込んでいないファイル) をアップロードします。
6-2. COPY INTOの実行とスキーマ不一致エラーへの対処
COPY INTO をそのまま実行すると、[DELTA_FAILED_TO_MERGE_FIELDS] のようなエラーが発生することがあります。これは、コマンドのデフォルト動作ではCSVのすべての列を文字列(STRING)として読み込もうとするため、ロード先のDeltaテーブルで定義されている数値型(INT)や日付型(DATE)と型が一致しないことが原因です。
この問題を解決する最も簡単な方法は、FORMAT_OPTIONS に 'inferSchema' = 'true' を追加することです。このオプションを指定すると、DatabricksがCSVファイルの内容をスキャンし、各列のデータ型を自動的に推測(infer)してくれます。これにより、スキーマの不一致が解消され、スムーズにデータをロードできます。
実行クエリ:
-- S3の指定フォルダからDelta Lakeテーブルにデータをコピー
COPY INTO my_delta_table
FROM 's3://databricks-external-table-test/raw_data/'
FILEFORMAT = CSV
FORMAT_OPTIONS (
'header' = 'true',
'inferSchema' = 'true' -- CSVのスキーマを自動推論させる
);
ポイント: 'inferSchema' = 'true' を追加することで、Databricksはproduct_id列を数値として、sales_date列を日付として認識し、Deltaテーブルのスキーマと正しく一致させることができます。また、COPY INTOはロード済みのファイルを記憶しているため、このコマンドを再度実行しても同じファイルが二重にロードされることはありません。
6-3. 更新結果の確認
my_delta_table の行数を確認し、products_add_2.csv のデータが正しく追加されたことを検証します。
実行クエリ:
-- ロード後の行数を確認
SELECT COUNT(*) FROM my_delta_table;
この方法では、REFRESH TABLE を実行する必要がなく、1つのコマンドでS3からDelta Lakeテーブルへのデータロードが完結します。
5. 最後に
今回試した一連のプロセスを通じて、DatabricksとS3を連携させたデータレイクハウスの基本的な構築・運用フローを実践することができました。
- 外部テーブルは、データレイク上の生データへの手軽なアクセス手段として有効です。
-
Delta Lakeは、生データを信頼性とパフォーマンスに優れた形式に変換し、
MERGEによる高度な差分更新や、今回試したCOPY INTOによる効率的な追加ロードを可能にする、データパイプラインの中核技術となります。 -
COPY INTOを利用する際はinferSchemaオプションを活用することで、スキーマの不一致に起因するエラーを簡単に回避し、堅牢なデータロード処理を実現できます。 - データを「生データ」と「加工済みデータ」で物理的に分離して管理するアプローチは、DatabricksではSQLの
LOCATION句で容易に実現できます。
データ更新の各ステップが手動実行であるため、より高い頻度でのデータ更新や自動化が求められる場合は、次のステップとして**ストリーミングテーブル(Delta Live Tables)**のような自動化フレームワークの導入が有効となります。
Discussion