DataformでBigQueryのデータ変換を試してみた

15 min読了の目安(約13800字TECH技術記事

この記事は ZOZOテクノロジーズ #2 Advent Calendar 2020 20日目の記事です。

Dataformとは

先日Google CloudのブログDataformがGoogle Cloudに買収されたとの記事が公開されました。


Dataformをかんたんに説明すると、ELT(Extract/Load/Transform)のうちのTransformをSQLXで記述し、テーブル定義やデータ変換のドキュメント化・依存関係の管理によるDAGの自動生成・テストなどができるツールです。
現在以下のDWHをサポートしています。

  • BigQuery
  • Snowflake
  • Redshift
  • Postgres
  • Azure SQL data warehouse
  • Presto (under development)

GUI・CLI・REST API(beta)での機能が提供されており、基本的には無料(ただしDWHのプロジェクト側の利用料で課金)で利用できます。
Google Cloudに買収された新たなデータパイプラインツールとして気になったのでドキュメントを読んで試してみました。

今回はプロジェクトの作成時に「IMPORT PROJECT」から「Stackoverflow data on BigQuery」を選択して試してみました。

セットアップ


詳細は省略しますが、BigQueryとの連携のためにのBigQuery Admin権限を持つサービスアカウントの秘密鍵が必要になります。また、DataformのDefault dataset locationはBigQueryのリージョンと同じである必要があります。

始め方のベストプラクティス


最初のテーブルを作る前に一度ドキュメントのベストプラクティスに目を通すことをオススメします。
ディレクトリ構成や依存関係の管理についてかんたんにキャッチアップできます。

テーブル作成の基本的なフロー

プロジェクトの作成が完了するとこのような画面になります。

editor

基本的な流れは以下の通りです。

  1. 左側のメニューの「NEW DATASET」または「New file」からSQLXファイルを新規作成
  2. SQLXを書く
  3. SQLXがリアルタイムにSQLにcompileされる
  4. 「CREATE TABLE」または「START NEW RUN」で実行
  5. BigQueryにデータセット・テーブルが作成される

「NEW DATSET」または「New file」をクリックするとSQLXの新規作成モーダルが開きます。

define new dataset

ファイル名を「definitions/staging/sample.sqlx」に変更にし、「TABLE」テンプレートを選択して「CREATE TABLE」を作成するとSQLXファイルの雛形が作成されます。

sample.sqlx

ここでSQLXについてかんたんに説明すると、SQLXはOpen SourceのSQL拡張です。
.sqlx ファイルで標準SQLを拡張した記法でSQLを記述し、それをSQLにcompileすることができます。


Dataformのエディタでは下の画像のように、「Compiled query」と書かれたエリアがあります。
下の画像では${ref("badges")}と書かれた部分が「Compiled query」でbigquery-public-data.stackoverflow.badgesに変換されています。
このようにエディタでSQLXを記述し、編集中にリアルタイムにcompileされた結果を確認することができます。

editor2

SQLXを書き終え、validationに問題がなければ「CREATE VIEW(またはCREATE TABLE)」からcompileされたSQLを実行し、結果をBigQueryに反映することができます。

new run

run result

ここまでがテーブル作成の基本的なフローです。

テーブルのpreview

基本のフローを把握した上で、おそらくDataformで一番初めに知っておくべき注意点は「CREATE TABLE/UPDATE TABLE」前のpreview機能です。
SQLXを定義したあとに「PREVIEW RESULTS」から事前に結果を確認できるのですが、実は裏側ではBigQueryのSQLを実行しておりGCPプロジェクト側で課金されます。

The Preview results button executes the query in your warehouse and returns the output at the bottom of the page. This can be useful during query development to check that the query returns expected output.

これはGCPのOperation LoggingでBigQueryリソースのログから確認できます。
Dataformは無料で使えるとの触れ込みなのでこのpreview機能ももしや無料...!? と期待に胸を膨らませしたがそんなことはありませんでした。
しかも、Dataformのエディタではスキャンのデータ量および課金額を確認することができません。
BigQueryのpreviewの感覚で使ってしまうと、うっかりDataformで150万円を溶かしてしまいかねないので気をつけましょう。

SQLX

Dataformのコア機能であるSQLXについてもう少し詳しく説明します。
SQLXは標準SQLの拡張で、ドキュメントでは以下のように表現されています。

SQLX = transformation logic + data quality testing + documentation

SQLXでは上記の3つの要素を、主に「Config」「JavaScript」「Build-in Functions」の3つの拡張によって実現しています。

Config

Dataformにおけるデータセットのconfigを記述します。
たとえば、データセット新規作成時に「TABLE」を指定するとtype: table、「VIEW」を指定するとtype: viewのSQLXの雛形が作成され、実行するとBigQueryでそれぞれテーブルとViewテーブルが作成されます。

config {
  type: "view",
  schema: "staging",
  description: "Cleaned version of stackoverflow.badges"
}

特にデータソースとなるテーブルは、SQLを書かずにこのconfigのみで宣言的に記述することが推奨されています。

We recommend defining raw data as declarations to build your projects without any direct relation reference to tables in your warehouse.

config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "stackoverflow",
  name: "users",
  description: "raw users table"
}

また、このconfig内で後述するテストを記述します。

JavaScript

js {} のブロック、もしくは ${} でin-lineのJavaScriptを記述することができます。

js in sqlx

また、definitionsディレクトリで .js ファイルを記述したり、includes ディレクトリに .js ファイルを書いてSQLX内で再利用することもできます。


Built-in functions

Built in functionsはJavaScriptと同様にブロック内かin-lineで記述できます。以下はその一部です。

  • ref()
    テーブル名を引数に渡すことでDataformにおけるデータセットの依存関係を解決します。Dataformで最も多く使うことになるでしょう。
  • resolve()
    ref()に似ていますが、こちらは依存関係を含みません。
  • self()
    database.schema.name で構成される、現在のデータセットのFully Qualiefied Nameにcompileされます。
  • name()
    database.schema を除いた現在のデータセット名にcompileされます。

pre_operations, post_operations

その他に pre_operations/post_operations といったライフサイクルフックがあり、SQLの実行前後で処理を行うことができます。

依存関係の管理

Dataformでは非常に簡単に依存関係の管理ができます。
その方法とはテーブル名をref()で指定するだけです。
下の例はBigQueryのサンプルプロジェクトのSQLXの一部の抜粋です。

  • definitions/sources/users.sqlx
config {
  type: "declaration",
  database: "bigquery-public-data",
  schema: "stackoverflow",
  name: "users",
  description: "raw users table"
}
  • definitions/staging/stg_users.sqlx
config {
  type: "view",
  schema: "staging",
  // (省略)
}

select
  // (省略)
from
  ${ref("users")}
  • definitions/reporting/user_stats.sqlx
config {
  type: "table",
  schema: "reporting",
  // (省略)
  }
}

select
  // (省略)
from
  ${ref("stg_users")} as stg_users
  // (省略)
group by
  1,2,3,4

usersテーブルはtype: definitionでデータソースとして定義され、2つ目のstg_usersテーブルではViewテーブルとしてこのusersテーブルを指定しています。このとき、FROM ${ref("users")}ref()を使用しています。また、3つ目のuser_statsテーブルでも同じようにFROM ${ref("stg_users")}で指定しています。
このように記述すると「users <- stg_users <- user_stats」という依存関係が定義され、メニューの「Dependency Tree」からこのようなDAGを確認できます。

dependency

これによって「START NEW RUN」でプロジェクト全体を実行したとき、依存関係の順番通りに処理が実行されていきます。

run result

処理が終わるとBigQueryにデータセットとテーブルが作成されます。

run result2

このように簡単に依存関係の管理ができます。これがDataformの大きなメリットのひとつです。

バージョン管理 & CI/CD


GUIではデフォルトでDataform内のGitによるバージョン管理ができます。
プロジェクトを作成すると「master(mainではありません(!))」と「(ユーザー名)_dev」ブランチが作成されます。
この状態では基本的には「(ユーザー名)_dev」ブランチでコミットを積み上げていき、masterブランチにpushするフローになるでしょう。
このままでもバージョン管理が可能ですが、GitHubのリポジトリとの連携をおすすめします。(他にもGitLabなどとの連携も可能なようです)
GitHubと連携するとDataformからブランチをpushし、GitHub上でPull Requestを作成することができるようになります。
open pr

また、DataformのDocker Imageも提供されています。
実践的なCI/CDでは、GitHub Actionsのworkflowで「dataform testでテストを実行 -> レビューをしたあとにマージ -> dataform runでデプロイ」というフローがベストプラクティスになるかと思います。

テスト

テストは通常のSQLXの config: { assertions: [] } 内に記述するか、 config { type: "assertion" } でテスト用のSQLXを記述できます。

テストでは以下のようなテストケースをサポートしています。

  • uniqueKey
  • nonNull
  • rowConditions
    booleanを返すJavaScriptの式を書けます。
  • Manual assertions
    Dataform以外で作られたデータセットのために、config { type: "assertion" } と書くことでテスト用のSQLXを定義できます。
  • dependencies

テストが実行されるとBigQueryにdataform_assertionsというデータセットが追加され、テストの失敗の原因となったデータを確認できるViewテーブルが作成されます。

  • definitions/reporting/user_stats.sqlx
config {
  type: "table",
  schema: "reporting",
  // 省略
  assertions: {
    uniqueKey: ["user_id"],
    rowConditions: ["badge_count >= 0"]
  }
}  

unique key test
row conditions test

ここまででテーブル作成のフロー・SQLX・依存関係の管理・バージョン管理・テストなどの基本的な機能について説明してきました。
この時点でもDataformの有用性は十分に期待できるのではないでしょうか。
以降は上記以外の機能について紹介します。

定期実行


cronでdailyやhourlyの定期実行を計画することができます。
environment.json の「CREATE NEW SCHEDULE」から作成できます。
new schedule

定期実行の結果はSlackかEmailでのみ通知することができます。
逆に定期実行以外の通常のRUNでは通知することはできないようです。
channel

タグ付け


configでタグ付けによるアクションの管理ができます。
タグ付けをしておくと、RUN時に特定のタグをつけたSQLXや.jsファイルのみを実行することができます。
定期実行の際に hourlydaily などといったタグをつけたり、テスト用に test タグをつけてテストのみ実行することも可能です。
config {
  type: "view",
  name: "user_counts",
  tags: ["daily", "view"]
}

外部リソース

BigQuery経由でGoogle Sheetsにクエリすることもできます。

Federated Queryもできるのか試したかったのですが、Cloud SQL connectionのエラーを解決できずハマってしまったので後日リトライします...。
どなたか実験して確認できたら情報提供していただけると助かります...。 :pray:

Partitioned Table & Clustered Table


configの partitionByclusterBy によって、Partitioned Table・Clustered Tableにも対応しています。
config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["name", "revenue"]
  }
}
SELECT CURRENT_TIMESTAMP() as ts, name, revenue

Incremental Table


Incremental Tableもサポートされています。
下のサンプルではuser_actionsテーブルのtimestampが現在のデータセットのdateよりも大きいデータのみinsertします。
config { type: "incremental" }

select date(timestamp) as date, action
from weblogs.user_actions

${ when(incremental(), `where timestamp > (select max(date) from ${self()})`)

注意すべき点は、更新のたびにフルスキャンが走るとコストが膨らんでしまうため、SELECT文
のWHERE句ではなく、pre_operationsで条件を指定してフルスキャンを回避する必要があります。

REST API(beta)

現在beta版で、REST APIではRUNとRUNのステータスを取得する2つのみ提供されています。

  • api.dataform.co/v1/project/{projectId}/run
  • api.dataform.co/v1/project/{projectId}/run/{runId}

APIの実行にはSettings画面からAPI tokenの取得が必要です。
ドキュメントではAirflowやLuigiなどのワークフローツールからRUNを実行するサンプルが紹介されています。

時間単位のrequest数上限などの説明は見当たりませんでした。
RUNの失敗時にリトライするようにワークフローを構築してrequestを送信しすぎないように注意したいですね。

CLI

npm install またはDocker Containerから使えます。
特にCI/CDでの利用シーンが多くなるでしょう。

$ dataform help

Commands:
  dataform help [command]                                 Show help. If [command] is specified, the help is for the given command.
  dataform init <warehouse> [project-dir]                 Create a new dataform project.
  dataform install [project-dir]                          Install a project's NPM dependencies.
  dataform init-creds <warehouse> [project-dir]           Create a .df-credentials.json file for Dataform to use when accessing your warehouse.
  dataform compile [project-dir]                          Compile the dataform project. Produces JSON output describing the non-executable graph.
  dataform test [project-dir]                             Run the dataform project's unit tests on the configured data warehouse.
  dataform run [project-dir]                              Run the dataform project's scripts on the configured data warehouse.
  dataform format [project-dir]                           Format the dataform project's files.
  dataform listtables <warehouse>                         List tables on the configured data warehouse.
  dataform gettablemetadata <warehouse> <schema> <table>  Fetch metadata for a specified table.

Options:
  --help     Show help  [boolean]
  --version  Show version number  [boolean]

イベントドリブン

たとえばBigQuery上のデータレイクとして扱うテーブルが更新されたタイミングでテーブルを更新するようなユースケースがあるかと思いますが、確認した限りではまだそのような機能はなさそうでした。
現状で対応するとすれば.jsファイルでアクションを定義するか、 REST APIのドキュメントで紹介されているようにAirflowなどのワークフローに組み込む必要があるかと思います。

それ以外の方法で実現するとすれば、Pub/SubをtriggerにCloud Run上でCLIの dataform run か、またはREST APIを非同期実行するのが有力かと思います。
Pub/Sub経由でRUNが実行できれば、BigQuery Data Transfer Serviceの結果をTopicにpushしたり、MonitoringでDataformの結果を監視してpushするというような活用方法も可能になるでしょう。
(あとで時間があったら試したいのですが、他に名案があれば募集中です)

まとめ

つい先日買収されて無料化されたばかりで日本語の情報も少ないですが、データ基盤に関心のある方々から熱い視線を集めている気配を(主にTwitterで)感じました。また今回触ってみて、何かと煩雑になりがちなデータパイプラインの構築・運用の一助になってくれるポテンシャルを感じました。
productionでも導入できる機会を積極的に探っていきたいですね。

参考