Astronomer CosmosによるETLからダッシュボードまでのシームレスなデータ基盤構築
概要
AirflowとdbtをCosmosパッケージで活用し、ETLからダッシュボード公開に至るまでを一元的に扱う構成を示しています。これによって、データの抽出・変換・ロードを一貫して管理でき、データレイクからデータマートの構築までスムーズに進められます。また、ダッシュボードの更新の自動化が見込めます。GitHubリポジトリで既存の構成を公開しています。
構成
AirflowがあらゆるETL処理やデータベースへの書き込みを担当し、dbtがデータマートの作成を請け負うフローです。sqldefはまだ実装中で、本記事では対象外としています。
ディレクトリ構造
.
├── README.md
├── airflow/
│ ├── airflow-webserver.pid
│ ├── airflow.cfg
│ ├── airflow.db
│ ├── dags/
│ │ ├── xx/
│ │ │ ├── xx2db.py
│ │ │ └── dag.yaml
│ │ ├── dbt -> ../../dbt
│ │ ├── utils.py
│ │ └── yy/
│ │ ├── api2db.py
│ │ └── dag.yaml
│ ├── logs/
│ ├── scripts -> ../src/
│ └── webserver_config.py
├── dbt/
│ ├── dbt_project.yml
│ ├── macros/
│ ├── models/
│ │ ├── JQ/
│ │ │ ├── schema.yml
│ │ │ └── zz_analysis.sql
│ │ └── yy/
│ ├── profiles.yml
│ ├── seeds/
│ └── tests/
├── docker/
│ ├── Dockerfile
│ ├── airflow/
│ │ ├── clean_log.sh
│ │ ├── dag_trigger.sh
│ │ ├── init.sh
│ │ ├── kill.sh
│ │ ├── start.sh
│ │ └── task.sh
│ ├── build.sh
│ ├── requirements.txt
│ └── run.sh
└── src/
├── experimental/
│ └── db_reserch.ipynb
├── python/
│ ├── xx2csv.py
│ ├── utils/
│ │ ├── api_ops.py
│ │ ├── cruds.py
│ │ ├── io_csv.py
│ │ └── utils.py
│ └── yy_api2csv.py
├── sql/
└── streamlit/
└── app.py
実行フロー
Dockerイメージのビルドとコンテナの起動後、init.shとstart.shを使ってAirflow Webサーバーを立ち上げます。Airflowの管理画面から定期実行のバッチ処理をスケジュールでき、たとえばAPIからデータベースへ直接連携するDAGが動きます
-
docker/airflow/init.shでconnectionを追加することで、進捗をslackに通知している
-
DAG: deploy_streamlit を実行すると streamlit でダッシュボード公開することができる
実行フロー > データソース ~ CSV
Pythonスクリプトを用いてデータソースからテーブルへのインサート用CSVを生成します。
(新規テーブルの作成)
ref: https://discourse.getdbt.com/t/create-empty-table-through-dbt-seed/11667/7
次のようなインクリメンタルモデルを使用してテーブルを作成
airflow/dags/dbt/models/public/testtable.sql
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='sync_all_columns'
)
}}
SELECT
NULL::INT AS id,
NULL::VARCHAR AS name,
NULL::TIMESTAMP AS created_at
WHERE 1 = 0
実行フロー > csv ~ table(概念スキーマ)
psql で csv から table にバルクインサート。bashoperator が psql の 成否ステータスを正しく返せない点は要改善
実行フロー > table ~ mart(外部スキーマ)
dbt run を実行して mart を materialized view で作成
所感
特にディレクトリ構造の複雑化が顕著で dag-factory などのツールを活用してもコードベースが大規模化する傾向にある。管理については一部サブモジュールに切り出すなど、効率化の余地がある。構成はシステム全体がAirflowに依存する形になるので、各サービスが疎結合で独立している方が望ましそう。
Discussion