dbtとastronomer_cosmosを用いた高度なデータ基盤構築
はじめに
こんにちは
株式会社Rehab for JAPAN 開発2部データプラットフォームチームのデータエンジニア のすです。
記事を書くにあたっての背景
私は普段、弊社のデータ基盤「CDP(Care Data Platform)」の設計、開発、運用、保守、可視化を担当しています。前回の記事では、dbt(data build tool)の導入を試みた背景や効果についてご紹介しました。
今回はさらに一歩進み、astronomer-cosmos と Airflow を活用して、dbtの自動化と高度な運用方法について詳しくお話しします。
ターゲット
以下の項目に当てはまるデータエンジニアやその関係者様:
- データ分析基盤を構築中 または 運用を見直したい方
- dbtを本格導入したい方
- Airflowとdbtの連携に興味がある方
- メタデータ管理を効率化したい方
要約
dbtは、データ基盤におけるETL(Extract, Transform, Load)プロセスのTransform部分を担うツールです。SQLをベースにした簡潔な記述が特徴で、以下の利点があります:
- メンテナンス性の向上: コードの再利用が容易。
- 運用効率化: 環境ごとの柔軟な切り替えが可能。
- データ品質テストの容易性: 標準機能でテストを実施。
本記事では、AirflowとCosmosを活用して以下を実現する手法をご紹介します:
- dbtモデルの実行自動化
- 環境ごとの切り替え
- モデル選択や除外の柔軟な設定
- テーブル名の重複を回避しつつ同一テーブル名を作成
AirflowとCosmosを使ったdbtの高度な活用方法
dbtとは?(簡単なおさらい)
dbt(data build tool)は、データ基盤で発生する大量のSQL処理を整理し、効率化するためのツールです。ETLプロセスの中でも「Transform」を専門に担い、以下の特徴があります:
- SQLを直接記述できるため、学習コストが低い。
- モジュール化されており、再利用性が高い。
- テストやメタデータ管理機能が標準搭載。
AirflowとCosmosを選んだ理由
Airflowは、スケジュールや依存関係を管理するワークフローオーケストレーターですが、dbtの設定はやや煩雑です。そこで、Cosmosを使うことで次のような課題を解決しました:
-
DAG作成の簡素化
Cosmosを使えば、dbtタスクを効率的にAirflow DAGに統合できます。 -
柔軟なモデル管理
Cosmosでは、特定のモデルだけを実行したり、除外したりする設定が簡単に行えます。 -
環境切り替え
Cosmosのプロファイル設定で、開発環境と本番環境を容易に切り替えられます。
使用するライブラリと設定方法
Cosmosで使用する主要ライブラリ
ライブラリ名 | 役割 |
---|---|
dbt-core | dbtの主要処理 |
astronomer-cosmos | Airflow DAG上でdbtタスクを簡素化 |
dbt-osmosis | メタデータ管理(BigQueryとの連携など) |
profiles.yml
の設定例
以下は、環境切り替えに対応するprofiles.yml
の例です:
dbt_cosmos:
target: "{{ var('environment') }}"
outputs:
prod:
type: bigquery
method: oauth
project: "prod-project-id"
dataset: "{{ var('schema') }}_{{ var('product_name') }}"
dev:
type: bigquery
method: oauth
project: "dev-project-id"
dataset: "{{ var('schema') }}_{{ var('product_name') }}"
DAG実行時に、以下のようにパラメータを渡します:
DBT_VARS = {
"environment": "prod",
"schema": "staging",
"product_name": "product_name",
}
Airflowでのdbtタスク自動化
CosmosによるDAGタスク構築
以下は、Cosmosを使ったDAGタスクの例です:
from cosmos.task_group import DbtTaskGroup
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ExecutionMode, LoadMode, TestBehavior
dbt_task = DbtTaskGroup(
group_id="staging_{product_name}",
project_config=ProjectConfig(
dbt_project_path="/path/to/dbt_project",
),
profile_config={
"target": "prod",
"type": "bigquery",
"method": "oauth",
},
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
),
render_config=RenderConfig(
load_method=LoadMode.DBT_LS,
test_behavior=TestBehavior.AFTER_EACH,
select=["path:models/staging_{product_name}/*"],
exclude=["path:models/staging_{product_name}/excluded_model.sql"],
),
)
特定モデルの選択と除外
Cosmosでは、特定のモデルだけを選択したり、除外したりする設定が簡単にできます。
モデル選択例
select=["path:models/staging_{product_name}/specific_model.sql"]
モデル除外例
exclude=["path:models/staging_{product_name}/excluded_model.sql"]
詰まったポイントと解決策
テーブル名の重複問題
プロジェクトや企業によっては、異なるデータセットでも同じテーブル名を使用したい場合があります。しかし、dbtのプロジェクトでは、各モデルに一意の名前を付ける必要があります。この制約により、同じテーブル名を使用することが難しいという問題が発生しました。
alias
の利用
解決策:この問題を解決するために、alias
を利用しました。alias
を使用することで、dbtモデルの名前とは異なるテーブル名をデータベース上に作成することができます。以下は、alias
を設定する例です:
version: 2
models:
- name: specific_model
description: "This is a specific model for staging_{product_name}."
columns:
- name: id
description: "The unique identifier for each record."
- name: name
description: "The name of the entity."
aliasの使い方
aliasを使用することで、データベース上でのテーブル名を変更することができます。例えば、上記の設定では、specific_modelというモデルがデータベース上ではspecific_model_aliasという名前で作成されます。
version: 2
models:
- name: specific_model
+alias: specific_model_alias
description: "This is a specific model for staging_{product_name}."
columns:
- name: id
description: "The unique identifier for each record."
- name: name
description: "The name of the entity."
この設定により、specific_modelというモデルがデータベース上ではspecific_model_aliasという名前のテーブルとして作成されます。これにより、異なるデータセットでも同じテーブル名を使用することが可能になりました。
プロジェクト構成とディレクトリ例
以下は、dbtとCosmosを用いたプロジェクトのディレクトリ例です:
dags/
├── dbt/
│ ├── dbt_project/
│ │ ├── models/
│ │ │ ├── staging_{product_name}/
│ │ │ └── warehouse_{product_name}/
│ │ ├── macros/
│ │ ├── dbt_project.yml
│ │ ├── profiles.yml
Cosmosとdbtの連携によるメリット
自動化の利点
Cosmosとdbtを連携させることで、データパイプラインの自動化が非常に簡単になります。これにより、手動での作業が大幅に減ります。また、定期的なデータ更新やバッチ処理が自動化されるため、常に最新のデータを保つことができます。
柔軟な環境設定
Cosmosのプロファイル設定を利用することで、開発環境と本番環境を簡単に切り替えることができます。これにより、異なる環境でのテストやデプロイがスムーズに行えるようになります。環境ごとの設定変更も容易に行えるため、運用の効率が向上します。
モデル管理の効率化
Cosmosを使用することで、特定のモデルだけを実行したり、除外したりする設定が簡単に行えます。これにより、必要なデータだけを効率的に処理することが可能になります。モデルの選択や除外が柔軟に行えるため、運用の自由度が高まります。
まとめ
AirflowとCosmosを導入することで、dbtを活用したデータ基盤の運用が格段に効率化されます。本記事の要点を振り返ると、以下のようなメリットがあります:
- 環境の柔軟な切り替え: 開発環境と本番環境を簡単に切り替えることができ、異なる環境でのテストやデプロイがスムーズに行えます。
- モデル選択や除外が容易: 特定のモデルだけを実行したり、除外したりする設定が簡単に行え、必要なデータだけを効率的に処理することが可能です。
- ワークフロー全体の効率的な自動化: データパイプラインの自動化が容易になり、手動での作業が減ります。また、定期的なデータ更新やバッチ処理が自動化されるため、常に最新のデータを保つことができます。
これらのメリットを活かして、より効率的で柔軟なデータ基盤の運用を実現しましょう。
Rehabではエンジニアの採用募集してます!
CDPのデータ基盤はできて間もないため、新しい技術やあなたの意見が取り入れやすい環境です。単なるデータ基盤ではなく、利用者様や介護領域への業界を変革する一つのリソースとなりうる基盤なため業界の変化を楽しみながら基盤構築に携わることが可能です。データ基盤の開発を通して社会課題に向き合いたい方歓迎です!
Discussion