🛰️

Dataform と BigQuery ML で実現するテキスト分析パイプラインの構築

2024/12/13に公開

はじめに

こんにちは。クラウドエース第三開発部の泉澤です。
本記事では、Google Cloud のサービスである Dataform と BigQuery ML を使って、LLM による「テキスト分析パイプライン」を構築する方法をシェアします。
業務で BigQuery ML を使ってテキスト分析を実施した経験があるのですが、プロンプトや LLM のパラメータを設定する必要があるため、他の処理も同じクエリで行おうとするとクエリが複雑になり、可読性が下がりやすいと感じました。
Dataform を使えばクエリを分割しつつも、一連のワークフローとして管理でき便利そうだと思い、この記事を書くきっかけとなりました。

対象読者

  • Dataform を使用して、クエリの依存関係をワークフローとして管理したい方。
  • BigQuery ML のリモートモデルを用いた LLM の活用方法を知りたい方。

なお、後半の実装パートでは以下のスキルを前提としています

  • 基本的な SQL の知識
  • BigQuery や GCP の基本操作経験(GCP のプロジェクト作成、API有効化など)
  • データパイプライン構築や機械学習プロジェクトに関する基礎的な理解

前提知識の説明(Dataform、BigQuery ML)

Dataform とは?

Dataformとは、Google Cloud の BigQuery を利用してデータ変換パイプラインを開発・運用するためのツールです。

主な機能は次のとおりです。

  • クエリの依存関係管理: クエリ同士の依存関係を管理し、一連の処理を効率的に実行できます。
  • Git 連携によるバージョン管理とチームコラボレーション: GitHub や GitLab と連携して、コードのバージョン管理やチームでの共同作業が容易になります。
  • ワークフローによる詳細なスケジュール設定と運用管理: ワークフロー構成を使って、処理の実行タイミングや実行時のアクションを設定できます。
  • アサーションによるテストの実施: SQLX ファイル(SQL の拡張ファイル)にアサーションを定義し、データの品質を自動的にチェックできます。

Dataform の最大の特徴は、データ変換クエリの依存関係を管理できることです。通常、複数のクエリを組み合わせて分析を行う場合、分析時間などを考慮して、各クエリのスケジュール設定を個別に行う必要があります。
しかし、Dataform では ${ref("[リソース名]")} のように前のクエリ結果を参照することで依存関係を定義でき、一連のワークフローとして管理できます。これにより、複雑なスケジュール設定を一度に済ませることができ、作業効率が大幅に向上します。
さらに、テーブルや処理の依存関係を可視化できるため、保守性や可読性も向上します。

本記事後半の実装パートでは、紹介した機能のうち、依存関係管理とワークフローによるスケジュール設定に触れます。

BigQuery ML のリモートモデルとは?

BigQuery ML とは、SQL のみで機械学習モデルを構築することができるサービスです。これにより、データサイエンスの専門知識がなくても比較的簡単に機械学習を活用することができます。

BigQuery ML では Vertex AI エンドポイント や リモートサービスタイプ(Vertex AI が提供する LLM や Cloud Natural Language API)を呼び出す際に、リモートモデルとして作成し、使用します。

BigQuery ML から LLM を使用してテキストを生成する際には、ML.GENERATE_TEXT 関数を使用します。テーブルでリクエストの対象を指定できるため、大量のデータに対し同一のシステムプロンプトで生成タスクを実行したいという時には BigQuery ML の利用はとても便利だと感じます。

実際には生成テキストだけでなく、生成プロセスに関するメタデータも返されます。

実装パートでは、リモートモデルの作成方法と、リモートモデルを使ったクエリの書き方を紹介します。

実装

前提

今回はとある家電メーカーの自社製品に関するオンラインレビューの分析システムを想定し、Dataform と BigQuery ML でパイプラインを構築します。
システム構成図は以下の通りになります。
逐次、外部からカスタマーレビューテーブルを取り込み、週次でそれを分析することを想定しています。

事前準備

前提として gcloud コマンド、bq コマンドが使える状態になっていることを確認してください。

  1. データを用意する

GPT にサンプルデータを生成してもらい、BigQuery にアップロードしました。

customer_reviews_raw.csv
review_id,customer_id,product_id,review_text,review_date,star_rating
"R1","C1","P1","音質が非常に良く、バッテリー持ちも満足です。長時間の通勤でも問題ありません。","2024-12-01",5
"R2","C2","P2","写真の画質が思ったよりも悪く、暗所での撮影に弱いです。","2024-12-03",2
"R3","C3","P3","最新のアプリもサクサク動きますが、発熱が気になります。","2024-11-30",4
"R4","C4","P4","軽量で持ち運びに便利です。バッテリーの持ちも良く、仕事用として最適。","2024-12-05",5
"R5","C5","P5","デザインがスタイリッシュで、機能も豊富。ただしバンドが少し硬い。","2024-12-02",4
"R11","C11","P1","接続が頻繁に切れるので、ストレスがたまります。サポートもイマイチ。","2024-12-01",2
"R12","C12","P2","一週間で故障しました。サポートの対応も遅く、がっかりです。","2024-12-05",1
"R13","C13","P3","画面が大きくて見やすいですが、バッテリーの減りが早いです。","2024-12-04",3
"R14","C14","P4","パフォーマンスが高く、動画編集もスムーズです。ファンの音が少し気になります。","2024-12-02",4
"R15","C15","P5","健康管理に役立っています。睡眠トラッキングが特に良いです。","2024-12-03",5
customer_reviews_raw_schema.json
[
    {"name": "review_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "customer_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "product_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "review_text", "type": "STRING", "mode": "REQUIRED"},
    {"name": "review_date", "type": "DATE", "mode": "REQUIRED"},
    {"name": "star_rating", "type": "INTEGER", "mode": "REQUIRED"}
]
product.csv
product_id,product_name
"P1","ワイヤレスイヤホン"
"P2","デジタルカメラ"
"P3","スマートフォン"
"P4","ノートパソコン"
"P5","スマートウォッチ"
product_schema.json
[
    {"name": "product_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "product_name", "type": "STRING", "mode": "REQUIRED"}
]

bq コマンドを使ってデータセットを作成し、データをアップロードします。

bq mk --dataset --location=asia-northeast1 sample_data

bq load \
--skip_leading_rows=1 \
--source_format=CSV \
--time_partitioning_field=review_date \
--time_partitioning_type=DAY \
sample_data.customer_reviews_raw \
customer_reviews_raw.csv \
customer_reviews_raw_schema.json

bq load \
--skip_leading_rows=1 \
--source_format=CSV \
sample_data.product \
product.csv \
product_schema.json
  1. BigQuery ML のリモートモデルを用意する

続いて BigQuery ML のリモートモデルを用意します。
まず、Vertex AI API を有効化します。

gcloud services enable aiplatform.googleapis.com

外部接続を作成します。

bq mk --connection --location=asia-northeast1 \
    --connection_type=CLOUD_RESOURCE remote_model_connection

サービスアカウント ID を取得してコピーしておきます。

bq show --connection PROJECT_ID.REGION.remote_model_connection

上記で取得したサービスアカウントに Vertex AI ユーザー ロールを追加します。

gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
    --role="roles/aiplatform.user"

リモートモデルを格納する BigQuery データセットを用意します。

bq mk --dataset --location=asia-northeast1 remote_model

リモートモデルを作成するクエリを実行します。

bq query --use_legacy_sql=false \
'CREATE MODEL `remote_model.gemini_1_5_flash_001`
REMOTE WITH CONNECTION `asia-northeast1.remote_model_connection`
OPTIONS(endpoint = "gemini-1.5-flash-001");'

指定したデータセットにモデルができていれば完了です。
これで SQL から LLM へのリクエストが可能になりました!

  1. Dataform での開発を準備する

Dataform リポジトリとワークスペースを作成します。リポジトリには、SQL ワークフローを構成する SQLX・JavaScript ファイルや、Dataform の構成ファイルなどが格納されます。一方、ワークスペースはGitのブランチに似た役割を持ち、各メンバーが個別にワークスペースを作成することで、独立して作業を進めることができます。
BigQuery > Dataform > CREATE REPOSITORY を選択し、リポジトリ ID・リージョンを入力します。

作成するとサービスアカウントに対してロールを付与するように出てくるので、「すべて付与」を選択し、リポジトリに移動します。

但し、これだけですと、BigQuery へのリソースにアクセスできないため、さらにロールを付与する必要があります。
今回はリモートモデルを使用しているので BigQuery コネクションユーザーも付与しています。

gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
    --role="roles/bigquery.dataEditor"

gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
    --role="roles/bigquery.dataViewer"

gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
    --role="roles/bigquery.connectionUser"

リポジトリにアクセスしたら次は「CREATE DEVELOPMENT WORKSPACE」を選択し、ワークスペース ID を入力します。

ワークスペースに入ったら「ワークスペースを初期化」を選択してください。デフォルトのフォルダとファイルが作成されます。
definitions フォルダ内の SQLX ファイルは必要ないので削除しておきます。
ファイル or フォルダ名の右側にある縦3点からファイルの操作を行うことができます。

これで開発の準備が整いました!

パイプラインの実装

ディレクトリ構成

下記が完成系のディレクトリとファイルの構成となります。
このディレクトリ構成は Google Cloud の公式ドキュメントで推奨されている設計方針に基づいています。

  • source: データソースの宣言や、基本的なデータ変換(今回はフィルタリング)を行うファイルを配置する。
  • intermediate: source で定義されたデータを基にさらなる変換処理を行うファイルを配置する。このフォルダ内のリソースは、通常 BI ツールなど外部のシステムに直接公開しない。内部的なデータ変換や集計処理を担当する。
  • outputs: BI ツールなどで直接利用される最終的なテーブルを作成するためのファイルを配置する。公開用データを生成するステージとして機能する。

ディレクトリ構成例:

├── definitions
│   ├── sources
│   │   ├── customer_reviews
│   │   │   ├── customer_reviews_raw.sqlx
│   │   │   └── customer_reviews_raw_filtered.sqlx
│   │   └── product
│   │       └── product.sqlx
│   ├── intermediate
│   │   └── extract_keyword_from_customer_review.sqlx
│   └── outputs
│       └── convert_product_id_to_name.sqlx
├── includes
├── .gitignore
└── workflow_settings.yaml

手順詳細

  1. データソースの宣言
    データソースを明示的に宣言することで、可視化される依存関係にデータソースも含まれ、管理が容易になります。以下の例では、customer_reviews_raw テーブルと product テーブルをデータソースとして宣言しています。
    また、データソースを宣言することで、次のクエリからは、冒頭でも紹介した ${ref("[リソース名]")} を使用できるようになります。
customer_reviews_raw.sqlx
config {
    type: "declaration",
    database: "[PROJECT_ID]",
    schema: "sample_data",
    name: "customer_reviews_raw"
}
  • type: 作成するリソースのタイプ。"declaration" の場合はデータソースの宣言を示す。他にも "table""view" など。
  • database: プロジェクト ID。
  • schema: データセット名。
  • name: テーブル名。
product.sqlx
config {
    type: "declaration",
    database: "[PROJECT_ID]",
    schema: "sample_data",
    name: "product"
}
  1. データフィルタリング
    source フォルダにはデータソースの宣言だけでなく、フィルタリングやフィールド名の変換などの簡単な処理も加えても良いでしょう。
    以下では、1週間分のレビューを取得するフィルタを追加しています。
customer_reviews_raw_filtered.sqlx
config {
    type: "view",
    name: "customer_reviews_raw_filtered"
}

SELECT
  review_id,
  review_text,
  review_date,
  product_id
FROM
  ${ref("customer_reviews_raw")}
WHERE
  review_date BETWEEN CURRENT_DATE('Asia/Tokyo')-7
  AND CURRENT_DATE('Asia/Tokyo')-1
  • type: "view" は中間的な仮想テーブルを作成。
  • ${ref("customer_reviews_raw")}: リソース間の依存関係を指定。
  1. BigQuery ML を活用した LLM テキスト分析

LLM を用いて、カスタマーレビューからキーワードを抽出する処理を記述します。

全体の流れは以下の通りです。

  1. ユーザーのレビューからプロンプトを作成する
  2. LLMにプロンプトを送り、応答を取得する
  3. 応答からキーワードを抽出し、データを整形する
extract_keyword_from_customer_review.sqlx
config {
  type: "view",
  description: "カスタマーレビューからキーワードを抽出する",
  name: "keywords_from_customer_review"
}

WITH user_prompts AS (
  SELECT
    review_id,
    product_id,
    review_text,
    review_date,
    CONCAT(
      '<指示>\nユーザーのレビューから最大5つのキーワードを抽出してください。\n',
      '<outputのフォーマット>\n{"keywords": [最大5つ]}\n\n',
      '<実際の評価>\nレビュー: "', 
      review_text, 
      '"\n=> '
    ) AS prompt
  FROM
    ${ref("customer_reviews_raw_filtered")}
),
llm_results AS (
  SELECT
    u.review_id,
    u.product_id,
    u.review_text,
    u.review_date,
    REGEXP_EXTRACT(ml_generate_text_llm_result, '{[\\s\\S]*?}') AS json_response
  FROM
    user_prompts AS u
  JOIN
    ML.GENERATE_TEXT(
      MODEL `remote_model.gemini_1_5_flash_001`,
      TABLE user_prompts,
      STRUCT(
        0.2 AS temperature,
        150 AS max_output_tokens,
        0.3 AS top_p,
        5 AS top_k,
        TRUE AS flatten_json_output
      )
    ) AS t
  USING (review_id, product_id, review_text, review_date, prompt)
),
extracted_keywords AS (
  SELECT
    review_id,
    product_id,
    review_text,
    review_date,
    JSON_EXTRACT_ARRAY(json_response, '$.keywords') AS keyword_array
  FROM
    llm_results
)

SELECT
  review_id,
  product_id,
  review_text,
  review_date,
  keyword
FROM
  extracted_keywords,
  UNNEST(keyword_array) AS keyword

簡単にクエリ部の説明をします。

最初の WITH 句ではプロンプトの作成をしています。BigQuery では LLM へのリクエスト時にテーブルを指定する場合には、プロンプトの入る列名を prompt とする必要があるため、AS prompt でそのように指定しています。

WITH user_prompts AS (
  SELECT
    review_id,
    product_id,
    review_text,
    review_date,
    CONCAT(
      '<指示>\nユーザーのレビューから最大5つのキーワードを抽出してください。\n',
      '<outputのフォーマット>\n{"keywords": [最大5つ]}\n\n',
      '<実際の評価>\nレビュー: "', 
      review_text, 
      '"\n=> '
    ) AS prompt
  FROM
    ${ref("customer_reviews_raw_filtered")}
),

続いて、JOIN 句の ML.GENERATE_TEXT で LLM に対してリクエストを送っています。 STRUCT ブロックでは同時にリクエストに送るパラメータを設定しています。
レスポンスで生成されたテキストは ml_generate_text_result という名前で返ってきます。
上のプロンプトでは出力が JSON 形式になるようにしているので、ml_generate_text_result から REGEXP_EXTRACT で JSON を抽出しています。

llm_results AS (
  SELECT
    u.review_id,
    u.product_id,
    u.review_text,
    u.review_date,
    REGEXP_EXTRACT(ml_generate_text_llm_result, '{[\\s\\S]*?}') AS json_response
  FROM
    user_prompts AS u
  JOIN
    ML.GENERATE_TEXT(
      MODEL `remote_model.gemini_1_5_flash_001`,
      TABLE user_prompts,
      STRUCT(
        0.2 AS temperature,
        150 AS max_output_tokens,
        0.3 AS top_p,
        5 AS top_k,
        TRUE AS flatten_json_output
      )
    ) AS t
  USING (review_id, product_id, review_text, review_date, prompt)
),

最後に JSON からキーワードを取得し、UNNEST で展開しています。

extracted_keywords AS (
  SELECT
    review_id,
    product_id,
    review_text,
    review_date,
    JSON_EXTRACT_ARRAY(json_response, '$.keywords') AS keyword_array
  FROM
    llm_results
)

SELECT
  review_id,
  product_id,
  review_text,
  review_date,
  keyword
FROM
  extracted_keywords,
  UNNEST(keyword_array) AS keyword
  1. 最終テーブル作成
    extract_keyword_from_customer_review.sqlx の出力を元に、product_idproduct_name に変換し、出力用テーブルを生成します。
convert_product_id_to_name.sqlx
config {
  type: "table",
  description: "product_idをproduct_nameに変換する",
  schema: "result",
  name: "customer_review_keywords",
  bigquery: {
    partitionBy: "review_date"
  }
}

SELECT
  review_id,
  review_text,
  review_date,
  keyword,
  product.product_name AS product_name
FROM
  ${ref("keywords_from_customer_review")} AS kr
JOIN 
  ${ref("product")} AS product
ON 
  kr.product_id = product.product_id
  • schema: 最終テーブルを保存するデータセット。
  • bigquery: 作成するテーブルやビューに対する詳細な設定。今回は partitionBy でパーティションキーを設定している。

依存関係を確認する

「COMPILED GRAPH」から依存関係を確認します。
Dataform では、ファイルを作成すると自動的にコンパイルが実行され、下記のような依存関係のグラフが生成されます。この機能により、データソースからクエリの流れが一目で把握できるようになります。

実行してみる

すべてのファイルが作成され、依存関係にも問題がなさそうなため、実際にパイプラインを実行し結果を確認します。

画面上部の 「実行を開始」 > 「操作を実行」 を選択してください。特定のファイルだけ実行するなど細かい操作も可能ですが、今回は「ALL ACTIONS」を選択してすべてを実行します。

隠れてしまっていますが、宛先には BigQuery の出力先が記載されています。([プロジェクトID].[データセット名].[テーブル名]など)

BigQuery Studio で出力されたデータを確認します。convert_product_id_to_name.sqlx で指定した通り、result データセットに最終テーブル(customer_review_keywords)が出力されていました。
review_text からキーワードが抽出されていることが確認できます。パイプラインを拡張しユーザーの感情分析を加えたり、このデータをさらに活用し、抽出されたキーワードを集計しても面白そうだと思いました。

なお、中間結果として作成されるビューは保存先を指定していなかったため dataform データセット内に保存されます。

スケジュールを定義する

最後に、Dataform を使用して定期実行のスケジュールを設定します。
Dataform のワークフロー構成機能を使えば、定期的なデータ処理を自動化できます。

  • ワークフロー構成:
    ワークフロー構成では、リリース構成でコンパイルされた SQL を実行するためのスケジュールや設定を定義します。これにより、データ変換処理を自動化できます。
    主な設定: 実行するリリース構成の指定、実行頻度やタイミングの設定、実行するアクションの選択(リポジトリ内のどのファイルを実行するか)など
  • リリース構成:
    リリース構成は、リポジトリ内の SQLX ファイルをコンパイルし、BigQuery で実行可能な SQL に変換する設定です。
    主な設定: コンパイルの頻度やタイミング、スキーマやテーブル名の接頭辞・接尾辞、コンパイル時に使用する変数など

スケジュール設定の手順

  1. リポジトリへコミット
    リリース構成やワークフロー構成は、リポジトリにコミットされた内容が反映されます。
    ワークスペース左上の「+n件の変更を COMMIT」を選択し、すべてのチェックボックスをオンにして commit メッセージを入力してください。

その後、「PUSH TO DEFAULT BRANCH」を選択すればコミット完了です。

  1. リリース構成を作成する
    ワークスペースの選択画面に戻り、「リリースとスケジュール」>「製品版リリースの作成」を選択します。
    リリース ID を入力し、「Schedule Frequency」はオンデマンドとします。
    その他はデフォルトのままで OK です。

  1. ワークフロー構成の作成
    リリース構成を作成するとワークフロー構成を作成できるようになります。
    「ワークフロー構成」の右に書かれた「+作成」を選択します。

構成 ID を入力し、リリース構成には先ほど作成したリリース構成を選択します。サービスアカウントはデフォルトのもので問題ありません。
スケジュール設定とアクション指定を行い、「作成」 を選択すればすべての作業が完了です。

実行結果は「WORKFLOW EXECUTION LOGS」で確認できます。

これで、Dataform を用いたデータパイプラインの構築と自動化が完了です!

まとめ

今回は、Dataform と BigQuery ML のリモートモデルを活用し、LLM を用いたテキスト分析パイプラインの構築方法をご紹介しました。
近年、生成AIの発展により、非データサイエンティストでも大量の非構造化データを手軽に分析できるようになりつつあります。
ぜひ、LLM と Dataform と BigQuery ML を活用して、新たなインサイトを発見してみてください。

Discussion