個人開発した野球Webアプリに、勉強のアウトプットとして、データ分析基盤追加してみた。
あらまし
(今まで個人の業務経験的に)データ分析基盤周りに着眼すると、AWS環境、RDBを中心とした分析環境に対し様々なノードからのETL処理の開発やBIツールを使った可視化など携われた。
なお、分散処理、DWHに関する昨今のツールや分析基盤の開発運用の知見を増やすため、ツールの資格取得し、データエンジニアリング関連の本を何冊か読んだ。それを踏まえ、以前に個人開発したWebアプリのデータを題材に分析基盤を構築したのでその内容をアウトプットする。
以下を意識して書いた。
- 特に、データ分析基盤における基本的な開発、運用の流れやその項目を全体として記載。
- また、流れの項目の中でツールをどう利用するのか。
- (IaCまで対応できてないが)構築の再現性。
要件整理、設計
ユースケース
野球のスコアブックWebアプリ(の検証環境に)分析環境を構築する。
- 既存
- 野球のスコアブックWebアプリは、1試合単位で投球データを自動集計できる。
- 新規追加機能
- 分析基盤で複数の試合をまたいでSQL、BIデータ分析できるようにする。
※メモ
勉強目的でお試しで構築のため、本機能は利用ユーザーさんに公開しない。利用データについても検証環境の私が登録したデータのみを対象に分析基盤の環境の構築を行う
既存業務、オペレーション整理
複数試合分析するときが、既存のオペレーションだと手間と考える。
CRUD図、関係者やデータソース整理
ユーザーからWebアプリに試合の情報を入力してもらってデータは発生する。新規データ追加で考えているお天気情報はAPIから取得する。
ETLパイプライン
(勉強のため、ETLの構築で使ったことないツールを利用)
Airflowが全体のELTパイプラインを管理。
- ①. DynamoDBにある各種テーブル情報を取得してS3にデータをJSON形式で配置
- ②. ①が成功したらDatabricks Jobに登録したPythonスクリプトを実行して、Delta Lake形式で各種テーブルをロード
- ③-1. ②が成功したらDatabricks Jobに登録したdbtタスクを実行して、スタースキーマに変換
- ③-2. ③-1が成功したらユースケースを満たすためによく使われると想定のテーブルを作成
- ④. BIやSQLで③-2をベースに分析
※メモ
当Airflowジョブが1時間に1回起動。1時間の間に追加、更新があった内容を取得。
- 基本的にデータはインサートオンリーを前提。
- ユーザー情報テーブルのニックネームが1時間の間に複数回の更新があった場合は最終の更新しか、フォローできないが、分析においてニックネーム情報は重要はでないので許容する。
- またユーザー情報テーブルのEmailアドレス情報(検証環境のため私のアドレスのみだが)は分析基板側にロードするタイミングでハッシュ化。
設計、構築作業
データプロファイリング
ソースシステム側のデータ品質状況を確認し、作業内容を調整する。
- 想定外の値のカラムが入っていないか。nullチェック。
- 異常値はないか。許容範囲を超える値は入っていないか。
- 予定のデータモデリングの項目とデータソースのマッピングを明確にし、情報として満たされているか、不足している情報はないか
上記実施し、天気の情報、及び打者が何打席目(という情報があるのだが)、それらは今回はスコープ外とする。
- 天気の情報については、ソースデータに位置を示すような情報が入っていなかったので実現するためにはWebアプリに球場など場所を入れる機能が必要。
- また、打席情報についてもアプリケーションから登録もなく、また内部的なコードでも特に管理されておらず、デフォルト値の1が入っているだけであった。何打席目かという視点で分析が必要とする場合は、Webアプリの改修とデータの再集計が必要。
データモデリング
- テーブル定義の図はこちらに。
- データソースはRDBではなく、重複データがあり第三正規化されたものではない。以下4テーブルに別れSequece(投球結果)を中心として、それぞれの情報を一度すべて結合したものをイベントテーブルとして整理する。
- Game(試合、メンバー情報)
- Situation(アウトカウント、ランナー、相手バッターなどの対戦シチュエーション情報
- Sequece(投球結果)
- User(ユーザー情報)
- 整理したイベントテーブルからディメンション、ファクトテーブルを切り出す。
- ディメンションテーブルとして切り出したもの、その考え
- 分析の軸とする項目(クエリを構築するときの可読性のため)
- 正規化すべき繰り返し。
- 値が決まっている。表記揺れなどはディメンションテーブルで抑止
- 履歴として別管理するような項目
- ユニークキーや数値集計要素はファクトテーブルだが、事実や数値に対しての説明情報がディメンション
- ディメンションテーブルとして同じくくりにするかどうかについて
- 粒度(一意性)がある程度同じで組み合わせが大幅に増幅しない
- ファクトテーブル、ディメンションテーブルの元ネタ(イベントテーブルの)抜粋
- ディメンションテーブルとして切り出したもの、その考え
- 変更履歴としておいかけるのはユーザー情報のテーブルだけなのでそのテーブルについては、有効開始、終了日、現在値の判別カラムをもうけた。
- なお、打者や投手の情報はGameやSituationの中に重複して入っているので、今々はできないが、ソースデータを元に、RDBでマスターデータ管理をやってビジネスキーを定めれば、打者、投手情報についても履歴管理したら、選手が移籍しても同一選手として分析するとかできる
ETL構築
Airflowのローカル構築
Airflow Breezeというローカル開発用のコードが準備されていたが、勉強用なので準備がシンプルだったAWSマネージドサービスMWAAのローカルエミュレート用にとしてローカル環境にたてた。
構築の流れ
-
docker/config/.env.localrunnerにdagのコードで利用する環境変数を定義
- Databricksのジョブは作成すると払い出されるIDがあるのでAirflowとDatabricksとの間では、API KEYの他、ジョブIDで連携。
-
requirements/requirements.txtに以下のdatabricksのパッケージを追記。
apache-airflow-providers-databricks
- 起動コマンド
./mwaa-local-env build-image
./mwaa-local-env start
- 起動時のメモ
- airflowのローカルDBにPostgreSQLを利用しているがコンテナ開発環境にRancherデスクトップを使っており、起動に以下の記事の対処を行い助かった。
- localhost:8080にアクセスすると画面が起動される。
- DatabricksのAPIKEYはadmin→Connections→Connection IDがDatabricks defaultのPasswordに設定。同じ画面の入力欄のHost名も登録必要
- 登録Host名のサンプル https://dbc-xxxxx.cloud.databricks.com/
- DatabricksのAPI KEYは、Databricks画面のユーザーの人形アイコンから「設定」→「ユーザー」→「開発者」→「アクセストークン」から発行可能
- DAGはdagsフォルダにいれるとロードしてくれる。
- DAGのコード
- コードの内容はETLパイプラインの内容の通りで全体のフローを管理。
- DAGsのGraphの画面にて各タスクの実行状態が確認できる。
- また、失敗した場合でもClearボタンにて失敗したタスクから再実行できる。
Databricksのジョブの登録
- Delta Lake形式でDatabricks環境にロードするときにPythonスクリプトを使用しており、Gitで管理したそのPythonスクリプトをDatabricksジョブとして登録する方法
-
「ワークフロー」→「ジョブとパイプライン」→「作成」→「ジョブ」
-
以下の項目を入力
- Gitのコードから読み込むようにしているのでGitのURLとブランチmainを指定
- またコードのパスも指定してあげる
-
右ペインの「クラスター」の「設定する」から開くページで、AWSのIAMで事前にインスタンスプロファイルを作成し(※)、それを設定する必要がある。
元々Databricksが自分のAWSアカウント上で構築された際に作成されているEC2関連の許可に、PythonスクリプトがやりたいことはS3からJSONデータ取得するという内容なのでS3の権限追加する形となる。
(※)AWSのIAMで事前にインスタンスプロファイルを作成する手順
https://docs.databricks.com/aws/ja/connect/storage/tutorial-s3-instance-profile -
ジョブを作成すると「ワークフロー」→「ジョブとパイプライン」→「ジョブ名」をクリックするとJOB IDが右ペインに表示されるのでそのJOB IDを控えておいて、Airflowのコードに埋め込む。
コードの以下の箇所
# Databricksへロードジョブ(rawレイヤー)
load_from_s3_to_raw_layer = DatabricksRunNowOperator(
task_id=f"load_{table_name}_to_delta",
databricks_conn_id=DATABRICKS_CONN_ID,
job_id=DATABRICKS_JOB_ID_LOAD_TO_RAW, # ここ
python_params=[table_name, s3_path],
trigger_rule=TriggerRule.ALL_SUCCESS
)
- 次に、クレンジングやスタースキーマ及びデータマート層のテーブル作成にdbtを使用しており、同じくGit管理したdbtのソースをDatabricksジョブとして登録する方法
- 同じく「ワークフロー」→「ジョブとパイプライン」→「作成」→「ジョブ」
- 以下の項目を入力。
- Gitのプロジェクトディレクトリはdbtという名前でフォルダを作成しているのでそれを指定
- dbtコマンドは上から順に実行される。コマンドの内容は次の事項で記述。
- クラスターのインスタンスプロファイルの追加はこのジョブについてはDatabricks内の処理のみなので不要
- 同じくジョブを作成すると「ワークフロー」→「ジョブとパイプライン」→「ジョブ名」をクリックするとJOB IDが右ペインに表示されるのでそのJOB IDを控えておいて、Airflowのコードに埋め込む。
コードの以下の箇所
# datamartレイヤーの作成まで実行するDatabricksジョブ(最終処理)
create_pitching_datamart_from_raw = DatabricksRunNowOperator(
task_id="create_pitching_datamart_from_raw",
databricks_conn_id=DATABRICKS_CONN_ID,
job_id=DATABRICKS_JOB_ID_CREATE_PITCHING_DATAMART_FROM_RAW, ## ここ
python_params=[],
trigger_rule=TriggerRule.ALL_SUCCESS
)
dbt
Delta形式でロードしてから、クレンジング、スタースキーマ、データマートの変換にdbt Coreを利用している。
- dbtも初めて使うのでまずはこちらで勉強させていただいた。
-
dbtのコード
- 野球のルールに準拠した固定的なディメンショナルテーブルを対象(ボールカウント、アウトカウント、イニング、ランナー、結果判定など)に、それに対しては、
dbt seed
を使ってcsvデータからテーブルを作成。 -
dbt run --select raw_pitching_event
でまずは、ソースからロードしたrawテーブルを結合してフラットデータのテーブル作成。 - 変更履歴を管理をすることにしたユーザー情報テーブルは
dbt snapshot
を利用し作成。
- 野球のルールに準拠した固定的なディメンショナルテーブルを対象(ボールカウント、アウトカウント、イニング、ランナー、結果判定など)に、それに対しては、
{% snapshot dim_owner %}
{{
config(
target_schema='default',
unique_key='owner_id',
strategy='check',
check_cols=["email","username"]
)
}}
SELECT
-- サロゲートキー
md5(concat_ws('||', owner, email, username)) AS owner_key,
owner AS owner_id,
email,
username,
current_timestamp() AS created_at_snapshot,
'dbt/snapshot' AS created_by,
current_timestamp() AS updated_at_snapshot,
'dbt/snapshot' AS updated_by,
'NO' AS is_missing
FROM {{ source('raw','raw_user_xxxxxxxxxx_staging') }}
WHERE coalesce(email, '') <> '' OR coalesce(username, '') <> ''
{% endsnapshot %}
- 上記dbt snapshotに使ったコード。
- check_colsの指定があるemailとusernameに変更があった場合、unique_keyの指定のowner_idが同一のものがあれば、dbtが変更履歴に準拠したカラムを準備して新しい行を追加してくれる。
- dbtでカラム追加してくれるもののひとつでdbt_valid_toがnullのものが最新の値となる。
- (なお、コード上、自分でサロゲートキー作っていたがdbtで作ってくれてるdbt_scd_idで代用できそうだ)
- フラットデータからincrementalモデルとして、各ディメンションテーブル、ファクトテーブルを作成。
dbt run --select 'staging.dim_*'
。dbt run --select fact_pitching
。 - データウェアハウス層に作成したスタースキーマのテーブルを活用し、サロゲートキーでファクトテーブルと各ディメンションテーブルを結合したテーブルを、最終的にデータマート層のテーブルとして作成。コマンドとしては
dbt run --select mart_pitching
テストコード
dbtでテストを少し用意した。
- Singular Test
- パイプライン動作時の最終的な網掛け的な立ち位置で、ローデータのデータを結合したフラットなイベントテーブルのデータ件数とデータマートで作成したテーブルのデータ件数を比較し、想定の割合を大きく下回った場合、想定外のデータ結合があり、データ品質が低いと考え、以下のようなテストを作成。データマートの件数がローイベントテーブルの8割以下だった場合、値1が返ってテストが失敗する。
WITH raw_pitching_count AS (
SELECT COUNT(*) AS cnt FROM {{ ref('raw_pitching_event') }}
),
mart_pitching_count AS (
SELECT COUNT(*) AS cnt FROM {{ ref('mart_pitching') }}
)
SELECT 1
FROM raw_pitching_count, mart_pitching_count
WHERE (mart_pitching_count.cnt / raw_pitching_count.cnt) < 0.8
- ジェネリックテスト
- 変動があるディメンションテーブルを対象に簡単なカラムのチェック
version: 2
models:
- name: dim_batter
columns:
- name: batter_key
tests:
- unique
- not_null
- name: dim_pitcher
columns:
- name: pitcher_key
tests:
- unique
- not_null
- name: dim_game_day
columns:
- name: game_day_key
tests:
- unique
- not_null
- name: fact_pitching
columns:
- name: pitching_id
tests:
- unique
- not_null
- (あとPythonのユニットテスト用意などしていきたい)
動作確認
Webアプリから野球の試合データを登録し、Airflowジョブを動かします。1時間に1回定時処理で動くようになっている。
このコードの以下のschedule_interval='@hourly'
@dag(start_date=datetime(2024, 1, 1), schedule_interval='@hourly', catchup=False, tags=['dynamodb', 's3', 'deltalake'])
def upsert_pitching_datamart_flow():
そして実行されて問題なく処理されると「DAGS」→「各dag名」→「Graph」の画面で以下のような各Jobがグリーンの枠でいろどられて、最後のJobまでSuccessが確認されるとデータマート層まで処理が問題なく完了できていることになる。
そして更新されたデータマート層のテーブルを参照して、Databricksでクエリを書き、その出力データを元に、Canvasで可視化を行った。
無事、Webアプリの機能である同一投手で同一試合の自動データ集計機能だけでなく、当初の目的である同一投手で、複数試合のデータ分析できる環境が整った。
※蛇足ですが、複数試合の分析環境の動作確認の詳細は本ページの「感想」の「蛇足」にも記述。
運用
データリネージ
データリネージの各ツールでの確認方法。
- AirflowのGraph
- 少なくともここでエンドエンドでどのようなジョブを使ってるかフローがGraphの画面で描画できるようにコードを書いた。
- 再掲となるが以下がそのGraph
- Databricksの各テーブルのリネージ
- Databricksに入ってからはテーブルのリネージ表示で自動で生成してくれる
- どこからそのデータが生成されているのかテーブルを追跡してくれる。
- dbt Coreのリネージ情報などのドキュメント生成
- ドキュメント生成
dbt docs generate
- ドキュメント表示のWebサーバ起動
dbt docs serve
- なお、sourceで元データテーブルを指定することでdbtで作成したテーブル以外にもリネージの描画も可能となる
- ドキュメント生成
品質チェック
- テストの他に、運用作業として定常的な品質チェックとしてDatabricksの各テーブルの「品質」タブからワンプッシュでダッシュボードの確認が良さそうだと感じた。データ件数やNull値やZero値が多いカラムの可視化、主要なデータプロファイリングを自動でやってくれ、データ品質モニタリングが可能。
- 実際に分析として利用されるのは主にデータマート層だが、なるべくデータ品質は上流から抑えていきたいため、ソース側のチェックも重要と考える。
メタデータ管理
(無料期間の時間切れであまり各種画面や機能を吟味できていませんが感想)
- テーブルの説明についてDatabricksのAIが素案を英語で生成してくれてた。
- また、テーブルの利用、参照頻度の表示も携帯の電波表示のようなアイコン自動で頻度表示してくれており、長い期間で使わなくなったものとかも視認できそうであった。
- 各データの説明については、この規模ではもちろんしっかり手動で説明を入れていく割合を増やしていきたいが、自動で突っ込まれた説明と手動で入力したメタデータの比率なども自動化してくれるのかなど後々気になった。
- テーブルのカラムの型定義については自分で定義せずとも、Databricksで自動推定してくれた。
- (メタデータというよりアクセス制御になるが、ユーザー個々でデータの表示を出し分けるやりかたとして、UnityCatalogで制御して特定カラムのユーザー識別子と一致しないと表示しないといった形もやってみたかった)。
感想
ツール利用について
Airflow、dbt、Databricksともにさわるのは初めてだった。
なお、いまさらなのだろうが、dbtの便利さに驚いた。
ちょっと前に開発でETLパイプラインを作成させてもらったときは、一時テーブルを作成して保存したり、外部データベースを参照するような設定をしたり、データの参照は一手間した覚えがある。Jinjaテンプレートなどで、コードでさっと参照でき、またテストも書きやすい。
Databricksのジョブも便利だった。キューに入れてそっからジョブを実行する設定としては極めて楽だった。以前はAWS Batchで構築した。また、あらゆるタイプのジョブに対応していて、Pythonスクリプトだろうが、dbtだろうがコードを書砕けで設定の負荷はそこまでないみたいな。
気になること
また、Databricksの2週間の無料期間が過ぎてしまったり、諸々の事情でここまでにとどまってしまったが、分析基盤を担当するようなことになれば以下、なんらかの方法で簡単に実装してみて理論だけでなく理解を定着、深めていきたい。
- メタデータ管理
- S3Tables
- ストリーミング
- 監視連携
- セルフサービス
- セマンティックレイヤー
- リバースETL
- モデリング
- スタースキーマ
- Data Valt
- 機械学習、AI
- streamlit
- どこまでIaCできるか
蛇足
データ分析基盤に流したデータは2025年の田中将大投手の投球データ(4,5月の1軍登板、5月のオイシックス戦の2軍登板)。Webアプリから登録しました。球種だけですが整理して、以下楽観的な素人示唆。
- スライダーが空振りを一番多く取れている球種の模様。打ち取っている球がストレート、SFFが多いが、打たれている球もストレート、SFFが多いので、スライダーの割合増やしたろうどうだろうと思った。
- 非常に投球割合としては少ないが、時折投げているチェンジアップがほぼ全球有効。突如としていつもと異なる球を投げることが有効な配球となるということもあると思うが、球種としてそもそも有効なのかもしれない。チェンジアップもっと使ってみたい?
- ギアチェンしたと思われる際のストレート。球速が早いほうはばっちり抑えられている。コースまでみてませんが、ハーフスピードはツーシーム、カットボールも危険?
田中投手の200勝みたい。圧倒的応援。人の応援してる場合じゃない説。
おしまい。
Discussion