🌀

SQLXの1人実装を生成AIで解消しようとした(Dataform編)

2024/12/10に公開

この記事は株式会社ガラパゴス(有志)アドベントカレンダー2024の10日目の記事です。

最近会社の同僚が広い部屋に引っ越して人生変わったくらいの感じでSlackに投稿されているのを見て、私は今年何も変わってないなあと感じる今日この頃です。人生変わった話はいずれ記事化されてほしいと密かに思っているほどです。多分ならないと思いますが。
それでは本題に移ります。

背景

うちの会社ではデータ基盤にDataformを使用しています。[1]昨年導入した際には公式ドキュメントくらいしか見当たりませんでしたが、最近ではDataform関連の記事も増えてきました。

とはいえDataformに関して定番や基本的な内容のものがほとんどで、少し変わったことをするようなものはまだそれほど見当たりません。Dataformでいろいろ試したいけれど、1人で一から調査しながら試すのも時間がかかってしまう…
そこで今回も生成AIにDataformでの実装方法を質問してみて、実際にDataformで実装をするうえで使えそうかいろいろなケースで試してみました。

試したこと

ケース1:スプシソーステーブルの作成

BigQueryではGoogleのスプレッドシートをデータソースとする外部テーブルを作成することができます。その作成をDataformで行う場合のSQLXの実装について質問してみました。

渡したインプット

Dataformで以下の構造を持つGoogleスプレッドシートをソースとするテーブルを作成するSQLXを生成してください:

  • カラム
    • A列:ユーザーID(値の例:12)
    • B列:日時(値の例:2024-12-09)
    • C列:購入金額(値の例:5,000)
    • D列:アイテム名(値の例:詰め合わせセット)
  • 留意点
    • スプレッドシートのURLおよびシート名はダミー値を設定してください
    • スプレッドシートの1行目はカラム名が入っているため、データとしては2行目以降を利用してください
    • 各カラムには値の例を記載していますが、突然全く異なる形式のデータが入る可能性があります。そのような場合で作成したテーブルにSELECT *をしてもエラーが起きないようにしてください

Claude 3.5 SonnetとChatGPT 4oに質問してみましたが、どちらも1回では欲しい結果を得られませんでした。回答で得られたコードをDataform上で実行し、表示されたエラーをまた質問するという形式で何度かやり直しました。
最終的にClaudeは正しい実装が得られましたが、ChatGPTの方は一部のエラーを解消できませんでした。(Dataformの文法では、type: "operations"が正しいがtype: "operation"の回答しか得られず、何度かエラーメッセージを伝えても正しいコードを出力できなかった)

Claude 3.5 Sonnet での結果(7回目、コードのみ)
-- definitions/purchase_records_external.sqlx
config {
  type: "operations",
  hasOutput: true,
  name: "purchase_records_external"
}

CREATE OR REPLACE EXTERNAL TABLE your_dataset.purchase_records_external (
  user_id STRING,
  purchase_datetime STRING,
  purchase_amount STRING,
  item_name STRING
)
OPTIONS (
  format = 'GOOGLE_SHEETS',
  uris = ['https://docs.google.com/spreadsheets/d/dummy_spreadsheet_id/edit#gid=0'],
  skip_leading_rows = 1
);
-- definitions/purchase_records.sqlx
config {
  type: "table",
  name: "purchase_records",
  description: "Purchase records imported from Google Spreadsheet. Each record contains user ID, datetime, purchase amount, and item name.",
  dependencies: ["purchase_records_external"]
}

WITH validated_data AS (
  SELECT
    -- User ID: Convert to integer, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(TRIM(COALESCE(user_id, '')), r'^[0-9]+$') 
        THEN TRIM(user_id)
        ELSE NULL 
      END AS INT64
    ) AS user_id,

    -- DateTime: Convert to timestamp, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(TRIM(COALESCE(purchase_datetime, '')), r'^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}(:\d{2})?)?$')
        THEN TRIM(purchase_datetime)
        ELSE NULL 
      END AS TIMESTAMP
    ) AS purchase_datetime,

    -- Purchase Amount: Remove commas, convert to numeric, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(REPLACE(TRIM(COALESCE(purchase_amount, '')), ',', ''), r'^\d+(\.\d+)?$')
        THEN REPLACE(TRIM(purchase_amount), ',', '')
        ELSE NULL 
      END AS NUMERIC
    ) AS purchase_amount,

    -- Item Name: Clean string, set to empty string if NULL
    TRIM(COALESCE(item_name, '')) AS item_name,

    -- Add metadata column for processing timestamp
    CURRENT_TIMESTAMP() AS processed_at,
    
    -- Add row number for tracking
    ROW_NUMBER() OVER() AS source_row_number
  FROM 
    your_dataset.purchase_records_external
)

SELECT 
  -- Main columns
  user_id,
  purchase_datetime,
  purchase_amount,
  item_name,
  
  -- Metadata columns
  source_row_number,
  processed_at,
  
  -- Error flag columns for data quality monitoring
  CASE WHEN user_id IS NULL THEN TRUE ELSE FALSE END AS has_invalid_user_id,
  CASE WHEN purchase_datetime IS NULL THEN TRUE ELSE FALSE END AS has_invalid_datetime,
  CASE WHEN purchase_amount IS NULL THEN TRUE ELSE FALSE END AS has_invalid_amount
FROM 
  validated_data
ChatGPT 4o での結果(8回目、コードのみ)
config {
  type: "operation",
  description: "Googleスプレッドシートを外部テーブルとして作成"
}

CREATE EXTERNAL TABLE IF NOT EXISTS `project_id.dataset_id.spreadsheet_table`
OPTIONS (
  format = 'GOOGLE_SHEETS',
  uris = ['https://docs.google.com/spreadsheets/d/DUMMY_SPREADSHEET_ID']
);
config {
  type: "table",
  description: "外部テーブルを基に加工したテーブル",
  dependencies: ["create_external_table"] -- 依存関係を指定
}

WITH filtered_data AS (
  SELECT
    *,
    ROW_NUMBER() OVER () AS row_num -- 各行に行番号を付与
  FROM
    `project_id.dataset_id.spreadsheet_table`
)
SELECT
  SAFE_CAST(CAST(A AS STRING) AS INT64) AS user_id, -- ユーザーIDを整数型にキャスト
  SAFE.PARSE_DATETIME('%Y-%m-%d', CAST(B AS STRING)) AS purchase_date, -- 日時を日付型にキャスト
  SAFE_CAST(REPLACE(CAST(C AS STRING), ',', '') AS INT64) AS purchase_amount, -- カンマを除去して整数型にキャスト
  CAST(D AS STRING) AS item_name -- アイテム名は文字列型のまま
FROM
  filtered_data
WHERE
  row_num > 1 -- 1行目(ヘッダー)を除外

ケース2:主キーの設定

BigQueryに昨年追加された主キーの設定を、先ほどのケース1で作成するテーブルに対してDataformで行う実装について質問しました。

渡したインプット

先ほど作成したテーブルに、主キーを設定するようにしてください

この質問に対してはどちらも初回でそれらしい回答が得られました。
ただし主キーは複数設定できるため1項目でも[]で囲う必要があり、ChatGPTが示したuniqueKey: "user_id"ではエラーが発生してしまいます。

Claude 3.5 Sonnet での結果
-- definitions/purchase_records.sqlx
config {
  type: "table",
  name: "purchase_records",
  description: "Purchase records imported from Google Spreadsheet. Each record contains user ID, datetime, purchase amount, and item name.",
  uniqueKey: ["user_id"],
  assertions: {
    uniqueKey: ["user_id"]
  }
}

WITH validated_data AS (
  SELECT
    -- User ID: Convert to integer, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(TRIM(COALESCE(user_id, '')), r'^[0-9]+$') 
        THEN TRIM(user_id)
        ELSE NULL 
      END AS INT64
    ) AS user_id,

    -- DateTime: Convert to timestamp, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(TRIM(COALESCE(purchase_datetime, '')), r'^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}(:\d{2})?)?$')
        THEN TRIM(purchase_datetime)
        ELSE NULL 
      END AS TIMESTAMP
    ) AS purchase_datetime,

    -- Purchase Amount: Remove commas, convert to numeric, set to NULL if invalid
    CAST(
      CASE 
        WHEN REGEXP_CONTAINS(REPLACE(TRIM(COALESCE(purchase_amount, '')), ',', ''), r'^\d+(\.\d+)?$')
        THEN REPLACE(TRIM(purchase_amount), ',', '')
        ELSE NULL 
      END AS NUMERIC
    ) AS purchase_amount,

    -- Item Name: Clean string, set to empty string if NULL
    TRIM(COALESCE(item_name, '')) AS item_name,

    -- Add metadata column for processing timestamp
    CURRENT_TIMESTAMP() AS processed_at,
    
    -- Add row number for tracking
    ROW_NUMBER() OVER() AS source_row_number
  FROM 
    ${ref("purchase_records_external")}
),

-- Handle duplicate user_ids by keeping the latest record
deduplicated_data AS (
  SELECT 
    user_id,
    purchase_datetime,
    purchase_amount,
    item_name,
    source_row_number,
    processed_at,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY purchase_datetime DESC, source_row_number DESC) as row_num
  FROM 
    validated_data
  WHERE 
    user_id IS NOT NULL  -- Exclude NULL user_ids as they can't be part of a primary key
)

SELECT 
  -- Main columns
  user_id,
  purchase_datetime,
  purchase_amount,
  item_name,
  
  -- Metadata columns
  source_row_number,
  processed_at,
  
  -- Error flag columns for data quality monitoring
  CASE WHEN user_id IS NULL THEN TRUE ELSE FALSE END AS has_invalid_user_id,
  CASE WHEN purchase_datetime IS NULL THEN TRUE ELSE FALSE END AS has_invalid_datetime,
  CASE WHEN purchase_amount IS NULL THEN TRUE ELSE FALSE END AS has_invalid_amount
FROM 
  deduplicated_data
WHERE 
  row_num = 1  -- Only keep the latest record for each user_id

主な変更点:
1.configにuniqueKeyとassertionsを追加
2.重複するuser_idの処理を追加(最新のレコードのみを保持)
3.NULLのuser_idを除外する処理を追加

注意点:
1.この実装では、同じuser_idの複数レコードがある場合:

  • 最新のpurchase_datetimeを持つレコードを保持
  • 同じ日時の場合は、ソースデータの後の行(より大きいsource_row_number)を優先

2.以下のようなケースで主キー制約違反が発生する可能性があります:

  • NULLのuser_id
  • 無効なフォーマットのuser_id
  • スプレッドシートに重複するuser_idがある場合

これらのケースを別の方法で処理したい場合は、要件に応じて修正できます。例えば:

  • すべてのレコードを保持したい場合は、user_idとpurchase_datetimeの組み合わせを主キーにする
  • 重複レコードをエラーとして扱いたい場合は、別のテーブルにエラーログを出力する などの対応が可能です。
ChatGPT 4o での結果

BigQueryでは、データベースシステムのように明示的な主キー制約をサポートしていません。ただし、主キーとして機能する一意性を保証するカラムを作成し、その一意性を維持するロジックをSQL内で実装することが一般的です。

Dataformでの対応として以下の方法を提案します:

修正版: 主キーを指定する方法
config内でuniqueKeyプロパティを指定する
Dataformでは、configセクションにuniqueKeyプロパティを設定することで主キーを仮想的に定義できます。このプロパティは、データがリフレッシュされるたびに一意性が保証されるカラムを指定します。

SQLで一意性を保証するためのロジックを追加
主キーに重複が生じないよう、データ処理中にROW_NUMBER()やDISTINCTを活用します。

config {
  type: "table",
  description: "外部テーブルを基に加工したテーブル",
  dependencies: ["create_external_table"], -- 依存関係を指定
  uniqueKey: "user_id" -- 主キーを指定
}

WITH filtered_data AS (
  SELECT
    *,
    ROW_NUMBER() OVER () AS row_num -- 各行に行番号を付与
  FROM
    `project_id.dataset_id.spreadsheet_table`
),
deduplicated_data AS (
  SELECT
    SAFE_CAST(CAST(A AS STRING) AS INT64) AS user_id, -- ユーザーIDを整数型にキャスト
    SAFE.PARSE_DATETIME('%Y-%m-%d', CAST(B AS STRING)) AS purchase_date, -- 日時を日付型にキャスト
    SAFE_CAST(REPLACE(CAST(C AS STRING), ',', '') AS INT64) AS purchase_amount, -- カンマを除去して整数型にキャスト
    CAST(D AS STRING) AS item_name, -- アイテム名は文字列型のまま
    row_num
  FROM
    filtered_data
  WHERE
    row_num > 1 -- 1行目(ヘッダー)を除外
)
SELECT
  user_id,
  purchase_date,
  purchase_amount,
  item_name
FROM
  deduplicated_data
GROUP BY
  user_id, purchase_date, purchase_amount, item_name
HAVING
  COUNT(user_id) = 1 -- 重複を防ぐロジック(主キーの一意性を保証)

修正内容のポイント
1.uniqueKeyを設定:

  • uniqueKeyプロパティにuser_idを指定しました。
  • Dataformでは、このプロパティを使用してデータの一意性が強調されますが、BigQuery自体が制約を設けるわけではないことに注意してください。

2.一意性の保証:

  • 重複データが発生しないようにHAVING COUNT(user_id) = 1を使用しています。
  • 万が一、user_idが重複するデータが存在した場合、Dataformの処理時点で削除されます。

3.重複を除外するロジック:

  • 必要に応じて、ROW_NUMBER()やDISTINCTを活用し、重複データを除外しました。

4.加工データのみをSELECT:

  • 最終出力では、データをuser_idに基づいて整理し、主キーとして機能するカラムを保持しました。

ケース3:DataformにおけるSQL作成

Dataformの良いところは、テーブル同士の関係をわかりやすくグラフとして可視化できる点ですが、それにはDataformの構文を用いる必要があります。Dataformで用いる一般的なSQLについても質問してみました。

渡したインプット

Dataformで以下の2つの入力テーブルからデータを取得して出力テーブルを作成するSQLを生成してください
○入力テーブル1

  • プロジェクト名:dummy
  • データセット名:dummyA
  • テーブル名:input
  • カラム構成
    • date
    • timestamp
    • url
    • user_id

○入力テーブル2

  • プロジェクト名:dummy
  • データセット名:dummyB
  • テーブル名:input
  • カラム構成
    • user_id
    • user_data
    • hidden_flag

○出力テーブル

  • プロジェクト名:dummy
  • データセット名:dummyC
  • テーブル名:output
  • カラム構成
    • user_id
    • user_data
    • url_path
    • timestamp
  • 条件
    • 同じuser_idで入力テーブルをjoinし、user_idごとにtimestampが新しいものを1件のみ残す
    • 入力テーブル1に存在しないuser_idは使用しない。入力テーブル2に存在しない場合のuser_dataは(未設定)とする
    • urlからurl_pathを抽出する処理は関数として定義して利用する
  • その他
    • 出力テーブルはtimestampでpartitionを作成する

こちらも初回の回答でそれらしい結果が得られました。
ただし${ref()}でテーブル名を指定しているClaudeの方が一枚上手に見えます。
(ChatGPTのようにテーブル名を直書きするとテーブル同士の関係性をグラフとして可視化できない)

Claude 3.5 Sonnet での結果
config {
  type: "table",
  schema: "dummyC",
  name: "output",
  description: "Transform and combine data from dummyA.input and dummyB.input",
  columns: {
    user_id: "User ID",
    user_data: "User data from dummyB",
    url_path: "Extracted path from URL",
    timestamp: "Timestamp of the record"
  },
  bigquery: {
    partitionBy: "TIMESTAMP_TRUNC(timestamp, DAY)"
  }
}

CREATE TEMP FUNCTION extract_url_path(url STRING)
RETURNS STRING
AS (
  -- URLからパスを抽出する関数
  REGEXP_EXTRACT(url, r'^(?:https?:\/\/)?(?:[^\/]+)(\/.*)$')
);

SELECT 
  t1.user_id,
  COALESCE(t2.user_data, '(未設定)') as user_data,
  extract_url_path(t1.url) as url_path,
  t1.timestamp
FROM (
  -- 入力テーブル1からuser_idごとに最新のレコードを取得
  SELECT 
    user_id,
    url,
    timestamp,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) as rn
  FROM ${ref("dummy", "dummyA", "input")}
) t1
LEFT JOIN (
  -- 入力テーブル2から非表示フラグがないレコードを取得
  SELECT 
    user_id,
    user_data
  FROM ${ref("dummy", "dummyB", "input")}
  WHERE hidden_flag IS NULL OR NOT hidden_flag
) t2
ON t1.user_id = t2.user_id
WHERE t1.rn = 1  -- 最新レコードのみを選択

このSQLでは以下の処理を行っています:
1.テーブル設定

  • configブロックで出力テーブルの設定を定義
  • timestampによるパーティション化を設定

2.URL処理関数

  • extract_url_path関数を定義してURLからパスを抽出

3.データ処理

  • 入力テーブル1からuser_idごとに最新のレコードを取得
  • 入力テーブル2とLEFT JOINで結合
  • user_dataが存在しない場合は「(未設定)」を設定

4.その他の特徴

  • ref関数を使用して入力テーブルを参照
  • ROW_NUMBER()を使用して最新レコードの選択を実現
  • クエリの可読性を考慮してサブクエリを活用

このSQLを編集する必要がある場合や、特定の部分について質問がありましたらお知らせください。

ChatGPT 4o での結果
-- 出力テーブルの定義
config {
  type: "table",
  schema: "dummyC",
  name: "output",
  partitions: ["timestamp"]
}

-- URLからurl_pathを抽出する関数
CREATE TEMP FUNCTION extract_url_path(url STRING) RETURNS STRING AS (
  REGEXP_EXTRACT(url, r"https?://[^/]+(/.*)")
);

-- SQL処理開始
WITH input_table1 AS (
  SELECT
    date,
    timestamp,
    url,
    user_id
  FROM
    `dummy.dummyA.input`
),
input_table2 AS (
  SELECT
    user_id,
    user_data,
    hidden_flag
  FROM
    `dummy.dummyB.input`
),
-- テーブル1とテーブル2をJOINし、user_dataがない場合は「未設定」とする
joined_data AS (
  SELECT
    t1.user_id,
    COALESCE(t2.user_data, "未設定") AS user_data,
    extract_url_path(t1.url) AS url_path,
    t1.timestamp
  FROM
    input_table1 t1
  LEFT JOIN
    input_table2 t2
  ON
    t1.user_id = t2.user_id
),
-- user_idごとに最新のtimestampを持つ行を取得
latest_data AS (
  SELECT
    user_id,
    user_data,
    url_path,
    timestamp
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) AS row_num
    FROM
      joined_data
  )
  WHERE
    row_num = 1
)

-- 出力テーブルにデータをINSERT
SELECT
  user_id,
  user_data,
  url_path,
  timestamp
FROM
  latest_data

説明
1.extract_url_path関数:

  • URLからパス部分を抽出するための正規表現を用いた関数を作成しています。
  • 例: https://example.com/path → /path

2.joined_data CTE:

  • 入力テーブル1 (dummyA.input) と入力テーブル2 (dummyB.input) を user_id で結合します。
  • 入力テーブル2に対応する user_id が存在しない場合は、COALESCEを使用してuser_dataに「未設定」を設定します。

3.latest_data CTE:

  • 各 user_id ごとに timestamp が最新の1件を取得します。
  • ROW_NUMBER() を使用して、timestampで降順ソートした最初の行を選択しています。

4.出力テーブルの設定:

  • timestamp を PARTITION として設定しています。
  • このスクリプトをDataformプロジェクトに追加することで、条件を満たす出力テーブルが生成されます。

結果

個人的な評価としては以下の通りです

Claude 3.5 Sonnet ChatGPT 4o
ケース1
ケース2
ケース3

所感

  • 基本的なSQLの生成についてはどちらも可能ですが、DataformのSQLXという部分に特化して考えるとClaudeの方に軍配が上がりました。ここまで精度が変わると思っていませんでしたので両方とも試して良かったです。
  • ただしClaudeも一回で正解を出してくれる訳ではないので、出力したものに関する確認は必須となります。一度出力されてからがスタートくらいのイメージでいると良さそうです。
  • Claudeが主キー設定する際に、assertionも同時に設定しているところはなるほどと感じました。私はDataformのassertionをうまく活用できていないですが、assertionのおすすめ設定をClaudeに聞けばデータ基盤がより強固なものになりそうな気がします。
  • Dataform関連の深い話をできる相手が人間では見つからない時には、Claudeに雑談相手になってもらうのもありかもしれないです。

今後試したいこと

今回はDataformで使うSQLXについて単体で質問しましたが、もう少し広げてデータ基盤のアーキテクチャを部分的に相談するのも面白いかもと感じました。最近はBigQueryとDataformが中心で他のデータ基盤関連サービスを使う機会が減っていますが、他のサービスもすぐに導入できるようClaudeと一緒に調べてみようと思いました。

脚注
  1. 弊社のデータ基盤にDataformを導入した際の詳細は、こちらの記事にまとめてあります。https://findy-tools.io/products/dataform/378/311 ↩︎

株式会社ガラパゴス(有志)

Discussion