SQLXの1人実装を生成AIで解消しようとした(Dataform編)
この記事は株式会社ガラパゴス(有志)アドベントカレンダー2024の10日目の記事です。
最近会社の同僚が広い部屋に引っ越して人生変わったくらいの感じでSlackに投稿されているのを見て、私は今年何も変わってないなあと感じる今日この頃です。人生変わった話はいずれ記事化されてほしいと密かに思っているほどです。多分ならないと思いますが。
それでは本題に移ります。
背景
うちの会社ではデータ基盤にDataformを使用しています。[1]昨年導入した際には公式ドキュメントくらいしか見当たりませんでしたが、最近ではDataform関連の記事も増えてきました。
とはいえDataformに関して定番や基本的な内容のものがほとんどで、少し変わったことをするようなものはまだそれほど見当たりません。Dataformでいろいろ試したいけれど、1人で一から調査しながら試すのも時間がかかってしまう…
そこで今回も生成AIにDataformでの実装方法を質問してみて、実際にDataformで実装をするうえで使えそうかいろいろなケースで試してみました。
試したこと
ケース1:スプシソーステーブルの作成
BigQueryではGoogleのスプレッドシートをデータソースとする外部テーブルを作成することができます。その作成をDataformで行う場合のSQLXの実装について質問してみました。
渡したインプット
Dataformで以下の構造を持つGoogleスプレッドシートをソースとするテーブルを作成するSQLXを生成してください:
- カラム
- A列:ユーザーID(値の例:12)
- B列:日時(値の例:2024-12-09)
- C列:購入金額(値の例:5,000)
- D列:アイテム名(値の例:詰め合わせセット)
- 留意点
- スプレッドシートのURLおよびシート名はダミー値を設定してください
- スプレッドシートの1行目はカラム名が入っているため、データとしては2行目以降を利用してください
- 各カラムには値の例を記載していますが、突然全く異なる形式のデータが入る可能性があります。そのような場合で作成したテーブルに
SELECT *
をしてもエラーが起きないようにしてください
Claude 3.5 SonnetとChatGPT 4oに質問してみましたが、どちらも1回では欲しい結果を得られませんでした。回答で得られたコードをDataform上で実行し、表示されたエラーをまた質問するという形式で何度かやり直しました。
最終的にClaudeは正しい実装が得られましたが、ChatGPTの方は一部のエラーを解消できませんでした。(Dataformの文法では、type: "operations"
が正しいがtype: "operation"
の回答しか得られず、何度かエラーメッセージを伝えても正しいコードを出力できなかった)
Claude 3.5 Sonnet での結果(7回目、コードのみ)
-- definitions/purchase_records_external.sqlx
config {
type: "operations",
hasOutput: true,
name: "purchase_records_external"
}
CREATE OR REPLACE EXTERNAL TABLE your_dataset.purchase_records_external (
user_id STRING,
purchase_datetime STRING,
purchase_amount STRING,
item_name STRING
)
OPTIONS (
format = 'GOOGLE_SHEETS',
uris = ['https://docs.google.com/spreadsheets/d/dummy_spreadsheet_id/edit#gid=0'],
skip_leading_rows = 1
);
-- definitions/purchase_records.sqlx
config {
type: "table",
name: "purchase_records",
description: "Purchase records imported from Google Spreadsheet. Each record contains user ID, datetime, purchase amount, and item name.",
dependencies: ["purchase_records_external"]
}
WITH validated_data AS (
SELECT
-- User ID: Convert to integer, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(TRIM(COALESCE(user_id, '')), r'^[0-9]+$')
THEN TRIM(user_id)
ELSE NULL
END AS INT64
) AS user_id,
-- DateTime: Convert to timestamp, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(TRIM(COALESCE(purchase_datetime, '')), r'^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}(:\d{2})?)?$')
THEN TRIM(purchase_datetime)
ELSE NULL
END AS TIMESTAMP
) AS purchase_datetime,
-- Purchase Amount: Remove commas, convert to numeric, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(REPLACE(TRIM(COALESCE(purchase_amount, '')), ',', ''), r'^\d+(\.\d+)?$')
THEN REPLACE(TRIM(purchase_amount), ',', '')
ELSE NULL
END AS NUMERIC
) AS purchase_amount,
-- Item Name: Clean string, set to empty string if NULL
TRIM(COALESCE(item_name, '')) AS item_name,
-- Add metadata column for processing timestamp
CURRENT_TIMESTAMP() AS processed_at,
-- Add row number for tracking
ROW_NUMBER() OVER() AS source_row_number
FROM
your_dataset.purchase_records_external
)
SELECT
-- Main columns
user_id,
purchase_datetime,
purchase_amount,
item_name,
-- Metadata columns
source_row_number,
processed_at,
-- Error flag columns for data quality monitoring
CASE WHEN user_id IS NULL THEN TRUE ELSE FALSE END AS has_invalid_user_id,
CASE WHEN purchase_datetime IS NULL THEN TRUE ELSE FALSE END AS has_invalid_datetime,
CASE WHEN purchase_amount IS NULL THEN TRUE ELSE FALSE END AS has_invalid_amount
FROM
validated_data
ChatGPT 4o での結果(8回目、コードのみ)
config {
type: "operation",
description: "Googleスプレッドシートを外部テーブルとして作成"
}
CREATE EXTERNAL TABLE IF NOT EXISTS `project_id.dataset_id.spreadsheet_table`
OPTIONS (
format = 'GOOGLE_SHEETS',
uris = ['https://docs.google.com/spreadsheets/d/DUMMY_SPREADSHEET_ID']
);
config {
type: "table",
description: "外部テーブルを基に加工したテーブル",
dependencies: ["create_external_table"] -- 依存関係を指定
}
WITH filtered_data AS (
SELECT
*,
ROW_NUMBER() OVER () AS row_num -- 各行に行番号を付与
FROM
`project_id.dataset_id.spreadsheet_table`
)
SELECT
SAFE_CAST(CAST(A AS STRING) AS INT64) AS user_id, -- ユーザーIDを整数型にキャスト
SAFE.PARSE_DATETIME('%Y-%m-%d', CAST(B AS STRING)) AS purchase_date, -- 日時を日付型にキャスト
SAFE_CAST(REPLACE(CAST(C AS STRING), ',', '') AS INT64) AS purchase_amount, -- カンマを除去して整数型にキャスト
CAST(D AS STRING) AS item_name -- アイテム名は文字列型のまま
FROM
filtered_data
WHERE
row_num > 1 -- 1行目(ヘッダー)を除外
ケース2:主キーの設定
BigQueryに昨年追加された主キーの設定を、先ほどのケース1で作成するテーブルに対してDataformで行う実装について質問しました。
渡したインプット
先ほど作成したテーブルに、主キーを設定するようにしてください
この質問に対してはどちらも初回でそれらしい回答が得られました。
ただし主キーは複数設定できるため1項目でも[]で囲う必要があり、ChatGPTが示したuniqueKey: "user_id"
ではエラーが発生してしまいます。
Claude 3.5 Sonnet での結果
-- definitions/purchase_records.sqlx
config {
type: "table",
name: "purchase_records",
description: "Purchase records imported from Google Spreadsheet. Each record contains user ID, datetime, purchase amount, and item name.",
uniqueKey: ["user_id"],
assertions: {
uniqueKey: ["user_id"]
}
}
WITH validated_data AS (
SELECT
-- User ID: Convert to integer, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(TRIM(COALESCE(user_id, '')), r'^[0-9]+$')
THEN TRIM(user_id)
ELSE NULL
END AS INT64
) AS user_id,
-- DateTime: Convert to timestamp, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(TRIM(COALESCE(purchase_datetime, '')), r'^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}(:\d{2})?)?$')
THEN TRIM(purchase_datetime)
ELSE NULL
END AS TIMESTAMP
) AS purchase_datetime,
-- Purchase Amount: Remove commas, convert to numeric, set to NULL if invalid
CAST(
CASE
WHEN REGEXP_CONTAINS(REPLACE(TRIM(COALESCE(purchase_amount, '')), ',', ''), r'^\d+(\.\d+)?$')
THEN REPLACE(TRIM(purchase_amount), ',', '')
ELSE NULL
END AS NUMERIC
) AS purchase_amount,
-- Item Name: Clean string, set to empty string if NULL
TRIM(COALESCE(item_name, '')) AS item_name,
-- Add metadata column for processing timestamp
CURRENT_TIMESTAMP() AS processed_at,
-- Add row number for tracking
ROW_NUMBER() OVER() AS source_row_number
FROM
${ref("purchase_records_external")}
),
-- Handle duplicate user_ids by keeping the latest record
deduplicated_data AS (
SELECT
user_id,
purchase_datetime,
purchase_amount,
item_name,
source_row_number,
processed_at,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY purchase_datetime DESC, source_row_number DESC) as row_num
FROM
validated_data
WHERE
user_id IS NOT NULL -- Exclude NULL user_ids as they can't be part of a primary key
)
SELECT
-- Main columns
user_id,
purchase_datetime,
purchase_amount,
item_name,
-- Metadata columns
source_row_number,
processed_at,
-- Error flag columns for data quality monitoring
CASE WHEN user_id IS NULL THEN TRUE ELSE FALSE END AS has_invalid_user_id,
CASE WHEN purchase_datetime IS NULL THEN TRUE ELSE FALSE END AS has_invalid_datetime,
CASE WHEN purchase_amount IS NULL THEN TRUE ELSE FALSE END AS has_invalid_amount
FROM
deduplicated_data
WHERE
row_num = 1 -- Only keep the latest record for each user_id
主な変更点:
1.configにuniqueKeyとassertionsを追加
2.重複するuser_idの処理を追加(最新のレコードのみを保持)
3.NULLのuser_idを除外する処理を追加
注意点:
1.この実装では、同じuser_idの複数レコードがある場合:
- 最新のpurchase_datetimeを持つレコードを保持
- 同じ日時の場合は、ソースデータの後の行(より大きいsource_row_number)を優先
2.以下のようなケースで主キー制約違反が発生する可能性があります:
- NULLのuser_id
- 無効なフォーマットのuser_id
- スプレッドシートに重複するuser_idがある場合
これらのケースを別の方法で処理したい場合は、要件に応じて修正できます。例えば:
- すべてのレコードを保持したい場合は、user_idとpurchase_datetimeの組み合わせを主キーにする
- 重複レコードをエラーとして扱いたい場合は、別のテーブルにエラーログを出力する などの対応が可能です。
ChatGPT 4o での結果
BigQueryでは、データベースシステムのように明示的な主キー制約をサポートしていません。ただし、主キーとして機能する一意性を保証するカラムを作成し、その一意性を維持するロジックをSQL内で実装することが一般的です。
Dataformでの対応として以下の方法を提案します:
修正版: 主キーを指定する方法
config内でuniqueKeyプロパティを指定する
Dataformでは、configセクションにuniqueKeyプロパティを設定することで主キーを仮想的に定義できます。このプロパティは、データがリフレッシュされるたびに一意性が保証されるカラムを指定します。
SQLで一意性を保証するためのロジックを追加
主キーに重複が生じないよう、データ処理中にROW_NUMBER()やDISTINCTを活用します。
config {
type: "table",
description: "外部テーブルを基に加工したテーブル",
dependencies: ["create_external_table"], -- 依存関係を指定
uniqueKey: "user_id" -- 主キーを指定
}
WITH filtered_data AS (
SELECT
*,
ROW_NUMBER() OVER () AS row_num -- 各行に行番号を付与
FROM
`project_id.dataset_id.spreadsheet_table`
),
deduplicated_data AS (
SELECT
SAFE_CAST(CAST(A AS STRING) AS INT64) AS user_id, -- ユーザーIDを整数型にキャスト
SAFE.PARSE_DATETIME('%Y-%m-%d', CAST(B AS STRING)) AS purchase_date, -- 日時を日付型にキャスト
SAFE_CAST(REPLACE(CAST(C AS STRING), ',', '') AS INT64) AS purchase_amount, -- カンマを除去して整数型にキャスト
CAST(D AS STRING) AS item_name, -- アイテム名は文字列型のまま
row_num
FROM
filtered_data
WHERE
row_num > 1 -- 1行目(ヘッダー)を除外
)
SELECT
user_id,
purchase_date,
purchase_amount,
item_name
FROM
deduplicated_data
GROUP BY
user_id, purchase_date, purchase_amount, item_name
HAVING
COUNT(user_id) = 1 -- 重複を防ぐロジック(主キーの一意性を保証)
修正内容のポイント
1.uniqueKeyを設定:
- uniqueKeyプロパティにuser_idを指定しました。
- Dataformでは、このプロパティを使用してデータの一意性が強調されますが、BigQuery自体が制約を設けるわけではないことに注意してください。
2.一意性の保証:
- 重複データが発生しないようにHAVING COUNT(user_id) = 1を使用しています。
- 万が一、user_idが重複するデータが存在した場合、Dataformの処理時点で削除されます。
3.重複を除外するロジック:
- 必要に応じて、ROW_NUMBER()やDISTINCTを活用し、重複データを除外しました。
4.加工データのみをSELECT:
- 最終出力では、データをuser_idに基づいて整理し、主キーとして機能するカラムを保持しました。
ケース3:DataformにおけるSQL作成
Dataformの良いところは、テーブル同士の関係をわかりやすくグラフとして可視化できる点ですが、それにはDataformの構文を用いる必要があります。Dataformで用いる一般的なSQLについても質問してみました。
渡したインプット
Dataformで以下の2つの入力テーブルからデータを取得して出力テーブルを作成するSQLを生成してください
○入力テーブル1
- プロジェクト名:dummy
- データセット名:dummyA
- テーブル名:input
- カラム構成
- date
- timestamp
- url
- user_id
○入力テーブル2
- プロジェクト名:dummy
- データセット名:dummyB
- テーブル名:input
- カラム構成
- user_id
- user_data
- hidden_flag
○出力テーブル
- プロジェクト名:dummy
- データセット名:dummyC
- テーブル名:output
- カラム構成
- user_id
- user_data
- url_path
- timestamp
- 条件
- 同じuser_idで入力テーブルをjoinし、user_idごとにtimestampが新しいものを1件のみ残す
- 入力テーブル1に存在しないuser_idは使用しない。入力テーブル2に存在しない場合のuser_dataは(未設定)とする
- urlからurl_pathを抽出する処理は関数として定義して利用する
- その他
- 出力テーブルはtimestampでpartitionを作成する
こちらも初回の回答でそれらしい結果が得られました。
ただし${ref()}
でテーブル名を指定しているClaudeの方が一枚上手に見えます。
(ChatGPTのようにテーブル名を直書きするとテーブル同士の関係性をグラフとして可視化できない)
Claude 3.5 Sonnet での結果
config {
type: "table",
schema: "dummyC",
name: "output",
description: "Transform and combine data from dummyA.input and dummyB.input",
columns: {
user_id: "User ID",
user_data: "User data from dummyB",
url_path: "Extracted path from URL",
timestamp: "Timestamp of the record"
},
bigquery: {
partitionBy: "TIMESTAMP_TRUNC(timestamp, DAY)"
}
}
CREATE TEMP FUNCTION extract_url_path(url STRING)
RETURNS STRING
AS (
-- URLからパスを抽出する関数
REGEXP_EXTRACT(url, r'^(?:https?:\/\/)?(?:[^\/]+)(\/.*)$')
);
SELECT
t1.user_id,
COALESCE(t2.user_data, '(未設定)') as user_data,
extract_url_path(t1.url) as url_path,
t1.timestamp
FROM (
-- 入力テーブル1からuser_idごとに最新のレコードを取得
SELECT
user_id,
url,
timestamp,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) as rn
FROM ${ref("dummy", "dummyA", "input")}
) t1
LEFT JOIN (
-- 入力テーブル2から非表示フラグがないレコードを取得
SELECT
user_id,
user_data
FROM ${ref("dummy", "dummyB", "input")}
WHERE hidden_flag IS NULL OR NOT hidden_flag
) t2
ON t1.user_id = t2.user_id
WHERE t1.rn = 1 -- 最新レコードのみを選択
このSQLでは以下の処理を行っています:
1.テーブル設定
- configブロックで出力テーブルの設定を定義
- timestampによるパーティション化を設定
2.URL処理関数
- extract_url_path関数を定義してURLからパスを抽出
3.データ処理
- 入力テーブル1からuser_idごとに最新のレコードを取得
- 入力テーブル2とLEFT JOINで結合
- user_dataが存在しない場合は「(未設定)」を設定
4.その他の特徴
- ref関数を使用して入力テーブルを参照
- ROW_NUMBER()を使用して最新レコードの選択を実現
- クエリの可読性を考慮してサブクエリを活用
このSQLを編集する必要がある場合や、特定の部分について質問がありましたらお知らせください。
ChatGPT 4o での結果
-- 出力テーブルの定義
config {
type: "table",
schema: "dummyC",
name: "output",
partitions: ["timestamp"]
}
-- URLからurl_pathを抽出する関数
CREATE TEMP FUNCTION extract_url_path(url STRING) RETURNS STRING AS (
REGEXP_EXTRACT(url, r"https?://[^/]+(/.*)")
);
-- SQL処理開始
WITH input_table1 AS (
SELECT
date,
timestamp,
url,
user_id
FROM
`dummy.dummyA.input`
),
input_table2 AS (
SELECT
user_id,
user_data,
hidden_flag
FROM
`dummy.dummyB.input`
),
-- テーブル1とテーブル2をJOINし、user_dataがない場合は「未設定」とする
joined_data AS (
SELECT
t1.user_id,
COALESCE(t2.user_data, "未設定") AS user_data,
extract_url_path(t1.url) AS url_path,
t1.timestamp
FROM
input_table1 t1
LEFT JOIN
input_table2 t2
ON
t1.user_id = t2.user_id
),
-- user_idごとに最新のtimestampを持つ行を取得
latest_data AS (
SELECT
user_id,
user_data,
url_path,
timestamp
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) AS row_num
FROM
joined_data
)
WHERE
row_num = 1
)
-- 出力テーブルにデータをINSERT
SELECT
user_id,
user_data,
url_path,
timestamp
FROM
latest_data
説明
1.extract_url_path関数:
- URLからパス部分を抽出するための正規表現を用いた関数を作成しています。
- 例: https://example.com/path → /path
2.joined_data CTE:
- 入力テーブル1 (dummyA.input) と入力テーブル2 (dummyB.input) を user_id で結合します。
- 入力テーブル2に対応する user_id が存在しない場合は、COALESCEを使用してuser_dataに「未設定」を設定します。
3.latest_data CTE:
- 各 user_id ごとに timestamp が最新の1件を取得します。
- ROW_NUMBER() を使用して、timestampで降順ソートした最初の行を選択しています。
4.出力テーブルの設定:
- timestamp を PARTITION として設定しています。
- このスクリプトをDataformプロジェクトに追加することで、条件を満たす出力テーブルが生成されます。
結果
個人的な評価としては以下の通りです
Claude 3.5 Sonnet | ChatGPT 4o | |
---|---|---|
ケース1 | △ | ✗ |
ケース2 | ○ | △ |
ケース3 | ○ | △ |
所感
- 基本的なSQLの生成についてはどちらも可能ですが、DataformのSQLXという部分に特化して考えるとClaudeの方に軍配が上がりました。ここまで精度が変わると思っていませんでしたので両方とも試して良かったです。
- ただしClaudeも一回で正解を出してくれる訳ではないので、出力したものに関する確認は必須となります。一度出力されてからがスタートくらいのイメージでいると良さそうです。
- Claudeが主キー設定する際に、assertionも同時に設定しているところはなるほどと感じました。私はDataformのassertionをうまく活用できていないですが、assertionのおすすめ設定をClaudeに聞けばデータ基盤がより強固なものになりそうな気がします。
- Dataform関連の深い話をできる相手が人間では見つからない時には、Claudeに雑談相手になってもらうのもありかもしれないです。
今後試したいこと
今回はDataformで使うSQLXについて単体で質問しましたが、もう少し広げてデータ基盤のアーキテクチャを部分的に相談するのも面白いかもと感じました。最近はBigQueryとDataformが中心で他のデータ基盤関連サービスを使う機会が減っていますが、他のサービスもすぐに導入できるようClaudeと一緒に調べてみようと思いました。
-
弊社のデータ基盤にDataformを導入した際の詳細は、こちらの記事にまとめてあります。https://findy-tools.io/products/dataform/378/311 ↩︎
Discussion