Dataform + BigQuery MLで Let's 機械学習パイプライン
先日 GA になった Dataform と BigQuery ML を組み合わせたら機械学習パイプラインを組めるのではと思い触ってみました!
Dataform は BigQuery の Transform 周りで優秀な dbt の対抗サービスになりそうです。
VPC Service Controls がまだ対応していない点(2023.05 時点)は非常に惜しいですが、今後のアップデート含め期待がもてるサービスですね。
今回は Dataform は実際に触ってみるのも含め下記のようなパイプラインを構築しました。基本的な情報や使い方はこちらの記事が非常にわかりやすくまとまっているのでおすすめです。
Overview
Dataform と BQ ML を活用した ML パイプラインについて検証
- データのロードから加工の前処理フェーズと学習から推論の機械学習フェーズについて Dataform + BigQuery ML で実現できました
- Dataform のワークフロー定期実行についても組み込みの Workflow Configurations が便利です、また環境ごとの制御については Release Configurations で簡単に処理対象を変更できました
- モデルの評価結果もテーブルに格納して、その結果を Looker Studio で可視化できると精度劣化が起きていないかも確認できそうです
- 今までは BigQuery Scripting で長い SQL を記述して Scheduled Query で定期実行していましたが、メンテンナンス性が低い部分が気になっていました(長い SQL の一部を変更したいだけなのに見るスコープが広くなってしまう)
⇒ Dataform で細かく処理を分ける & 必要に応じて品質チェックをかますといったことをしやすくなることで 処理の再利用性も高まりますしメンテナンスする際にも対応したいスコープを狭めることもできる(特定の SQLX ファイルのみ見ればよい) のが良いなと思いました - 処理グラフの可視化も綺麗で一連の処理の関連性が見えるのも良い点です
-
BigQuery Scripting がワークフローで利用できない点や VPC Service Controls が未対応の点はアップデートを期待したいところです
⇒ (2023.05.30 追記) BigQuery Scripting は type を operations に変更することで利用可能でした!
キーワード
ML パイプラインの構築
今回は BigQuery ML のチュートリアルを題材に Dataform でパイプラインを構築していこうと思います。
ファイル構成はこんな感じです。
definitions
|- load
| |- raw_data_for_all.sqlx
|
|- transform
| |- view_for_learn.sqlx
| |- view_for_evaluate.sqlx
| |- view_for_inference.sqlx
|
|- ml
|- create_model.sqlx
|- evaluate_model.sqlx
|- predict_model_v1.sqlx
|- predict_model_v2.sqlx
データの準備
チュートリアル通りに Google Analytics Sample
を利用します。ただ、 US リージョンでの利用になるのですが東京リージョンで利用したいので、雑にファイルに落として東京リージョンの Cloud Storage にアップロードする形にしました。
最低限のデータ量で試したいので、カラム指定とレコード数指定で抽出。
データの期間はテーブル日付とワイルドカードで指定。
- 学習:ga_sessions_2017051* 300レコード
- 検証:ga_sessions_2017052* 200レコード
- 推論:ga_sessions_2017061* 100レコード
SELECT
totals.transactions AS transactions,
device.operatingSystem AS os,
device.isMobile AS is_mobile,
geoNetwork.country AS country,
totals.pageviews AS pageviews,
fullVisitorId,
date
FROM bigquery-public-data.google_analytics_sample.ga_sessions_2017051*
LIMIT 300;
データのロード
データのロードについて、SQL ステートメントで可能になったのでこちらを利用します。
definitions/load/raw_data_for_all.sqlx
に下記の SQL を記述。オプションについて少し解説します。
- hasOutput:OperationConfig の設定項目で参照可能なテーブルを作成できます。
ref 関数を使用して参照可能テーブルを作成することを宣言します。
true に設定すると、このアクションは self() コンテキスト関数を使用して、構成済みの名前でテーブルを作成します。例: create or replace table ${self()} as select ...
config {
type: "operations",
schema: "dataform",
name: "load_raw_for_all",
tags: "ML_PIPELINE",
hasOutput: true
}
LOAD DATA OVERWRITE ${self()}
(
transactions STRING,
operationsysytem STRING,
mobile BOOL,
country STRING,
pageviews INTEGER,
fullVisitorId STRING,
date INTEGER
)
FROM FILES (
skip_leading_rows=1,
format = 'CSV',
uris = ['gs://dataform-test/ga_sessions_raw.csv']);
データの加工
学習および推論時に必要となる形式の View を作成します。(Dataform の練習も兼ねているのであえて加工後の View を作成します。)
データ加工の SQL は defenitions/transform/view_for_hogehoge.sqlx
を下記のようにしました。これを日付を調整して学習用(for learn) / 評価用(for evaluate) / 推論用(for inference)ということで 3 つ View を作成しています。
config {
type: "view",
schema: "dataform",
name: "transform_for_learn",
tags: "ML_PIPELINE"
}
CREATE OR REPLACE VIEW ${self()} AS
SELECT
IF(transactions IS NULL, 0, 1) AS label,
IFNULL(operationsysytem, "") AS os,
mobile AS is_mobile,
IFNULL(country, "") AS country,
IFNULL(pageviews, 0) AS pageviews,
fullVisitorId
FROM ${ref("load_raw_for_all")}
WHERE 201705xx <= date and date < 201705xx
モデルの学習
モデルに食わせるデータを準備できたので学習させます。チュートリアルに従ってロジスティック回帰を BigQuery ML で実装しています。
definitions/ml/craete_model.sqlx
に記述していて、学習用データを参照しています。 ref 関数を利用することで依存関係を簡単に定義できるのはいいですね。
config {
type: "operations",
schema: "dataform",
name: "ml_create_logistic_model",
tags: "ML_PIPELINE",
hasOutput: true
}
CREATE OR REPLACE MODEL ${self()}
OPTIONS(
model_type='logistic_reg',
DATA_SPLIT_METHOD='random',
DATA_SPLIT_EVAL_FRACTION=0.20
) AS
SELECT
*
FROM
${ref("transform_for_learn")}
モデルの検証
学習済みモデルの評価をやっておきたいので学習後のフェーズとして実装します。
config {
type: "table",
schema: "dataform",
name: "ml_evaluate_result",
tags: "ML_PIPELINE"
}
CREATE OR REPLACE TABLE ${self()} AS
SELECT
*
FROM
ML.EVALUATE(MODEL ${ref("ml_create_logistic_model")}, (
SELECT
*
FROM
${ref("transform_for_evaluate")}))
ポイント
工夫した点として機械学習モデルの学習後に実施したいため、defenitions/ml/evaluate_model.sqlx
で明示的に依存関係を定義する必要がありました。
最終的には、ML.EVALUATE 関数の引数に学習フェーズの生成物であるモデルを MODEL ${ref("ml_create_logistic_model")}
という形で指定して実現しました。
参照できるのは table や view だけではないという点を確認できたのは良かったです!
モデルの推論 - 案① -
最後にモデルの推論フェーズです。こちらも評価フェーズの後に実施したいので少し工夫して依存関係を定義しています。
BigQuery Scripting を合わせて利用します。評価フェーズで取得できる AUC の値が閾値より大きければ推論を実施してテーブルに格納します。また、ML.PREDICT 関数についても引数にはref 関数を利用した受け渡しが可能になっています。
config {
type: "table",
schema: "dataform",
name: "ml_predict_result_v1",
tags: "ML_PIPELINE"
}
DECLARE auc FLOAT64;
SET auc = (
SELECT
roc_auc
FROM
${ref("ml_evaluate_result")}
);
SELECT auc;
IF (auc > 0.7) THEN
CREATE OR REPLACE TABLE ${self()} AS
SELECT
country,
SUM(predicted_label) as total_predicted_purchases
FROM
ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
SELECT
os
, is_mobile
, pageviews
, country
, fullVisitorId
FROM
${ref("transform_for_inference")}))
GROUP BY country
ORDER BY total_predicted_purchases DESC;
END IF
・・・・
というふうに無理やり依存関係を定義するために BigQuery Scripting を挟んだら下記のエラーとなりました。
Dataform の CODE 画面で SQL を実行するだけなら、BigQuery Scripting の実行は通るのですが EXECUTE 画面で一連のワークフローの中で実行するとエラーとなりました。
⇒ (2023.05.30 追記) こちらのエラーは type を table にしていることが原因でした。type を operations にすることで通りました。
モデルの推論 - 案② -
よりシンプルな方法で依存関係をしたいと思いドキュメントを読んでいたらありました!
config
ブロックで dependencies
項目にリスト形式で依存関係を定義したい name
を設定しておくと明示的に宣言できます。
config {
type: "table",
schema: "dataform",
tags: "ML_PIPELINE",
name: "ml_predict_result_v1",
dependencies: ["ml_evaluate_result"]
}
SELECT
country,
SUM(predicted_label) as total_predicted_purchases
FROM
ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
SELECT
os
, is_mobile
, pageviews
, country
, fullVisitorId
FROM
${ref("transform_for_inference")}))
GROUP BY country
ORDER BY total_predicted_purchases DESC
config {
type: "table",
schema: "dataform",
tags: "ML_PIPELINE",
name: "ml_predict_result_v2",
dependencies: ["ml_evaluate_result"]
}
SELECT
fullVisitorId,
SUM(predicted_label) as total_predicted_purchases
FROM
ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
SELECT
os
, is_mobile
, pageviews
, country
, fullVisitorId
FROM
${ref("transform_for_inference")})
)
GROUP BY fullVisitorId
ORDER BY total_predicted_purchases DESC
ここまでデータのロードと加工およびモデルの学習と推論から一通りできるようになりました。途中途中にデータの品質チェックなども入れるとなおよいかもしれません。
こちらを定義できると Cloud Storage のファイルをデータ量を変更したタイミングで一通り実行すれば取得した結果を得られると思います。
パイプラインの実行
一連のパイプラインを実行したい場合には、 Console 画面の START EXECUTE
をクリックします。選択肢には Actions
と Tags
があって Tags
から ML_PIPELINE
を選択します。
選択すると EXECUTES
から実行ログを見ることができます。
(色々とハマったポイントがあって、失敗ログが増えました。)
Tag に同一項目が設定されていると、実行対象となります。さらに「依存関係を含めた実行」「独立に実行」など実行におけるオプションも選択することができます。
今回は START EXECUTE
⇒ Tags
⇒ ML_PIPELINE
⇒ Include dependencies
という形で実行しています。
ハマったポイント
上記の失敗ログからトラブルシュートをまとめます。
権限不足
Dataform のサービスアカウントに適切な権限を付与することが解決しました。
コメントアウト
type: "table"
を宣言した上で余計なコメントアウトを残していたらエラーに。
削除したら解決しました。
CREATE 文
type: "table"
を宣言した上で CREATE TABLE
文を使っていたらエラーに。 CREATE TABLE
文を削除して SELECT
文から始めたら解決しました。
BigQuery Scripting
type: "table"
を宣言した上で BigQuery Scripting を利用したらエラーに。
type: "table"
のままで BigQuery Scripting 部分を削除するか、もしくは tyep: "operations"
することで解決できました。
パイプラインの実行環境制御
Dataform には Release Configuration というものがあり、構築したワークフローの実行対象であるプロジェクト ID やスキーマを実行環境ごとに制御することが可能のようです。
今回は同一プロジェクトの別データセットに処理結果を格納するようにします。
デフォルトでは dataform というデータセットに処理結果が格納されていきますが、Release Configuration を利用して dataform_v2 に結果を格納してみます。
schema suffix に v2 と設定することで dataform_v2 が処理対象になります。
下記のように簡単に処理結果の複製ができました!!これは使い勝手がいいですね!!
まとめ
BigQuery ML のチュートリアルを題材に Dataform を触ってみました。
想定した通り BigQuery ML を中心とした機械学習パイプラインが構築できそうなのがわかりました。
また、冒頭に書いた通り、今まで BigQuery Scripting を利用して長い SQL に記述していたことで気になっていたメンテナンス性や再利用性が、Dataform では細切れに処理を記述することで向上しそうだと感じました。
VPC Service Controls に対応されたらぜひ導入してみたいサービスですね!!
参考記事
さいごに
AWS と Google Cloud で構築したデータ基盤の開発・運用に携わっているデータエンジニアです。5 年くらい携わっていて、この業務がきっかけで Google Cloud が好きになりました。
Google Cloud 関連の情報を発信をしています。
Discussion