DataformでBigQueryのデータ変換を試してみた

15 min read読了の目安(約14100字

この記事は ZOZO テクノロジーズ #2 Advent Calendar 2020 20 日目の記事です。

Dataform とは

先日Google Cloud のブログDataformが Google Cloud に買収されたとの記事が公開されました。

https://cloud.google.com/blog/ja/products/data-analytics/welcoming-dataform-to-bigquery?authuser=2
https://dataform.co/

Dataform をかんたんに説明すると、ELT(Extract/Load/Transform)のうちの Transform を SQLX で記述し、テーブル定義やデータ変換のドキュメント化・依存関係の管理による DAG の自動生成・テストなどができるツールです。
現在以下の DWH をサポートしています。

  • BigQuery
  • Snowflake
  • Redshift
  • Postgres
  • Azure SQL data warehouse
  • Presto (under development)

GUI・CLI・REST API(beta)での機能が提供されており、基本的には無料(ただし DWH のプロジェクト側の利用料で課金)で利用できます。
Google Cloud に買収された新たなデータパイプラインツールとして気になったのでドキュメントを読んで試してみました。

今回はプロジェクトの作成時に「IMPORT PROJECT」から「Stackoverflow data on BigQuery」を選択して試してみました。

セットアップ

https://docs.dataform.co/getting-started-tutorial/set-up
詳細は省略しますが、BigQuery との連携のためにの BigQuery Admin 権限を持つサービスアカウントの秘密鍵が必要になります。また、Dataform の Default dataset location は BigQuery のリージョンと同じである必要があります。

始め方のベストプラクティス

https://docs.dataform.co/best-practices/start-your-dataform-project
最初のテーブルを作る前に一度ドキュメントのベストプラクティスに目を通すことをオススメします。
ディレクトリ構成や依存関係の管理についてかんたんにキャッチアップできます。

テーブル作成の基本的なフロー

https://docs.dataform.co/introduction/how-dataform-works#how-dataform-works

プロジェクトの作成が完了するとこのような画面になります。

editor

基本的な流れは以下の通りです。

  1. 左側のメニューの「NEW DATASET」または「New file」から SQLX ファイルを新規作成
  2. SQLX を書く
  3. SQLX がリアルタイムに SQL に compile される
  4. 「CREATE TABLE」または「START NEW RUN」で実行
  5. BigQuery にデータセット・テーブルが作成される

「NEW DATSET」または「New file」をクリックすると SQLX の新規作成モーダルが開きます。

define new dataset

ファイル名を「definitions/staging/sample.sqlx」に変更にし、「TABLE」テンプレートを選択して「CREATE TABLE」を作成すると SQLX ファイルの雛形が作成されます。

sample.sqlx

ここで SQLX についてかんたんに説明すると、SQLX は Open Source の SQL 拡張です。
.sqlx ファイルで標準 SQL を拡張した記法で SQL を記述し、それを SQL に compile することができます。

https://docs.dataform.co/introduction/dataform-in-5-minutes#how-do-you-develop-in-sqlx
https://github.com/dataform-co/dataform/tree/master/sqlx

Dataform のエディタでは下の画像のように、「Compiled query」と書かれたエリアがあります。
下の画像では${ref("badges")}と書かれた部分が「Compiled query」でbigquery-public-data.stackoverflow.badgesに変換されています。
このようにエディタで SQLX を記述し、編集中にリアルタイムに compile された結果を確認することができます。

editor2

SQLX を書き終え、validation に問題がなければ「CREATE VIEW(または CREATE TABLE)」から compile された SQL を実行し、結果を BigQuery に反映することができます。

new run

run result

ここまでがテーブル作成の基本的なフローです。

テーブルの preview

基本のフローを把握した上で、おそらく Dataform で一番初めに知っておくべき注意点は「CREATE TABLE/UPDATE TABLE」前の preview 機能です。
SQLX を定義したあとに「PREVIEW RESULTS」から事前に結果を確認できるのですが、実は裏側では BigQuery の SQL を実行しており GCP プロジェクト側で課金されます。

https://docs.dataform.co/dataform-web/tutorials/102#write-a-query

The Preview results button executes the query in your warehouse and returns the output at the bottom of the page. This can be useful during query development to check that the query returns expected output.

これは GCP の Operation Logging で BigQuery リソースのログから確認できます。
Dataform は無料で使えるとの触れ込みなのでこの preview 機能ももしや無料...!? と期待に胸を膨らませしたがそんなことはありませんでした。
しかも、Dataform のエディタではスキャンのデータ量および課金額を確認することができません。
BigQuery の preview の感覚で使ってしまうと、うっかり Dataform で 150 万円を溶かしてしまいかねないので気をつけましょう。

SQLX

https://docs.dataform.co/guides/sqlx

Dataform のコア機能である SQLX についてもう少し詳しく説明します。
SQLX は標準 SQL の拡張で、ドキュメントでは以下のように表現されています。

SQLX = transformation logic + data quality testing + documentation

SQLX では上記の 3 つの要素を、主に「Config」「JavaScript」「Build-in Functions」の 3 つの拡張によって実現しています。

Config

Dataform におけるデータセットの config を記述します。
たとえば、データセット新規作成時に「TABLE」を指定するとtype: table、「VIEW」を指定するとtype: viewの SQLX の雛形が作成され、実行すると BigQuery でそれぞれテーブルと View テーブルが作成されます。

config {
  type: "view",
  schema: "staging",
  description: "Cleaned version of stackoverflow.badges"
}

特にデータソースとなるテーブルは、SQL を書かずにこの config のみで宣言的に記述することが推奨されています。

https://docs.dataform.co/best-practices/start-your-dataform-project#define-your-source-data-with-declarations

We recommend defining raw data as declarations to build your projects without any direct relation reference to tables in your warehouse.

config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "stackoverflow",
  name: "users",
  description: "raw users table"
}

また、この config 内で後述するテストを記述します。

JavaScript

js {} のブロック、もしくは ${} で in-line の JavaScript を記述することができます。

js in sqlx

また、definitionsディレクトリで .js ファイルを記述したり、includes ディレクトリに .js ファイルを書いて SQLX 内で再利用することもできます。

https://docs.dataform.co/guides/javascript
https://docs.dataform.co/guides/javascript/includes#example-using-an-include

Built-in functions

Built in functions は JavaScript と同様にブロック内か in-line で記述できます。以下はその一部です。

  • ref()
    テーブル名を引数に渡すことで Dataform におけるデータセットの依存関係を解決します。Dataform で最も多く使うことになるでしょう。
  • resolve()
    ref()に似ていますが、こちらは依存関係を含みません。
  • self()
    database.schema.name で構成される、現在のデータセットの Fully Qualiefied Name に compile されます。
  • name()
    database.schema を除いた現在のデータセット名に compile されます。

pre_operations, post_operations

その他に pre_operations/post_operations といったライフサイクルフックがあり、SQL の実行前後で処理を行うことができます。

依存関係の管理

Dataform では非常に簡単に依存関係の管理ができます。
その方法とはテーブル名をref()で指定するだけです。
下の例は BigQuery のサンプルプロジェクトの SQLX の一部の抜粋です。

  • definitions/sources/users.sqlx
config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "stackoverflow",
  name: "users",
  description: "raw users table"
}
  • definitions/staging/stg_users.sqlx
config {
  type: "view",
  schema: "staging",
  // (省略)
}

select
  // (省略)
from
  ${ref("users")}
  • definitions/reporting/user_stats.sqlx
config {
  type: "table",
  schema: "reporting",
  // (省略)
  }
}

select
  // (省略)
from
  ${ref("stg_users")} as stg_users
  // (省略)
group by
  1,2,3,4

users テーブルはtype: definitionでデータソースとして定義され、2 つ目の stg_users テーブルでは View テーブルとしてこの users テーブルを指定しています。このとき、FROM ${ref("users")}ref()を使用しています。また、3 つ目の user_stats テーブルでも同じようにFROM ${ref("stg_users")}で指定しています。
このように記述すると「users <- stg_users <- user_stats」という依存関係が定義され、メニューの「Dependency Tree」からこのような DAG を確認できます。

dependency

これによって「START NEW RUN」でプロジェクト全体を実行したとき、依存関係の順番通りに処理が実行されていきます。

run result

処理が終わると BigQuery にデータセットとテーブルが作成されます。

run result2

このように簡単に依存関係の管理ができます。これが Dataform の大きなメリットのひとつです。

バージョン管理 & CI/CD

https://docs.dataform.co/dataform-web/version-control
GUI ではデフォルトで Dataform 内の Git によるバージョン管理ができます。
プロジェクトを作成すると「master(main ではありません(!))」と「(ユーザー名)_dev」ブランチが作成されます。
この状態では基本的には「(ユーザー名)_dev」ブランチでコミットを積み上げていき、master ブランチに push するフローになるでしょう。
このままでもバージョン管理が可能ですが、GitHub のリポジトリとの連携をおすすめします。(他にも GitLab などとの連携も可能なようです)
GitHub と連携すると Dataform からブランチを push し、GitHub 上で Pull Request を作成することができるようになります。
open pr

また、Dataform のDocker Imageも提供されています。
実践的な CI/CD では、GitHub Actions の workflow で「dataform testでテストを実行 -> レビューをしたあとにマージ -> dataform runでデプロイ」というフローがベストプラクティスになるかと思います。

https://docs.dataform.co/guides/ci-cd#using-github-actions

テスト

https://docs.dataform.co/guides/assertions

テストは通常の SQLX の config: { assertions: [] } 内に記述するか、 config { type: "assertion" } でテスト用の SQLX を記述できます。

テストでは以下のようなテストケースをサポートしています。

  • uniqueKey
  • nonNull
  • rowConditions
    boolean を返す JavaScript の式を書けます。
  • Manual assertions
    Dataform以外で作られたデータセットのために、config { type: "assertion" } と書くことでテスト用の SQLX を定義できます。
  • dependencies

テストが実行されると BigQuery にdataform_assertionsというデータセットが追加され、テストの失敗の原因となったデータを確認できる View テーブルが作成されます。

  • definitions/reporting/user_stats.sqlx
config {
  type: "table",
  schema: "reporting",
  // 省略
  assertions: {
    uniqueKey: ["user_id"],
    rowConditions: ["badge_count >= 0"]
  }
}

unique key test
row conditions test

ここまででテーブル作成のフロー・SQLX・依存関係の管理・バージョン管理・テストなどの基本的な機能について説明してきました。
この時点でも Dataform の有用性は十分に期待できるのではないでしょうか。
以降は上記以外の機能について紹介します。

定期実行

https://docs.dataform.co/dataform-web/scheduling
cron で daily や hourly の定期実行を計画することができます。
environment.json の「CREATE NEW SCHEDULE」から作成できます。
new schedule

定期実行の結果はSlackか Email でのみ通知することができます。
逆に定期実行以外の通常の RUN では通知することはできないようです。
channel

タグ付け

https://docs.dataform.co/guides/tags
config でタグ付けによるアクションの管理ができます。
タグ付けをしておくと、RUN 時に特定のタグをつけた SQLX や.jsファイルのみを実行することができます。
定期実行の際に hourlydaily などといったタグをつけたり、テスト用に test タグをつけてテストのみ実行することも可能です。
config {
  type: "view",
  name: "user_counts",
  tags: ["daily", "view"]
}

外部リソース

BigQuery 経由で Google Sheets にクエリすることもできます。

https://docs.dataform.co/warehouses/bigquery#configuring-access-to-google-sheets

Federated Query もできるのか試したかったのですが、Cloud SQL connection のエラーを解決できずハマってしまったので後日リトライします...。
どなたか実験して確認できたら情報提供していただけると助かります...。 :pray:

Partitioned Table & Clustered Table

https://docs.dataform.co/warehouses/bigquery#setting-table-partitions
config の partitionByclusterBy によって、Partitioned Table・Clustered Table にも対応しています。
config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["name", "revenue"]
  }
}
SELECT CURRENT_TIMESTAMP() as ts, name, revenue

Incremental Table

https://docs.dataform.co/guides/datasets/incremental
Incremental Table もサポートされています。
下のサンプルでは user_actions テーブルのtimestampが現在のデータセットのdateよりも大きいデータのみ insert します。
config { type: "incremental" }

select date(timestamp) as date, action
from weblogs.user_actions

${ when(incremental(), `where timestamp > (select max(date) from ${self()})`)

注意すべき点は、更新のたびにフルスキャンが走るとコストが膨らんでしまうため、SELECT 文
の WHERE 句ではなく、pre_operationsで条件を指定してフルスキャンを回避する必要があります。

REST API(beta)

https://docs.dataform.co/dataform-web/api

現在 beta 版で、REST API では RUN と RUN のステータスを取得する 2 つのみ提供されています。

  • api.dataform.co/v1/project/{projectId}/run
  • api.dataform.co/v1/project/{projectId}/run/{runId}

API の実行には Settings 画面から API token の取得が必要です。
ドキュメントでは Airflow や Luigi などのワークフローツールから RUN を実行するサンプルが紹介されています。

時間単位の request 数上限などの説明は見当たりませんでした。
RUN の失敗時にリトライするようにワークフローを構築して request を送信しすぎないように注意したいですね。

CLI

npm install または Docker Container から使えます。
特に CI/CD での利用シーンが多くなるでしょう。

$ dataform help

Commands:
  dataform help [command]                                 Show help. If [command] is specified, the help is for the given command.
  dataform init <warehouse> [project-dir]                 Create a new dataform project.
  dataform install [project-dir]                          Install a project's NPM dependencies.
  dataform init-creds <warehouse> [project-dir]           Create a .df-credentials.json file for Dataform to use when accessing your warehouse.
  dataform compile [project-dir]                          Compile the dataform project. Produces JSON output describing the non-executable graph.
  dataform test [project-dir]                             Run the dataform project's unit tests on the configured data warehouse.
  dataform run [project-dir]                              Run the dataform project's scripts on the configured data warehouse.
  dataform format [project-dir]                           Format the dataform project's files.
  dataform listtables <warehouse>                         List tables on the configured data warehouse.
  dataform gettablemetadata <warehouse> <schema> <table>  Fetch metadata for a specified table.

Options:
  --help     Show help  [boolean]
  --version  Show version number  [boolean]

イベントドリブン

たとえば BigQuery 上のデータレイクとして扱うテーブルが更新されたタイミングでテーブルを更新するようなユースケースがあるかと思いますが、確認した限りではまだそのような機能はなさそうでした。
現状で対応するとすれば.jsファイルでアクションを定義するか、 REST API のドキュメントで紹介されているように Airflow などのワークフローに組み込む必要があるかと思います。

それ以外の方法で実現するとすれば、Pub/Sub を trigger に Cloud Run 上で CLI の dataform run か、または REST API を非同期実行するのが有力かと思います。
Pub/Sub 経由で RUN が実行できれば、BigQuery Data Transfer Service の結果を Topic に pushしたり、Monitoring で Dataform の結果を監視して pushするというような活用方法も可能になるでしょう。
(あとで時間があったら試したいのですが、他に名案があれば募集中です)

まとめ

つい先日買収されて無料化されたばかりで日本語の情報も少ないですが、データ基盤に関心のある方々から熱い視線を集めている気配を(主に Twitter で)感じました。また今回触ってみて、何かと煩雑になりがちなデータパイプラインの構築・運用の一助になってくれるポテンシャルを感じました。
production でも導入できる機会を積極的に探っていきたいですね。

参考

https://github.com/dataform-co/dataform
https://join.slack.com/t/dataform-users/shared_invite/zt-dark6b7k-r5~12LjYL1a17Vgma2ru2A
https://www.youtube.com/channel/UCyF9g-4tNWfeGH2_UIzb1aA