📊

Topological SortでBigQueryのジョブをスケジューリングする

2023/12/14に公開

https://qiita.com/advent-calendar/2023/zozo

概要

依存関係を持つBigQueryのSQLのジョブを、 Topological Sortによってスケジューリングする方法を紹介します。
dbtやDataformを利用すればおのずと依存関係に基づいたデータパイプラインを構築できますが、そのようなツールを導入しない簡易的な依存関係の解決手段としてお見知りおきください。

サンプル

サンプルのリポジトリです。今回はgraphlibを利用するためPythonで実装しています。
https://github.com/snagasawa/bigquery-job-scheduler

ユースケースとして、ECサイトで「ユーザーがお気に入りに登録しているショップで、20%以上の値引きがあり、なおかつ在庫が存在する商品」を取得するためのジョブを想定します。

スキーマ

データソースとなるテーブルのスキーマです。

schema

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sources.sql#L1-L38

SQL

5つのSQLを実行し、最後にfavorite_discount_goodsテーブルを作成します。

schema

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sqls/staging/shop_goods.sql

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sqls/staging/shop_goods_with_stocks.sql

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sqls/staging/discount_goods.sql

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sqls/staging/favorite_shops.sql

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/sqls/reporting/favorite_discount_goods.sql

解説

主な処理の流れです。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L109-L123

1. SQLファイルの取得

はじめにSQLファイルを取得し、SQL classのinstanceを生成します。
SQLファイルはsqlsディレクトリ下のすべての.sqlファイルが対象です。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L110-L112

SQL classはread()でファイルを読み込み、get_dependencies()で自身の作成するテーブル名と、依存するテーブル名のlistからなるdictionaryを返します。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L14-L42

例えば、sqls/reporting/favorite_discount_goods.sql はこのような値を返します。

{'project.reporting.favorite_discount_goods': ['project.staging.discount_goods',
                                               'project.staging.favorite_shops']}

2. 依存関係のグラフへの変換

すべてのSQLの依存関係からグラフを作成すると、以下の結果になります。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L114-L116

{'project.reporting.favorite_discount_goods': ['project.staging.discount_goods',
                                               'project.staging.favorite_shops'],
 'project.staging.discount_goods': ['project.sources.discounts',
                                    'project.staging.shop_goods_with_stocks'],
 'project.staging.favorite_shops': ['project.sources.favorite_shops',
                                    'project.sources.shops',
                                    'project.sources.users'],
 'project.staging.shop_goods': ['project.sources.goods',
                                'project.sources.shop_goods',
                                'project.sources.shops'],
 'project.staging.shop_goods_with_stocks': ['project.sources.stocks',
                                            'project.staging.shop_goods']}

3. ジョブのスケジュールと実行

作成したグラフにTopological Sortをかけます。

https://docs.python.org/3/library/graphlib.html

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L118-L120

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L45-L56

結果、job_sequencesにはネストしたテーブル名のlistがセットされます。

[['project.sources.stocks',
  'project.sources.discounts',
  'project.sources.favorite_shops',
  'project.sources.shops',
  'project.sources.users',
  'project.sources.goods',
  'project.sources.shop_goods'],
 ['project.staging.favorite_shops',
  'project.staging.shop_goods'],
 ['project.staging.shop_goods_with_stocks'],
 ['project.staging.discount_goods'],
 ['project.reporting.favorite_discount_goods']]

このように依存関係にもとづいた順番になっていることがわかります。
先頭のlistはデータソースのテーブルのため、sliceで削除しています。

あとはlistごとに並列でジョブを実行します。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L59-L86

futuresを出力すると、以下のような結果になります。

{<Future at 0x10477b800 state=finished returned NoneType>: 'project.staging.favorite_shops',
 <Future at 0x10479c5f0 state=finished returned NoneType>: 'project.staging.shop_goods'}
{<Future at 0x1040143b0 state=finished returned NoneType>: 'project.staging.shop_goods_with_stocks'}
{<Future at 0x104288560 state=finished returned NoneType>: 'project.staging.discount_goods'}
{<Future at 0x10477b6b0 state=finished returned NoneType>: 'project.reporting.favorite_discount_goods'}

4. グラフの描画

おまけで、最後にDigraphでグラフをPDFファイルに出力しています。
冒頭のSQLのグラフはこちらで出力したものです。

https://github.com/snagasawa/bigquery-job-scheduler/blob/main/main.py#L89-L106

まとめ

Topological SortでBigQueryのジョブをスケジューリングする方法を紹介しました。

Discussion