DataformでBigQueryのデータ変換を試してみた
Dataform とは
先日Google Cloud のブログでDataformが Google Cloud に買収されたとの記事が公開されました。
Dataform をかんたんに説明すると、ELT(Extract/Load/Transform)のうちの Transform を SQLX で記述し、テーブル定義やデータ変換のドキュメント化・依存関係の管理による DAG の自動生成・テストなどができるツールです。
現在以下の DWH をサポートしています。
- BigQuery
- Snowflake
- Redshift
- Postgres
- Azure SQL data warehouse
- Presto (under development)
GUI・CLI・REST API(beta)での機能が提供されており、基本的には無料(ただし DWH のプロジェクト側の利用料で課金)で利用できます。
Google Cloud に買収された新たなデータパイプラインツールとして気になったのでドキュメントを読んで試してみました。
今回はプロジェクトの作成時に「IMPORT PROJECT」から「Stackoverflow data on BigQuery」を選択して試してみました。
セットアップ
詳細は省略しますが、BigQuery との連携のためにの BigQuery Admin 権限を持つサービスアカウントの秘密鍵が必要になります。また、Dataform の Default dataset location は BigQuery のリージョンと同じである必要があります。
始め方のベストプラクティス
ディレクトリ構成や依存関係の管理についてかんたんにキャッチアップできます。
テーブル作成の基本的なフロー
プロジェクトの作成が完了するとこのような画面になります。
基本的な流れは以下の通りです。
- 左側のメニューの「NEW DATASET」または「New file」から SQLX ファイルを新規作成
- SQLX を書く
- SQLX がリアルタイムに SQL に compile される
- 「CREATE TABLE」または「START NEW RUN」で実行
- BigQuery にデータセット・テーブルが作成される
「NEW DATSET」または「New file」をクリックすると SQLX の新規作成モーダルが開きます。
ファイル名を「definitions/staging/sample.sqlx」に変更にし、「TABLE」テンプレートを選択して「CREATE TABLE」を作成すると SQLX ファイルの雛形が作成されます。
ここで SQLX についてかんたんに説明すると、SQLX は Open Source の SQL 拡張です。
.sqlx
ファイルで標準 SQL を拡張した記法で SQL を記述し、それを SQL に compile することができます。
Dataform のエディタでは下の画像のように、「Compiled query」と書かれたエリアがあります。
下の画像では${ref("badges")}
と書かれた部分が「Compiled query」でbigquery-public-data.stackoverflow.badges
に変換されています。
このようにエディタで SQLX を記述し、編集中にリアルタイムに compile された結果を確認することができます。
SQLX を書き終え、validation に問題がなければ「CREATE VIEW(または CREATE TABLE)」から compile された SQL を実行し、結果を BigQuery に反映することができます。
ここまでがテーブル作成の基本的なフローです。
テーブルの preview
基本のフローを把握した上で、おそらく Dataform で一番初めに知っておくべき注意点は「CREATE TABLE/UPDATE TABLE」前の preview 機能です。
SQLX を定義したあとに「PREVIEW RESULTS」から事前に結果を確認できるのですが、実は裏側では BigQuery の SQL を実行しており GCP プロジェクト側で課金されます。
The Preview results button executes the query in your warehouse and returns the output at the bottom of the page. This can be useful during query development to check that the query returns expected output.
これは GCP の Operation Logging で BigQuery リソースのログから確認できます。
Dataform は無料で使えるとの触れ込みなのでこの preview 機能ももしや無料...!? と期待に胸を膨らませしたがそんなことはありませんでした。
しかも、Dataform のエディタではスキャンのデータ量および課金額を確認することができません。
BigQuery の preview の感覚で使ってしまうと、うっかり Dataform で 150 万円を溶かしてしまいかねないので気をつけましょう。
SQLX
Dataform のコア機能である SQLX についてもう少し詳しく説明します。
SQLX は標準 SQL の拡張で、ドキュメントでは以下のように表現されています。
SQLX = transformation logic + data quality testing + documentation
SQLX では上記の 3 つの要素を、主に「Config」「JavaScript」「Build-in Functions」の 3 つの拡張によって実現しています。
Config
Dataform におけるデータセットの config を記述します。
たとえば、データセット新規作成時に「TABLE」を指定するとtype: table
、「VIEW」を指定するとtype: view
の SQLX の雛形が作成され、実行すると BigQuery でそれぞれテーブルと View テーブルが作成されます。
config {
type: "view",
schema: "staging",
description: "Cleaned version of stackoverflow.badges"
}
特にデータソースとなるテーブルは、SQL を書かずにこの config のみで宣言的に記述することが推奨されています。
We recommend defining raw data as declarations to build your projects without any direct relation reference to tables in your warehouse.
config {
type: "declaration",
database: "bigquery-public-data",
schema: "stackoverflow",
name: "users",
description: "raw users table"
}
また、この config 内で後述するテストを記述します。
JavaScript
js {}
のブロック、もしくは ${}
で in-line の JavaScript を記述することができます。
また、definitions
ディレクトリで .js
ファイルを記述したり、includes
ディレクトリに .js
ファイルを書いて SQLX 内で再利用することもできます。
Built-in functions
Built in functions は JavaScript と同様にブロック内か in-line で記述できます。以下はその一部です。
- ref()
テーブル名を引数に渡すことで Dataform におけるデータセットの依存関係を解決します。Dataform で最も多く使うことになるでしょう。 - resolve()
ref()に似ていますが、こちらは依存関係を含みません。 - self()
database
.schema
.name
で構成される、現在のデータセットの Fully Qualiefied Name に compile されます。 - name()
database
.schema
を除いた現在のデータセット名に compile されます。
pre_operations, post_operations
その他に pre_operations
/post_operations
といったライフサイクルフックがあり、SQL の実行前後で処理を行うことができます。
依存関係の管理
Dataform では非常に簡単に依存関係の管理ができます。
その方法とはテーブル名をref()で指定するだけです。
下の例は BigQuery のサンプルプロジェクトの SQLX の一部の抜粋です。
- definitions/sources/users.sqlx
config {
type: "declaration",
database: "bigquery-public-data",
schema: "stackoverflow",
name: "users",
description: "raw users table"
}
- definitions/staging/stg_users.sqlx
config {
type: "view",
schema: "staging",
// (省略)
}
select
// (省略)
from
${ref("users")}
- definitions/reporting/user_stats.sqlx
config {
type: "table",
schema: "reporting",
// (省略)
}
}
select
// (省略)
from
${ref("stg_users")} as stg_users
// (省略)
group by
1,2,3,4
users テーブルはtype: definition
でデータソースとして定義され、2 つ目の stg_users テーブルでは View テーブルとしてこの users テーブルを指定しています。このとき、FROM ${ref("users")}
でref()
を使用しています。また、3 つ目の user_stats テーブルでも同じようにFROM ${ref("stg_users")}
で指定しています。
このように記述すると「users <- stg_users <- user_stats」という依存関係が定義され、メニューの「Dependency Tree」からこのような DAG を確認できます。
これによって「START NEW RUN」でプロジェクト全体を実行したとき、依存関係の順番通りに処理が実行されていきます。
処理が終わると BigQuery にデータセットとテーブルが作成されます。
このように簡単に依存関係の管理ができます。これが Dataform の大きなメリットのひとつです。
バージョン管理 & CI/CD
プロジェクトを作成すると「master(main ではありません(!))」と「(ユーザー名)_dev」ブランチが作成されます。
この状態では基本的には「(ユーザー名)_dev」ブランチでコミットを積み上げていき、master ブランチに push するフローになるでしょう。
このままでもバージョン管理が可能ですが、GitHub のリポジトリとの連携をおすすめします。(他にも GitLab などとの連携も可能なようです)
GitHub と連携すると Dataform からブランチを push し、GitHub 上で Pull Request を作成することができるようになります。
また、Dataform のDocker Imageも提供されています。
実践的な CI/CD では、GitHub Actions の workflow で「dataform test
でテストを実行 -> レビューをしたあとにマージ -> dataform run
でデプロイ」というフローがベストプラクティスになるかと思います。
テスト
テストは通常の SQLX の config: { assertions: [] }
内に記述するか、 config { type: "assertion" }
でテスト用の SQLX を記述できます。
テストでは以下のようなテストケースをサポートしています。
- uniqueKey
- nonNull
- rowConditions
boolean を返す JavaScript の式を書けます。 - Manual assertions
Dataform以外で作られたデータセットのために、config { type: "assertion" }
と書くことでテスト用の SQLX を定義できます。 - dependencies
テストが実行されると BigQuery にdataform_assertions
というデータセットが追加され、テストの失敗の原因となったデータを確認できる View テーブルが作成されます。
- definitions/reporting/user_stats.sqlx
config {
type: "table",
schema: "reporting",
// 省略
assertions: {
uniqueKey: ["user_id"],
rowConditions: ["badge_count >= 0"]
}
}
ここまででテーブル作成のフロー・SQLX・依存関係の管理・バージョン管理・テストなどの基本的な機能について説明してきました。
この時点でも Dataform の有用性は十分に期待できるのではないでしょうか。
以降は上記以外の機能について紹介します。
定期実行
environment.json
の「CREATE NEW SCHEDULE」から作成できます。
定期実行の結果はSlackか Email でのみ通知することができます。
逆に定期実行以外の通常の RUN では通知することはできないようです。
タグ付け
タグ付けをしておくと、RUN 時に特定のタグをつけた SQLX や.js
ファイルのみを実行することができます。
定期実行の際に hourly
・daily
などといったタグをつけたり、テスト用に test
タグをつけてテストのみ実行することも可能です。
config {
type: "view",
name: "user_counts",
tags: ["daily", "view"]
}
外部リソース
BigQuery 経由で Google Sheets にクエリすることもできます。
Federated Query もできるのか試したかったのですが、Cloud SQL connection のエラーを解決できずハマってしまったので後日リトライします...。
どなたか実験して確認できたら情報提供していただけると助かります...。 :pray:
Partitioned Table & Clustered Table
partitionBy
や clusterBy
によって、Partitioned Table・Clustered Table にも対応しています。
config {
type: "table",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["name", "revenue"]
}
}
SELECT CURRENT_TIMESTAMP() as ts, name, revenue
Incremental Table
下のサンプルでは user_actions テーブルのtimestamp
が現在のデータセットのdate
よりも大きいデータのみ insert します。
config { type: "incremental" }
select date(timestamp) as date, action
from weblogs.user_actions
${ when(incremental(), `where timestamp > (select max(date) from ${self()})`)
注意すべき点は、更新のたびにフルスキャンが走るとコストが膨らんでしまうため、SELECT 文
の WHERE 句ではなく、pre_operations
で条件を指定してフルスキャンを回避する必要があります。
REST API(beta)
現在 beta 版で、REST API では RUN と RUN のステータスを取得する 2 つのみ提供されています。
- api.dataform.co/v1/project/{projectId}/run
- api.dataform.co/v1/project/{projectId}/run/{runId}
API の実行には Settings 画面から API token の取得が必要です。
ドキュメントでは Airflow や Luigi などのワークフローツールから RUN を実行するサンプルが紹介されています。
時間単位の request 数上限などの説明は見当たりませんでした。
RUN の失敗時にリトライするようにワークフローを構築して request を送信しすぎないように注意したいですね。
CLI
npm install
または Docker Container から使えます。
特に CI/CD での利用シーンが多くなるでしょう。
$ dataform help
Commands:
dataform help [command] Show help. If [command] is specified, the help is for the given command.
dataform init <warehouse> [project-dir] Create a new dataform project.
dataform install [project-dir] Install a project's NPM dependencies.
dataform init-creds <warehouse> [project-dir] Create a .df-credentials.json file for Dataform to use when accessing your warehouse.
dataform compile [project-dir] Compile the dataform project. Produces JSON output describing the non-executable graph.
dataform test [project-dir] Run the dataform project's unit tests on the configured data warehouse.
dataform run [project-dir] Run the dataform project's scripts on the configured data warehouse.
dataform format [project-dir] Format the dataform project's files.
dataform listtables <warehouse> List tables on the configured data warehouse.
dataform gettablemetadata <warehouse> <schema> <table> Fetch metadata for a specified table.
Options:
--help Show help [boolean]
--version Show version number [boolean]
イベントドリブン
たとえば BigQuery 上のデータレイクとして扱うテーブルが更新されたタイミングでテーブルを更新するようなユースケースがあるかと思いますが、確認した限りではまだそのような機能はなさそうでした。
現状で対応するとすれば.js
ファイルでアクションを定義するか、 REST API のドキュメントで紹介されているように Airflow などのワークフローに組み込む必要があるかと思います。
それ以外の方法で実現するとすれば、Pub/Sub を trigger に Cloud Run 上で CLI の dataform run
か、または REST API を非同期実行するのが有力かと思います。
Pub/Sub 経由で RUN が実行できれば、BigQuery Data Transfer Service の結果を Topic に pushしたり、Monitoring で Dataform の結果を監視して pushするというような活用方法も可能になるでしょう。
(あとで時間があったら試したいのですが、他に名案があれば募集中です)
まとめ
つい先日買収されて無料化されたばかりで日本語の情報も少ないですが、データ基盤に関心のある方々から熱い視線を集めている気配を(主に Twitter で)感じました。また今回触ってみて、何かと煩雑になりがちなデータパイプラインの構築・運用の一助になってくれるポテンシャルを感じました。
production でも導入できる機会を積極的に探っていきたいですね。
参考
YouTubeのvideoIDが不正です
Discussion