💭

Databricksでパイプラインを構築する2つの方法

に公開

はじめに

近年、企業における生成AIの導入が加速しています。しかし、ChatGPTのような大規模言語モデル(LLM)をそのまま利用するだけでは、企業固有の専門用語や社内文書に基づいた正確な回答を得ることが難しいという課題があります。
そこで注目されているのが「RAG(Retrieval-Augmented Generation)」というアプローチです。

RAGでは、外部の信頼できる情報源を検索・抽出し、それをLLMに与えて回答を生成することで、高精度な応答を実現します。
このRAGを実運用に落とし込むためには、安定した「データパイプライン」の構築が不可欠です。特に、企業内に点在するデータソースを統合・加工し、効率的にAIに供給できる基盤が求められています。

そこで今回は、Databricksを活用したパイプラインの構築する方法を紹介します。

Databricksとは?

Databricks は、エンタープライズ レベルの大規模なデータ分析やAIソリューションを構築、デプロイ、共有、保守するための、統合されたオープンな分析プラットフォームであり、様々なデータソースからデータを取り込み、用途に合わせてデータを処理するパイプラインを簡単に構築することができます。

Databricksを使ってデータパイプラインを構築するには主に、Lakeflow宣言型パイプラインとDatabricksプラットフォーム上のApache Sparkを用いる方法の2つがあります。
この記事では、これらのデータパイプラインを構築する2つの方法を公式のチュートリアルを参考に紹介します。

ETL Lakeflow宣言型パイプラインを使用してパイプラインを構築する

ETL Lakeflow宣言型パイプラインは、SQL と Pythonでバッチおよびストリーミング処理を行う、データパイプラインを開発するための宣言型フレームワークです。(従来はDLTやDelta Live Tablesと呼ばれていた機能がETL Lakeflow宣言型パイプラインに統合されました)
実際に、公式のチュートリアルを参考に、Lakeflow宣言型パイプラインを構築していきます。

前提条件

  • ワークスペースで Unity Catalog が有効になっていること
  • サーバレス コンピュートをアカウントで有効になっていること
  • サーバレス Lakeflow 宣言型パイプラインが有効な地域であること
  • コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限を持っていること
  • カタログに新しいスキーマを作成する権限があること(ALL PRIVILEGES または USE CATALOG および CREATE SCHEMA)
  • 既存のスキーマに新しいボリュームを作成するアクセス許可があること(ALL PRIVILEGES または USE SCHEMA および CREATE VOLUME)

ステップ1:パイプラインの作成

  1. ワークスペースで、サイドバーの[ジョブとパイプライン]をクリックします
    image.png

  2. 新規作成セクションの、[ ETL パイプライン ] をクリックします
    image.png

  3. パイプライン名を入力し、サーバレスのチェックボックスを選択します
    image.png

  4. 配信先でテーブルが公開されるUnity Catalogを選択し、必要であれば、スキーマに新しい名前を入力し、カタログに新しいスキーマを作成します
    image.png

  5. [作成]をクリックし、続けて表示されるポップアップの[パイプラインを作成]を選択します
    image.png

  6. 自動的にパイプラインUI が表示されます。
    image.png

ステップ2:パイプラインの開発

Databricksでは、ノートブックを使用して、Lakeflow宣言型パイプラインのソース コードを対話形式で開発および検証することができます。

  1. パイプラインUIのパイプラインの詳細セクションの[ソースコード]のリンクをクリックしてノートブックを開きます
    image.png

  2. 右上に出てくるポップアップメッセージの[接続]を選択して、作成したパイプラインにノートブックを接続します
    image.png

  3. 今回はSQLで開発するため、左上部分の[python]をクリックし、[SQL]を選択し、ポップアップメッセージの[確認]を選択します
    image.png

  4. 以下のコードをセルに入力し、ストリーミングテーブルを作成します
    ストリーミングテーブルは、クラウド ストレージに新たに届いたデータファイルからデータを取り込むことができる Delta テーブルです。
    今回は、databricks ファイルシステム(DBFS)上のディレクトリをステージング場所として、ファイルが追加され次第、データを取り込む設定をしています。

    SQL
    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    

    image.png

  5. 次に、以下のコードを追加して、マテリアライズドビューを作成します
    マテリアライズドビューは、クエリごとに結果を再計算する標準ビューとは異なり、結果をキャッシュし、指定した間隔で更新します。
    ここでは、ストリーミングテーブルのtitle列をsong_title列へと列名の変更を行っています。
    また、3つの列に列制約を設定することによって、データの品質を高めています。

    SQL
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    

    image.png

  6. 最後に、以下のコードを追加してユーザーが扱いやすい状態にデータを整形したマテリアライズドビューを作成します
    ここでは、必要なカラムのみに射影し、必要なデータのみを選択し、集約しています。
    このような、データを取り込んで、クレンジングし、加工するような3段階のパイプラインの構成は、メダリオンアーキテクチャと呼ばれ、データパイプラインの設計において、広く採用されています。

    SQL
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    

    image.png

  7. 作成したパイプラインを検証します
    右上の[検証]を選択し、パイプラインを起動する前に、作成したパイプラインが正常に動くかどうかを確認します。
    image.png
    特にエラーなくパイプラインを定義できることが確認できます。
    image.png

  8. 作成したパイプラインの更新を開始します
    右上の[起動]を選択し、パイプラインを実際に起動します。
    成功すると、緑色のチェックマークと処理された行が表示されます。
    image.png

ステップ3:パイプラインをスケジュールする

  1. パイプラインを起動するジョブをスケジュールし、定期的にパイプラインを実行します
    右上の[スケジュール]から、[Add schedule]を選択し、実行したい周期を設定します。
    image.png
    ここでは、周期実行していますが、新しいデータファイルが到着次第実行することも可能です。
  2. スケジュールされたジョブの状態を確認します
    [ジョブとパイプライン]をクリックし、パイプラインと同名のジョブを選択します。
    image.png
    周期的にパイプラインが成功していることが確認できます。
    image.png

Databricksプラットフォーム上の Apache Spark を使用して ETL パイプラインを構築する

次に、Databricksの公式のチュートリアルを参考に、Apache Sparkを使用して、外部ストレージからDelta Lakeにデータを取り込む部分のパイプラインを構築します。

必要条件

  • クラスターを作成する権限があること

ステップ1: クラスターを作成する

パイプラインを実行するクラスターを作成します。

  1. サイドバーの[コンピュート]をクリックします
    image.png
  2. コンピューティングを作成 をクリックします
    image.png
  3. クラスター名を入力し、他の設定はデフォルトのままで[作成]を選択します
    image.png

ステップ2: Databricks ノートブックを作成する

  1. サイドバーの[新規]をクリックし、[ノートブック]を選択します
    image.png
  2. 左上の入力ボックスをダブルクリックして、ノートブックの名前を設定します
    image.png
  3. 今回はPythonを用いてパイプラインを構築するので、左上のプルダウンリストからPyhonを選択します
    image.png

ステップ3: Delta Lakeにデータを取り込むようにAuto Loaderを設定する

Delta Lakeとは、Databricksのレイクハウスのテーブルの基盤を提供する最適化されたストレージレイヤーです。テーブルのデータをデータファイルと、トランザクションログで拡張し、ACIDトランザクションとスケーラブルな処理を実現しています。
Databricks上のすべてのテーブルは、デフォルトによってDeltaテーブルで作成されるので、実装時に特別な書き方を行う必要はありません。
また、Databricksでは、増分データを取り込む際には、Auto Loaderを採用することが推奨されています。
Auto Loaderとは、新しいデータ ファイルがクラウド ストレージに到着すると、段階的かつ効率的に処理を行う機能です。実は、前述のLakeflow宣言型パイプラインのストリーミングテーブルは内部でAuto Loaderを使用しています。
では、実際にこれらのコンポーネントを用いて、Delta Lakeにデータを取り込む処理を実装してみましょう。

  1. ステップ2で作成したノートブックのセルに以下のコードを張り付けて、セル左上の「セルを実行を」選択します

    python
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Define variables used in code below
    file_path = "/databricks-datasets/structured-streaming/events"
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    table_name = f"{username}_etl_quickstart"
    checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
    
    # Clear out data from previous demo execution
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    dbutils.fs.rm(checkpoint_path, True)
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name))
    

    image.png

    簡単に、コードを説明します。

    • ここでは、databricks ファイルシステム(DBFS)上のディレクトリ(/databricks-datasets/structured-streaming/events)をステージング場所として、ファイルが追加され次第、データを取り込む設定をしています
      • このディレクトリには50個のJSONファイルが用意されています
    • current_user()という関数は、Databricksのユーザー名(xxx@domain.com)を取得できますが、この文字列に含まれる「@」や「.」はtable_nameとして扱えないので、「_」にregexp_replace()を使用して置換しています
    • Sparkを用いたストリーミング処理では、readStreamという関数を用います
      • format
        • cloudFilesを指定することで、Auto Loaderを使用することができます
      • option
        • cloudFiles.formatにjsonを指定することで、JSONファイルのみをインジェスト対象することができます
        • cloudFiles.schemaLocationに適切なパスを指定することで、推論されたスキーマとその後の変更を格納することができます
        • checkpointLocationを指定すると、処理がどこまで実行されたかを管理することができ、ステートフルな処理が可能になります
      • select
        • Delta Tableにingestする列を設定します
        • データのソースファイルパスやインジェストされた時刻を列として保存します
        • これらのメタデータを保存することで、データの来歴を可視化することができ、後続の処理でデータの追跡が可能になります
      • trigger
        • availableNow=Trueを指定することで、前回のバッチから新しく到着したファイルだけを全て消費し、処理を終了します
        • バッチジョブとしてこのNotebookをDatabricksジョブでスケジュールすることがで、すべてのファイルを処理することができます
      • writeStream.toTable()
        • 指定された場所にテーブルを作成し、データをインジェストします
  2. 作成したテーブルにデータが取り込まれているか確認します

    python
    df = spark.read.table(table_name)
    display(df.count())
    

    image.png
    10万行がDelta Tableに取り込まれたことが確認できます。

ステップ4: ジョブをスケジュールする

次に、作成したDatabricksノートブックをDatabricksジョブのタスクとして追加し、定期実行します。

  1. ヘッダーバーの右側にある「 スケジュール 」をクリックし、「Job name*」に一意となる名前を入力します
    image.png

  2. 実行間隔を狭めるため、「Advanced」を選択し、「Schedule」に5分毎と設定し、「create」を選択します
    所定時間は、現在時刻の分の値を設定することで、現在から5分後に初回のジョブが実行されるように設定しています。
    image.png

  3. jobが作成されると、ノートブック右上の「スケジュール」に作成したジョブが表示されます
    image.png

  4. 作成したjob名をクリックすると、jobの管理画面に移動することができ、作成したジョブが定期的に実行されていることが分かります
    image.png

最後に

今回は、DatabricksにおけるETLパイプラインの作成方法を2つのチュートリアルを参考に実践してみました。
ETL Lakeflow宣言型パイプラインでは、Apache Sparkを意識することなくETLパイプラインを構築することができました。また、Apache Spark を使用した ETL パイプラインでは、ノートブックに処理を記述し、ジョブとしてノートブックをスケジュールすることでパイプラインを実行することができました。
所感としては、ETL Lakeflow宣言型パイプラインは、UIも見やすく、簡単にエラーログを確認できる部分が良いと感じました。対して、Apache Spark を使用した ETL パイプラインは、SQLを使ったETL Lakeflow宣言型パイプラインよりも、選べるオプションが多く、より細かい設定が可能である点が良いと感じました。

次回はこの2つの構築方法の選択基準をまとめたいと考えています。

お楽しみに!

参考文献

https://qiita.com/taka_yayoi/items/8511ef61498a31734b00
https://docs.databricks.com/aws/ja/dlt/concepts
https://docs.databricks.com/aws/ja/getting-started/data-pipeline-get-started
https://docs.databricks.com/aws/ja/getting-started/etl-quick-start
https://docs.databricks.com/aws/ja/dlt/load
https://docs.databricks.com/aws/ja/dlt/streaming-tables
https://docs.databricks.com/aws/ja/ingestion/cloud-object-storage/auto-loader/
https://docs.databricks.com/aws/ja/dlt/materialized-views
https://docs.databricks.com/aws/ja/delta/
https://docs.databricks.com/aws/ja/ingestion/cloud-object-storage/auto-loader/options
https://docs.databricks.com/aws/ja/structured-streaming/triggers
https://qiita.com/taka_yayoi/items/df143647dcf5942b51c6

Discussion