🎁

DataformでBigQueryのデータパイプラインを構築する

2020/12/24に公開

https://qiita.com/advent-calendar/2020/bigquery

BigQuery の COVID-19 一般公開データセットをもとに、Google Cloud ファミリーの一員となったDataformで BigQuery のデータパイプラインを組んでみたので紹介します。

Dataform とは

https://dataform.co/

Dataform は、従来の ETL パイプラインではなくELT パイプラインで役立つツールです。
ELT パイプラインとは、データソースから抽出(Extract)した生データをそのままデータウェアハウス(DWH)へ読み込み(Load)、DWH 内で目的に応じたテーブル構造に変換(Transform)する一連の流れのことを指します。Dataform は、ELT パイプラインにおけるTransformに特化したツールです。

Google Cloud 傘下に

2020 年 12 月 8 日(米国時間)、Google Cloud が Dataform を買収したというアナウンスがありました。同時に、すべてのユーザーが無料で Dataform を使用できるようになりました。

Google Cloud に Dataform を迎え入れることを嬉しく思います。組織全体ですべてのユーザーが分析情報を利用できるようにするという使命を引き続き遂行してまいります。このたび、すべてのユーザーに Dataform のサービスを無料で提供することにしました。今後、Dataform と BigQuery の優れた機能を組み合わせて利用できるようになることを楽しみにしています。

実際に試してみる

今回のハンズオンで作成した Dataform のソースコードはこちらです。
https://github.com/OTA2000/dataform_advent_calendar

前提

  • ソースデータとしてCOVID-19 Public Datasets(一般公開データ)を使用します
    • 大抵のユースケースでは事前に BigQuery にソースデータを格納するパイプラインを組む必要があります(今回は省略)

今回やったこと

  • Dataform プロジェクトの立ち上げ
  • ソースデータを定義する SQLX を作成する
  • ビューを定義する SQLX を作成する
  • パーティションテーブルを定義する SQLX を作成する
  • 通常のテーブルを定義する SQLX を作成する
  • 各テーブルの依存関係を確認する
  • スケジュールを組んで定時実行させる

Dataform プロジェクトの立ち上げ

[GCP] サービスアカウントを作成する

対象の GCP プロジェクトでサービスアカウントを作成しBigQuery 管理者ロールを付与します。


サービスアカウントを作成したら JSON の秘密鍵をダウンロードして保管します。
※Dataform から発行される BigQuery のジョブはすべてこのサービスアカウントで実行されます。

[Dataform] ユーザーアカウントを作成する

以下のリンクから Dataform のユーザーアカウントを作成します
https://app.dataform.co/

[Dataform] プロジェクトを作成する

サインアップが完了したら新規プロジェクトを作成します。

  1. 任意のプロジェクト名を入力する
  2. 接続対象の GCP プロジェクト ID を入力する
  3. データセットを作成するロケーションとサービスアカウントキーを指定して BigQuery と接続する

(オプション) 任意の Git プロバイダでバージョン管理する

Dataform 上で作成するソースコードは Git でバージョン管理されます。Git 管理のプロバイダを任意のもの(GitHub, GitLab, Azure DevOps)に移行することも出来ます。
Git プロバイダを移行したい場合、左ペインの Project settings > Version control から設定してください(空のレポジトリとアクセストークンの用意が別途必要)。

SQLX とは

Dataform では SQLX という形式でテーブルやビューを定義します。まずは、SQLX の概要や記法を理解しましょう。
https://docs.dataform.co/introduction/dataform-in-5-minutes
https://docs.dataform.co/guides/sqlx

ソースデータを定義する SQLX を作成する

以下のテーブルをソースデータとして使用することを SQLX で定義します。

  • bigquery-public-data.covid19_open_data.covid19_open_data
  • bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d

例: bigquery-public-data.covid19_open_data.covid19_open_dataをソースデータとして定義する SQLX

definitions/declaration/covid19_open_data.sqlx
config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "covid19_open_data",
  name: "covid19_open_data"
}

config ブロックのtypeで"declaration"を指定し、読込元のプロジェクト ID(database)・データセット名(schema)・テーブル名(name)を定義します。この宣言により、テーブルやビューを作成する SQLX からこのテーブルを呼び出したり、Dataform に依存関係を解釈させたりすることが出来ます。

ビューを定義する SQLX を作成する

当道府県別の新規感染者数と新規死者数を算出するビューを SQLX で定義し BigQuery 上に作成します。

definitions/japan_actual.sqlx
config {
  type: "view",
  schema: "covid19",
  name: "japan_actual",
  description: "都道府県別感染状況(日別)",
  columns: {
    date: "日",
    prefecture_code: "都道府県コード",
    new_confirmed: "新規感染者数",
    new_deaths: "新規死者数"
  }
}

# standardSQL
SELECT
  date,
  REPLACE(location_key, "_", "-") AS prefecture_code,
  SUM(new_confirmed) AS new_confirmed,
  SUM(new_deceased) AS new_deaths
FROM
  ${ref("covid19_open_data")}
WHERE
  country_code = "JP"
  AND location_key <> "JP"
GROUP BY
  date, prefecture_code

config ブロックでtypeに"view"を指定しビューの説明(description)や各カラムの説明(columns)を定義します。SQL ブロックにはビューで使用する標準 SQL を記述します。
FROM 句で${ref("covid19_open_data")}という形で先程宣言したソースデータを呼び出しています。この形式でテーブル参照を記述することで Dataform が依存関係を解釈することが出来ます。

SQLX を作成すると、右ペインでビューの作成先や依存関係(bigquery-public-data.covid19_open_data.covid19_open_dataに依存している)、コンパイルされたクエリ(${ref("covid19_open_data")}の内容が展開されている)などが表示されます。PREVIEW RESULTSでクエリ結果の確認(実際に BigQuery にクエリジョブが発行されるので課金量など要注意)、CREATE VIEWでビューの作成が出来ます。

パーティションテーブルを定義する SQLX を作成する

bigquery-public-data.covid19_public_forecasts.japan_prefecture_28dから都道府県別感染状況予測(日別)をパーティションテーブルで作成するための SQLX を作成します。

definitions/japan_forecast.sqlx
config {
  type: "incremental",
  schema: "covid19",
  name: "japan_forecast",
  description: "都道府県別感染状況予測(日別)",
  columns: {
    prefecture_code: "都道府県コード",
    prefecture_name_kanji: "都道府県名",
    forecast_date: "予測実施日",
    prediction_date: "予測対象日",
    new_confirmed: "新規感染者数予測",
    new_deaths: "新規死者数予測"
  },
  bigquery: {
    partitionBy: "prediction_date"
  },
  tags: ["daily"]
}

# standardSQL
SELECT
  prefecture_code,
  prefecture_name_kanji,
  forecast_date,  -- 予測実施日
  prediction_date,  -- 予測対象日
  new_confirmed,
  new_deaths,
FROM
  ${ref("japan_prefecture_28d")}
${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }

config ブロックでtypeに"incremental"を指定することでパーティションテーブルを作成できます。パーティションの詳細についてはbigqueryの中で指定します(partitionByなど)。
初回実行時やフルリフレッシュを指定した場合は以下のクエリジョブが BigQuery に発行され新規テーブルが作成されます。

create or replace table `dataform-advent-calendar.covid19.japan_forecast` partition by prediction_date as

# standardSQL
SELECT
  prefecture_code,
  prefecture_name_kanji,
  forecast_date,  -- 予測実施日
  prediction_date,  -- 予測対象日
  new_confirmed,
  new_deaths,
FROM
  `bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`

二回目以降の実行時は以下のようなクエリジョブが発行され INSERT がおこなわれます。

insert into `dataform-advent-calendar.covid19.japan_forecast`
(prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths)
select prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths
from (

# standardSQL
SELECT
  prefecture_code,
  prefecture_name_kanji,
  forecast_date,  -- 予測実施日
  prediction_date,  -- 予測対象日
  new_confirmed,
  new_deaths,
FROM
  `bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`
WHERE forecast_date > (SELECT MAX(forecast_date) FROM `dataform-advent-calendar.covid19.japan_forecast`)) as insertions

INSERT の場合、SQL ブロックの以下の部分が評価されます。

${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }

これは、incremental(INSERT)のときは、WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})をクエリに追加するというものです。${self()}には定義したパーティションテーブル自身のテーブル指定が入ります。

通常のテーブルを定義する SQLX を作成する

実績値のビューと予測値のパーティションテーブルを定義したのでこれらを結合して予測値と実績値を比較するテーブルを作成します。

definitions/japan_forecast_comparison.sqlx
config {
  type: "table",
  schema: "covid19",
  name: "japan_forecast_comparison",
  description: "予測値評価(日別)",
  columns: {
    date: "日",
    prefecture_name_kanji: "都道府県名",
    forecast_new_confirmed: "新規感染者数(予測値)",
    new_confirmed: "新規感染者数",
    forecast_new_deaths: "新規死者数(予測値)",
    new_deaths: "新規死者数",
    latest_forecast_date: "最新の予測実施日"
  },
  tags: ["daily"]
}

# standardSQL
WITH t AS (
  SELECT
    f.prefecture_name_kanji,
    f.prediction_date AS date,
    f.forecast_date,
    MAX (f.forecast_date) OVER (
      PARTITION BY f.prefecture_code,
      f.prediction_date
    ) AS latest_forecast_date,
    j.new_confirmed,
    f.new_confirmed AS forecast_new_confirmed,
    j.new_deaths,
    f.new_deaths AS forecast_new_deaths
  FROM
    ${ref("japan_forecast")} f
    LEFT JOIN ${ref("japan_actual")} j ON f.prediction_date = j.date
    AND f.prefecture_code = j.prefecture_code
  WHERE
    f.prediction_date >= "2020-12-21"
)
SELECT
  date,
  prefecture_name_kanji,
  new_confirmed,
  forecast_new_confirmed,
  new_deaths,
  forecast_new_deaths,
  latest_forecast_date
FROM
  t
WHERE
  forecast_date = latest_forecast_date

各テーブルの依存関係を確認する

ハンバーガーメニューから Dependency Tree を開くと、定義したテーブルやビューの依存関係を確認できます。

各ソーステーブルから、実績値のビュー・予測値のパーティションテーブルが作られ、予測比較のテーブルへとつながっていることが分かります。

スケジュールを組んで定時実行させる

毎朝 9 時(JST)に予測値のパーティションテーブルと予測比較のテーブルを更新したいのでスケジュールを組みます。
ファイル一覧からenvironments.jsonを開くと以下のような画面が表示されます(View as plain text のトグルを有効にすると json を直に編集することも出来る)。

CREATE NEW SCHEDULEから対象のタグや実行時に依存関係を含めて実行するかなどを指定します。

  • スケジュールは cron 形式で表記する
  • Tags to runで定時実行対象のタグ名を入力する
    • 事前に japan_forecast.sqlx と japan_forecast_comparison.sqlx の config でtags: ["daily"]を指定済み
  • 実行時にテーブルをフルリフレッシュ(作り直し)するか選択する
  • 依存関係を含んで実行するか選択する
  • (事前にメールや Slack の通知設定を作成している場合)どのチャネルに通知を飛ばすか選択する
    • 「成功のみ」「失敗のみ」「すべて」の 3 パターンから選ぶことが出来ます。

おわりに

BigQuery を使っている方のなかにはデータパイプラインをスケジュールクエリで頑張っているようなケースも少なくないかと思います。スケジュールクエリは依存関係などを考慮することが出来ません(実行タイミングをズラして頑張るしかない)。ワークフローエンジンを導入していないBigQueryユーザーは導入して損は無いツールかと思います(SQLXはSQLが分かる人であれば学習コストもほとんどかからないはず)。
また、Dataform のパイプラインは API で呼び出すことも可能なので、Cloud Composer などのワークフローエンジンを導入している場合でも一連の流れに組み込むことが出来ます。

本稿では触れられませんでしたが、データ品質管理(アサーション)やクエリの単体テストなど充実した機能がまだまだあります。冬休み、Dataform を試して業務に活かしてみてはいかがでしょうか。

Discussion