🐱

Astronomer CosmosによるETLからダッシュボードまでのシームレスなデータ基盤構築

2024/07/28に公開

概要

AirflowとdbtをCosmosパッケージで活用し、ETLからダッシュボード公開に至るまでを一元的に扱う構成を示しています。これによって、データの抽出・変換・ロードを一貫して管理でき、データレイクからデータマートの構築までスムーズに進められます。また、ダッシュボードの更新の自動化が見込めます。GitHubリポジトリで既存の構成を公開しています。

https://github.com/hayato540101/api2dashboard_example

構成

AirflowがあらゆるETL処理やデータベースへの書き込みを担当し、dbtがデータマートの作成を請け負うフローです。sqldefはまだ実装中で、本記事では対象外としています。

ディレクトリ構造
.
├── README.md
├── airflow/
│   ├── airflow-webserver.pid
│   ├── airflow.cfg
│   ├── airflow.db
│   ├── dags/
│   │   ├── xx/
│   │   │   ├── xx2db.py
│   │   │   └── dag.yaml
│   │   ├── dbt -> ../../dbt
│   │   ├── utils.py
│   │   └── yy/
│   │       ├── api2db.py
│   │       └── dag.yaml
│   ├── logs/
│   ├── scripts -> ../src/
│   └── webserver_config.py
├── dbt/
│   ├── dbt_project.yml
│   ├── macros/
│   ├── models/
│   │   ├── JQ/
│   │   │   ├── schema.yml
│   │   │   └── zz_analysis.sql
│   │   └── yy/
│   ├── profiles.yml
│   ├── seeds/
│   └── tests/
├── docker/
│   ├── Dockerfile
│   ├── airflow/
│   │   ├── clean_log.sh
│   │   ├── dag_trigger.sh
│   │   ├── init.sh
│   │   ├── kill.sh
│   │   ├── start.sh
│   │   └── task.sh
│   ├── build.sh
│   ├── requirements.txt
│   └── run.sh
└── src/
   ├── experimental/
   │   └── db_reserch.ipynb
   ├── python/
   │   ├── xx2csv.py
   │   ├── utils/
   │   │   ├── api_ops.py
   │   │   ├── cruds.py
   │   │   ├── io_csv.py
   │   │   └── utils.py
   │   └── yy_api2csv.py
   ├── sql/
   └── streamlit/
       └── app.py

実行フロー

Dockerイメージのビルドとコンテナの起動後、init.shとstart.shを使ってAirflow Webサーバーを立ち上げます。Airflowの管理画面から定期実行のバッチ処理をスケジュールでき、たとえばAPIからデータベースへ直接連携するDAGが動きます

  • docker/airflow/init.shでconnectionを追加することで、進捗をslackに通知している

  • DAG: deploy_streamlit を実行すると streamlit でダッシュボード公開することができる

実行フロー > データソース ~ CSV

Pythonスクリプトを用いてデータソースからテーブルへのインサート用CSVを生成します。

(新規テーブルの作成)

ref: https://discourse.getdbt.com/t/create-empty-table-through-dbt-seed/11667/7

次のようなインクリメンタルモデルを使用してテーブルを作成

airflow/dags/dbt/models/public/testtable.sql
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='sync_all_columns'
    )
}}

SELECT 
    NULL::INT AS id,
    NULL::VARCHAR AS name,
    NULL::TIMESTAMP AS created_at
WHERE 1 = 0 

実行フロー > csv ~ table(概念スキーマ)

psql で csv から table にバルクインサート。bashoperator が psql の 成否ステータスを正しく返せない点は要改善

実行フロー > table ~ mart(外部スキーマ)

dbt run を実行して mart を materialized view で作成

所感

特にディレクトリ構造の複雑化が顕著で dag-factory などのツールを活用してもコードベースが大規模化する傾向にある。管理については一部サブモジュールに切り出すなど、効率化の余地がある。構成はシステム全体がAirflowに依存する形になるので、各サービスが疎結合で独立している方が望ましそう。

Discussion