🗂️

dbtとastronomer_cosmosを用いた高度なデータ基盤構築

2025/02/03に公開

はじめに

こんにちは
株式会社Rehab for JAPAN 開発2部データプラットフォームチームのデータエンジニア のすです。

記事を書くにあたっての背景

私は普段、弊社のデータ基盤「CDP(Care Data Platform)」の設計、開発、運用、保守、可視化を担当しています。前回の記事では、dbt(data build tool)の導入を試みた背景や効果についてご紹介しました。

今回はさらに一歩進み、astronomer-cosmosAirflow を活用して、dbtの自動化と高度な運用方法について詳しくお話しします。


ターゲット

以下の項目に当てはまるデータエンジニアやその関係者様:

  • データ分析基盤を構築中 または 運用を見直したい方
  • dbtを本格導入したい方
  • Airflowとdbtの連携に興味がある方
  • メタデータ管理を効率化したい方

要約

dbtは、データ基盤におけるETL(Extract, Transform, Load)プロセスのTransform部分を担うツールです。SQLをベースにした簡潔な記述が特徴で、以下の利点があります:

  • メンテナンス性の向上: コードの再利用が容易。
  • 運用効率化: 環境ごとの柔軟な切り替えが可能。
  • データ品質テストの容易性: 標準機能でテストを実施。

本記事では、AirflowとCosmosを活用して以下を実現する手法をご紹介します:

  1. dbtモデルの実行自動化
  2. 環境ごとの切り替え
  3. モデル選択や除外の柔軟な設定
  4. テーブル名の重複を回避しつつ同一テーブル名を作成

AirflowとCosmosを使ったdbtの高度な活用方法

dbtとは?(簡単なおさらい)

dbt(data build tool)は、データ基盤で発生する大量のSQL処理を整理し、効率化するためのツールです。ETLプロセスの中でも「Transform」を専門に担い、以下の特徴があります:

  • SQLを直接記述できるため、学習コストが低い。
  • モジュール化されており、再利用性が高い。
  • テストやメタデータ管理機能が標準搭載。

AirflowとCosmosを選んだ理由

Airflowは、スケジュールや依存関係を管理するワークフローオーケストレーターですが、dbtの設定はやや煩雑です。そこで、Cosmosを使うことで次のような課題を解決しました:

  1. DAG作成の簡素化
    Cosmosを使えば、dbtタスクを効率的にAirflow DAGに統合できます。

  2. 柔軟なモデル管理
    Cosmosでは、特定のモデルだけを実行したり、除外したりする設定が簡単に行えます。

  3. 環境切り替え
    Cosmosのプロファイル設定で、開発環境と本番環境を容易に切り替えられます。


使用するライブラリと設定方法

Cosmosで使用する主要ライブラリ

ライブラリ名 役割
dbt-core dbtの主要処理
astronomer-cosmos Airflow DAG上でdbtタスクを簡素化
dbt-osmosis メタデータ管理(BigQueryとの連携など)

profiles.ymlの設定例

以下は、環境切り替えに対応するprofiles.ymlの例です:

dbt_cosmos:
  target: "{{ var('environment') }}"
  outputs:
    prod:
      type: bigquery
      method: oauth
      project: "prod-project-id"
      dataset: "{{ var('schema') }}_{{ var('product_name') }}"
    dev:
      type: bigquery
      method: oauth
      project: "dev-project-id"
      dataset: "{{ var('schema') }}_{{ var('product_name') }}"

DAG実行時に、以下のようにパラメータを渡します:

DBT_VARS = {
    "environment": "prod",
    "schema": "staging",
    "product_name": "product_name",
}

Airflowでのdbtタスク自動化
CosmosによるDAGタスク構築
以下は、Cosmosを使ったDAGタスクの例です:

from cosmos.task_group import DbtTaskGroup
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ExecutionMode, LoadMode, TestBehavior

dbt_task = DbtTaskGroup(
    group_id="staging_{product_name}",
    project_config=ProjectConfig(
        dbt_project_path="/path/to/dbt_project",
    ),
    profile_config={
        "target": "prod",
        "type": "bigquery",
        "method": "oauth",
    },
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.VIRTUALENV,
    ),
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        test_behavior=TestBehavior.AFTER_EACH,
        select=["path:models/staging_{product_name}/*"],
        exclude=["path:models/staging_{product_name}/excluded_model.sql"],
    ),
)

特定モデルの選択と除外
Cosmosでは、特定のモデルだけを選択したり、除外したりする設定が簡単にできます。

モデル選択例

select=["path:models/staging_{product_name}/specific_model.sql"]

モデル除外例

exclude=["path:models/staging_{product_name}/excluded_model.sql"]

詰まったポイントと解決策

テーブル名の重複問題

プロジェクトや企業によっては、異なるデータセットでも同じテーブル名を使用したい場合があります。しかし、dbtのプロジェクトでは、各モデルに一意の名前を付ける必要があります。この制約により、同じテーブル名を使用することが難しいという問題が発生しました。

解決策:aliasの利用

この問題を解決するために、aliasを利用しました。aliasを使用することで、dbtモデルの名前とは異なるテーブル名をデータベース上に作成することができます。以下は、aliasを設定する例です:

version: 2

models:
  - name: specific_model
    description: "This is a specific model for staging_{product_name}."
    columns:
      - name: id
        description: "The unique identifier for each record."
      - name: name
        description: "The name of the entity."

aliasの使い方
aliasを使用することで、データベース上でのテーブル名を変更することができます。例えば、上記の設定では、specific_modelというモデルがデータベース上ではspecific_model_aliasという名前で作成されます。

version: 2

models:
  - name: specific_model
    +alias: specific_model_alias
    description: "This is a specific model for staging_{product_name}."
    columns:
      - name: id
        description: "The unique identifier for each record."
      - name: name
        description: "The name of the entity."

この設定により、specific_modelというモデルがデータベース上ではspecific_model_aliasという名前のテーブルとして作成されます。これにより、異なるデータセットでも同じテーブル名を使用することが可能になりました。

プロジェクト構成とディレクトリ例
以下は、dbtとCosmosを用いたプロジェクトのディレクトリ例です:

dags/
├── dbt/
│   ├── dbt_project/
│   │   ├── models/
│   │   │   ├── staging_{product_name}/
│   │   │   └── warehouse_{product_name}/
│   │   ├── macros/
│   │   ├── dbt_project.yml
│   │   ├── profiles.yml

Cosmosとdbtの連携によるメリット

自動化の利点

Cosmosとdbtを連携させることで、データパイプラインの自動化が非常に簡単になります。これにより、手動での作業が大幅に減ります。また、定期的なデータ更新やバッチ処理が自動化されるため、常に最新のデータを保つことができます。

柔軟な環境設定

Cosmosのプロファイル設定を利用することで、開発環境と本番環境を簡単に切り替えることができます。これにより、異なる環境でのテストやデプロイがスムーズに行えるようになります。環境ごとの設定変更も容易に行えるため、運用の効率が向上します。

モデル管理の効率化

Cosmosを使用することで、特定のモデルだけを実行したり、除外したりする設定が簡単に行えます。これにより、必要なデータだけを効率的に処理することが可能になります。モデルの選択や除外が柔軟に行えるため、運用の自由度が高まります。

まとめ

AirflowとCosmosを導入することで、dbtを活用したデータ基盤の運用が格段に効率化されます。本記事の要点を振り返ると、以下のようなメリットがあります:

  • 環境の柔軟な切り替え: 開発環境と本番環境を簡単に切り替えることができ、異なる環境でのテストやデプロイがスムーズに行えます。
  • モデル選択や除外が容易: 特定のモデルだけを実行したり、除外したりする設定が簡単に行え、必要なデータだけを効率的に処理することが可能です。
  • ワークフロー全体の効率的な自動化: データパイプラインの自動化が容易になり、手動での作業が減ります。また、定期的なデータ更新やバッチ処理が自動化されるため、常に最新のデータを保つことができます。

これらのメリットを活かして、より効率的で柔軟なデータ基盤の運用を実現しましょう。

Rehabではエンジニアの採用募集してます!

CDPのデータ基盤はできて間もないため、新しい技術やあなたの意見が取り入れやすい環境です。単なるデータ基盤ではなく、利用者様や介護領域への業界を変革する一つのリソースとなりうる基盤なため業界の変化を楽しみながら基盤構築に携わることが可能です。データ基盤の開発を通して社会課題に向き合いたい方歓迎です!

Rehab Tech Blog

Discussion