🙆♀️
Auto LoaderとジョブでETLパイプラインを構築する
1. 事前準備
ハンズオンを開始する前に、Databricksのワークスペース上で以下の準備が必要です。
-
カタログとスキーマ: データを格納するためのカタログとスキーマが作成済みであること。(動画では
hello
カタログ内のtest
スキーマを使用) -
ボリューム: データソースファイルとチェックポイント情報を格納するためのボリュームが2つ作成済みであること。(動画では
hello.test
スキーマ配下にvol
とcheckpoint
の2つのボリュームを使用) - サンプルデータファイル: ボリュームにアップロードするサンプルデータ(JSON形式)がローカルに準備されていること。
2. 目的
Databricksプラットフォームを利用して、インクリメンタルな(増分)データロードを行うETLパイプラインを構築し、それを自動化するまでの一連のプロセスを試すことを目的とします。
具体的には、以下の項目を試しました。
- Auto Loaderを使用した、ボリュームからの継続的なデータ取り込み設定。
- 取り込んだデータを元にしたDelta Lakeテーブルの自動作成。
- 作成したノートブックをジョブとしてスケジュール登録し、ETL処理を自動化する手順。
3. 実行環境
- プラットフォーム: Databricks Free Edition
- データソース: Databricks Volumesに格納されたJSONファイル
4. 試してみた
ステップ1: データ取り込みノートブックの作成と設定
次に、ETL処理を記述するためのノートブックを作成し、Auto Loaderの設定を行います。
1-1. ノートブックの作成
- UI左側ナビゲーションから
新規
>ノートブック
を選択し、新しいノートブックを開きます。
1-2. データソースの準備
-
カタログ
エクスプローラーを開き、事前準備で作成したデータソース用ボリューム(例:vol
)に移動します。 -
このボリュームにアップロード
ボタンをクリックし、サンプルデータとなるJSONファイル(例:test.json
)をアップロードします。
1-3. Auto Loaderのコード設定
チュートリアルのPythonコードをコピーし、ノートブックのセルに貼り付けます。その後、ご自身の環境に合わせて3つの変数を変更します。
実行コード(変数変更前):
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events/"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
変更箇所:
-
file_path
: ステップ2-2でサンプルデータをアップロードしたボリュームのパスに変更します。パスはカタログエクスプローラーからコピーできます。-
変更後 (例):
file_path = "/Volumes/hello/test/vol"
-
変更後 (例):
-
checkpoint_path
: チェックポイントを保存するボリュームのパスに変更します。-
変更後 (例):
checkpoint_path = "/Volumes/hello/test/checkpoint"
-
変更後 (例):
-
table_name
: 作成するDelta Lakeテーブルを完全修飾名(カタログ名.スキーマ名.テーブル名
)で指定します。-
変更後 (例):
table_name = "hello.test.emp"
-
変更後 (例):
ステップ2: ETLパイプラインの実行と結果確認
設定が完了したら、セルを実行してAuto Loaderを起動し、データが正しく処理されたかを確認します。
2-1. パイプラインの実行
ノートブックのセルを実行します。Auto Loaderが file_path
で指定された場所を監視し、新しいJSONファイル(test.json
)を検知してDelta Lakeテーブル(hello.test.emp
)への取り込み処理を開始します。
2-2. 実行結果の確認
-
カタログエクスプローラーでの確認:
-
checkpoint
ボリューム内に、処理状況を管理するためのメタデータファイルが自動生成されていることを確認します。 -
hello.test
スキーマ配下に、指定したテーブル名(emp
)で新しいテーブルが作成されていることを確認します。 - 作成されたテーブルを選択し、
サンプルデータ
タブを開くと、JSONファイルの内容がテーブル形式で表示されます。Auto Loaderが自動で追加したメタデータ列(_rescued_data
,source_file
など)も確認できます。
-
-
ノートブックでのデータ確認:
- ノートブックに新しいセルを追加し、以下のコードを実行して、作成されたテーブルのデータをDataFrameとして表示・確認します。
実行クエリ:
df = spark.read.table("hello.test.emp") display(df)
ステップ3: ジョブのスケジューリング設定
最後に、このノートブックを定期的に自動実行するためのジョブを作成します。
- ノートブック画面の右上にある
スケジュール
ボタンをクリックします。 -
スケジュールを追加
をクリックし、表示されたダイアログで以下の項目を設定します。-
ジョブ名: 任意のジョブ名を入力します(例:
turial5
)。 -
スケジュール:
Simple
を選択し、実行頻度を設定します。 -
スケジュール (Advanced):
Advanced
を選択すると、cron構文でより詳細なスケジュール(例: 毎日の21:45に実行)が設定できます。 -
クラスター: ジョブの実行に使用するクラスターを選択します(例:
サーバーレス
)。
-
ジョブ名: 任意のジョブ名を入力します(例:
-
作成
ボタンをクリックします。
ステップ4: スケジュールされたジョブの確認
作成されたジョブは「ジョブとパイプライン」機能で管理されます。
- UI左側ナビゲーションから
ジョブとパイプライン
をクリックします。 - 作成したジョブ(
turial5
)が一覧に表示されていることを確認します。 - ジョブ名をクリックすると詳細画面が開き、過去の実行履歴や、
スケジュールとトリガー
の設定内容を確認・編集できます。トリガータイプは「スケジュール済み」の他に、特定のファイル到着を検知して実行する「ファイル到着」なども選択可能です。
5. 最後に
今回のチュートリアルを通じて、Databricksの基本的なETLパイプライン構築の流れを体験しました。
- Auto Loader を利用することで、データレイク(今回はVolumes)に到着する新しいファイルを継続的かつ効率的に検知し、テーブルに取り込むことができます。
- Delta Lake をテーブルフォーマットとして使用することで、スキーマの自動推論や進化への対応、信頼性の高いデータ管理が容易になります。
- Databricksの ワークフロー(ジョブ)機能 を使うことで、作成したETLノートブックをGUIベースで簡単にスケジュール実行でき、データパイプラインの自動化を実現できます。
これらの機能を組み合わせることで、手動での介入を最小限に抑えた、堅牢でスケーラブルなデータパイプラインを迅速に構築することが可能です。
Discussion