⚒️

Dataform + BigQuery MLで Let's 機械学習パイプライン

2023/05/30に公開

先日 GA になった Dataform と BigQuery ML を組み合わせたら機械学習パイプラインを組めるのではと思い触ってみました!

Dataform は BigQuery の Transform 周りで優秀な dbt の対抗サービスになりそうです。
VPC Service Controls がまだ対応していない点(2023.05 時点)は非常に惜しいですが、今後のアップデート含め期待がもてるサービスですね。

今回は Dataform は実際に触ってみるのも含め下記のようなパイプラインを構築しました。基本的な情報や使い方はこちらの記事が非常にわかりやすくまとまっているのでおすすめです。

Overview

Dataform と BQ ML を活用した ML パイプラインについて検証

  • データのロードから加工の前処理フェーズと学習から推論の機械学習フェーズについて Dataform + BigQuery ML で実現できました
  • Dataform のワークフロー定期実行についても組み込みの Workflow Configurations が便利です、また環境ごとの制御については Release Configurations で簡単に処理対象を変更できました
  • モデルの評価結果もテーブルに格納して、その結果を Looker Studio で可視化できると精度劣化が起きていないかも確認できそうです

  • 今までは BigQuery Scripting で長い SQL を記述して Scheduled Query で定期実行していましたが、メンテンナンス性が低い部分が気になっていました(長い SQL の一部を変更したいだけなのに見るスコープが広くなってしまう)
    ⇒ Dataform で細かく処理を分ける & 必要に応じて品質チェックをかますといったことをしやすくなることで 処理の再利用性も高まりますしメンテナンスする際にも対応したいスコープを狭めることもできる(特定の SQLX ファイルのみ見ればよい) のが良いなと思いました
  • 処理グラフの可視化も綺麗で一連の処理の関連性が見えるのも良い点です
  • BigQuery Scripting がワークフローで利用できない点VPC Service Controls が未対応の点はアップデートを期待したいところです
    ⇒ (2023.05.30 追記) BigQuery Scripting は type を operations に変更することで利用可能でした!

キーワード

ML パイプラインの構築

今回は BigQuery ML のチュートリアルを題材に Dataform でパイプラインを構築していこうと思います。

ファイル構成はこんな感じです。

definitions
 |- load
 |   |- raw_data_for_all.sqlx
 |
 |- transform
 |   |- view_for_learn.sqlx
 |   |- view_for_evaluate.sqlx
 |   |- view_for_inference.sqlx
 |
 |- ml
     |- create_model.sqlx
     |- evaluate_model.sqlx
     |- predict_model_v1.sqlx
     |- predict_model_v2.sqlx

データの準備

チュートリアル通りに Google Analytics Sample を利用します。ただ、 US リージョンでの利用になるのですが東京リージョンで利用したいので、雑にファイルに落として東京リージョンの Cloud Storage にアップロードする形にしました。

最低限のデータ量で試したいので、カラム指定とレコード数指定で抽出。
データの期間はテーブル日付とワイルドカードで指定。

  • 学習:ga_sessions_2017051* 300レコード
  • 検証:ga_sessions_2017052* 200レコード
  • 推論:ga_sessions_2017061* 100レコード
SELECT
  totals.transactions AS transactions,
  device.operatingSystem AS os,
  device.isMobile AS is_mobile,
  geoNetwork.country AS country,
  totals.pageviews AS pageviews,
  fullVisitorId,
  date
FROM bigquery-public-data.google_analytics_sample.ga_sessions_2017051*
LIMIT 300;

データのロード

データのロードについて、SQL ステートメントで可能になったのでこちらを利用します。

definitions/load/raw_data_for_all.sqlx に下記の SQL を記述。オプションについて少し解説します。

  • hasOutput:OperationConfig の設定項目で参照可能なテーブルを作成できます。

ref 関数を使用して参照可能テーブルを作成することを宣言します。
true に設定すると、このアクションは self() コンテキスト関数を使用して、構成済みの名前でテーブルを作成します。例: create or replace table ${self()} as select ...

definitions/load/raw_data_for_all.sqlx
config {
  type: "operations", 
  schema: "dataform",
  name: "load_raw_for_all",
  tags: "ML_PIPELINE",
  hasOutput: true
}

LOAD DATA OVERWRITE ${self()}
(
  transactions STRING,
  operationsysytem STRING,
  mobile BOOL,
  country STRING,
  pageviews INTEGER,
  fullVisitorId STRING,
  date INTEGER
)
FROM FILES (
  skip_leading_rows=1,
  format = 'CSV',
  uris = ['gs://dataform-test/ga_sessions_raw.csv']);

データの加工

学習および推論時に必要となる形式の View を作成します。(Dataform の練習も兼ねているのであえて加工後の View を作成します。)

データ加工の SQL は defenitions/transform/view_for_hogehoge.sqlxを下記のようにしました。これを日付を調整して学習用(for learn) / 評価用(for evaluate) / 推論用(for inference)ということで 3 つ View を作成しています。

defenitions/transform/view_for_hogehoge.sqlx
config {
  type: "view", 
  schema: "dataform",
  name: "transform_for_learn",
  tags: "ML_PIPELINE"
}

CREATE OR REPLACE VIEW ${self()} AS
SELECT
  IF(transactions IS NULL, 0, 1) AS label,
  IFNULL(operationsysytem, "") AS os,
  mobile AS is_mobile,
  IFNULL(country, "") AS country,
  IFNULL(pageviews, 0) AS pageviews,
  fullVisitorId
FROM  ${ref("load_raw_for_all")}
WHERE 201705xx <= date and date < 201705xx 

モデルの学習

モデルに食わせるデータを準備できたので学習させます。チュートリアルに従ってロジスティック回帰を BigQuery ML で実装しています。

definitions/ml/craete_model.sqlx に記述していて、学習用データを参照しています。 ref 関数を利用することで依存関係を簡単に定義できるのはいいですね。

definitions/ml/craete_model.sqlx
config {
  type: "operations", 
  schema: "dataform",
  name: "ml_create_logistic_model",
  tags: "ML_PIPELINE",
  hasOutput: true
}

CREATE OR REPLACE MODEL ${self()}
OPTIONS(
  model_type='logistic_reg',
  DATA_SPLIT_METHOD='random',
  DATA_SPLIT_EVAL_FRACTION=0.20
) AS
SELECT
  *
FROM
  ${ref("transform_for_learn")}

モデルの検証

学習済みモデルの評価をやっておきたいので学習後のフェーズとして実装します。

defenitions/ml/evaluate_model.sqlx
config {
  type: "table", 
  schema: "dataform",
  name: "ml_evaluate_result",
  tags: "ML_PIPELINE"
}

CREATE OR REPLACE TABLE ${self()} AS
SELECT
  *
FROM 
  ML.EVALUATE(MODEL ${ref("ml_create_logistic_model")}, (
SELECT
  *
FROM
  ${ref("transform_for_evaluate")}))

ポイント

工夫した点として機械学習モデルの学習後に実施したいため、defenitions/ml/evaluate_model.sqlx で明示的に依存関係を定義する必要がありました。

最終的には、ML.EVALUATE 関数の引数に学習フェーズの生成物であるモデルを MODEL ${ref("ml_create_logistic_model")} という形で指定して実現しました。

参照できるのは table や view だけではないという点を確認できたのは良かったです!

モデルの推論 - 案① -

最後にモデルの推論フェーズです。こちらも評価フェーズの後に実施したいので少し工夫して依存関係を定義しています。

BigQuery Scripting を合わせて利用します。評価フェーズで取得できる AUC の値が閾値より大きければ推論を実施してテーブルに格納します。また、ML.PREDICT 関数についても引数にはref 関数を利用した受け渡しが可能になっています。

defenitions/ml/predict_model_v1.sqlx
config {
  type: "table", 
  schema: "dataform",
  name: "ml_predict_result_v1",
  tags: "ML_PIPELINE"
}

DECLARE auc FLOAT64;

SET auc = (
  SELECT
    roc_auc
  FROM
    ${ref("ml_evaluate_result")}
);

SELECT auc;

IF (auc > 0.7) THEN
  CREATE OR REPLACE TABLE ${self()} AS
  SELECT
    country,
    SUM(predicted_label) as total_predicted_purchases
  FROM
    ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
      SELECT
        os
        , is_mobile
        , pageviews
        , country
        , fullVisitorId
      FROM
        ${ref("transform_for_inference")}))
  GROUP BY country
  ORDER BY total_predicted_purchases DESC;
END IF



・・・・
というふうに無理やり依存関係を定義するために BigQuery Scripting を挟んだら下記のエラーとなりました。

Dataform の CODE 画面で SQL を実行するだけなら、BigQuery Scripting の実行は通るのですが EXECUTE 画面で一連のワークフローの中で実行するとエラーとなりました。
⇒ (2023.05.30 追記) こちらのエラーは type を table にしていることが原因でした。type を operations にすることで通りました。

モデルの推論 - 案② -

よりシンプルな方法で依存関係をしたいと思いドキュメントを読んでいたらありました!

config ブロックで dependencies 項目にリスト形式で依存関係を定義したい name を設定しておくと明示的に宣言できます。

defenitions/ml/predict_model_v1.sqlx
config {
  type: "table", 
  schema: "dataform",
  tags: "ML_PIPELINE",
  name: "ml_predict_result_v1",
  dependencies: ["ml_evaluate_result"]
}

SELECT
  country,
  SUM(predicted_label) as total_predicted_purchases
FROM
  ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
    SELECT
      os
      , is_mobile
      , pageviews
      , country
      , fullVisitorId
    FROM
      ${ref("transform_for_inference")}))
GROUP BY country
ORDER BY total_predicted_purchases DESC
defenitions/ml/predict_model_v2.sqlx
config {
  type: "table", 
  schema: "dataform",
  tags: "ML_PIPELINE",
  name: "ml_predict_result_v2",
  dependencies: ["ml_evaluate_result"]
}

SELECT
  fullVisitorId,
  SUM(predicted_label) as total_predicted_purchases
FROM
  ML.PREDICT(MODEL ${ref("ml_create_logistic_model")}, (
    SELECT
      os
      , is_mobile
      , pageviews
      , country
      , fullVisitorId
    FROM
      ${ref("transform_for_inference")})
  )
GROUP BY fullVisitorId
ORDER BY total_predicted_purchases DESC

ここまでデータのロードと加工およびモデルの学習と推論から一通りできるようになりました。途中途中にデータの品質チェックなども入れるとなおよいかもしれません。

こちらを定義できると Cloud Storage のファイルをデータ量を変更したタイミングで一通り実行すれば取得した結果を得られると思います。

パイプラインの実行

一連のパイプラインを実行したい場合には、 Console 画面の START EXECUTE をクリックします。選択肢には ActionsTags があって Tags から ML_PIPELINE を選択します。

選択すると EXECUTES から実行ログを見ることができます。
(色々とハマったポイントがあって、失敗ログが増えました。)

Tag に同一項目が設定されていると、実行対象となります。さらに「依存関係を含めた実行」「独立に実行」など実行におけるオプションも選択することができます。

今回は START EXECUTETagsML_PIPELINEInclude dependencies という形で実行しています。

ハマったポイント

上記の失敗ログからトラブルシュートをまとめます。

権限不足

Dataform のサービスアカウントに適切な権限を付与することが解決しました。

コメントアウト

type: "table" を宣言した上で余計なコメントアウトを残していたらエラーに。
削除したら解決しました。

CREATE 文

type: "table" を宣言した上で CREATE TABLE 文を使っていたらエラーに。 CREATE TABLE 文を削除して SELECT 文から始めたら解決しました。

BigQuery Scripting

type: "table" を宣言した上で BigQuery Scripting を利用したらエラーに。
type: "table" のままで BigQuery Scripting 部分を削除するか、もしくは tyep: "operations" することで解決できました。

パイプラインの実行環境制御

Dataform には Release Configuration というものがあり、構築したワークフローの実行対象であるプロジェクト ID やスキーマを実行環境ごとに制御することが可能のようです。

今回は同一プロジェクトの別データセットに処理結果を格納するようにします。
デフォルトでは dataform というデータセットに処理結果が格納されていきますが、Release Configuration を利用して dataform_v2 に結果を格納してみます。

schema suffix に v2 と設定することで dataform_v2 が処理対象になります。
下記のように簡単に処理結果の複製ができました!!これは使い勝手がいいですね!!

まとめ

BigQuery ML のチュートリアルを題材に Dataform を触ってみました。
想定した通り BigQuery ML を中心とした機械学習パイプラインが構築できそうなのがわかりました。

また、冒頭に書いた通り、今まで BigQuery Scripting を利用して長い SQL に記述していたことで気になっていたメンテナンス性や再利用性が、Dataform では細切れに処理を記述することで向上しそうだと感じました。

VPC Service Controls に対応されたらぜひ導入してみたいサービスですね!!

参考記事

さいごに

AWS と Google Cloud で構築したデータ基盤の開発・運用に携わっているデータエンジニアです。5 年くらい携わっていて、この業務がきっかけで Google Cloud が好きになりました。

Google Cloud 関連の情報を発信をしています。

https://twitter.com/pHaya72

Discussion