gcp環境のdbtでpythonコードによるtable定義
家計簿リネージ作成でサンキーダイアグラムを使いたいモチベが出てきました。
grafanaのpluginに存在したのでクエリを組むだけなのですが、いろいろ考えてもSQLでは厳しそう(できたところで可読性皆無になりそう)だったのでpythonにしようと考えました。
今回はAIではなく手書きです。
要件
- gcp環境で動作
- 可能な限り低料金
- 可能な限り単純
AI丸投げの結果
dataprocというプロダクトを使用する運びとなりました。
ざっくり言えばgcpマネージドのsparkで、pythonコードはgcp上で動作します。
dataprocとdataprepとdataflow
過去にもfirestoreとfilestoreで混乱しましたが、今回も現れました。
雑に認識した違いは以下
- dataproc
- hadoopやsparkのgcp上の実行環境
- dataprep
- GUIによるML準備のためのデータ加工
- 内部でdataflowを使っているとのこと
- dataflow
- データ処理のパイプライン
- バッチとストリーミング両対応
実装
dataprocの作業用のGCS bucketが必要とのことでした。
作成しパスをprofiles.ymlで定義します。
成り行きで今回はdataproc serverlessというものを使用しました。
クラスタ管理不要の従量課金制です。
自分の場合はMB行かないデータ量を週1回程度なのでクラスタは当然不要になります。
検証はできませんが頻度やデータ量でどこかに損益分岐点があると思いますし、そういう仕事はしてみたいですね。
profiles.yml
...
dataproc_region: us-central1
dataproc_cluster_name: dbt-python-cluster
gcs_bucket: "{{ env_var('DBT_GCS_BUCKET') }}"
submission_method: serverless
...
submission_method: serverless がポイントです。
使ってみて
pythonコード
pysparkデビューです。
dataframeのwrapperみたいな感じでchain methodでガシガシ繋げられます。
慣れればSQL文のように見えなくもないです。
from pyspark.sql.functions import col
def model(dbt, session):
# 元データを取得(Spark DataFrame)
base_spark_df = dbt.ref("table_name")
# カテゴリグループ -> カテゴリの集計
result_df = (
base_spark_df
.groupBy('category_group', 'category')
.sum('amount')
.withColumnRenamed('sum(amount)', 'value')
.select(
col('category_group').alias('group'),
col('category'),
col('value'),
)
)
return result_df
dataproc serverless
MB未満のデータ量で、1回10円行かないくらいのコストでした。
コスト面では文句なしです。
また、dbt定義のみでgcpのリソースは完結するので手間もかかりません。
ただ時間がかかりました。
1回4分程度かかり、データ量が多いとdbtのtimeoutを気にする必要があるかもです。
とはいえserverlessなので環境の起動とか、spark自体が並列志向なのでデータ量と実行時間は思ったより比例しないかもです。
イメージではy=ax+bのbがかなり大きいやつです。
( コストもそうだといいですが... )
こちらも機会があれば検証してみたいところです。
成果物
BQ上ではmodelのアイコンになります(MLなどと同じ)。
生成してしまえばtableと同じでただの2次元データな認識でtableとmodelの線引きが分からず🤔
深入りすると明確な違いがあるかもしれません。
終わりに
とりあえず土台まで作りました。
目標となるサンキーダイアグラム用のデータ生成は引き続き作成していきます。
思ったより気軽でSQLだと拗れそうなところはpythonという選択肢を手に入れることができました。
Discussion