🐡

Docker上のマルチコンテナでDagsterとdbtを組み合わせて動かす

2023/08/28に公開

Docker上のマルチコンテナでDagsterとdbtを組み合わせて動かす

はじめに

この記事の目的と対象読者

この記事は、DagsterとdbtをDocker上で動かす方法を示すことを目的としています。
dockerのマルチコンテナにデプロイすることで、複数のdbtプロジェクトを依存させることなくDagsterに配置することができます。
dbtやDagsterに関する説明はしませんのでご了承ください。

公式ドキュメントとこの記事の違い

公式ドキュメントでは、Dagsterとdbtの統合や、Dagsterをdockerにデプロイする方法については説明されていますが、3つの組み合わせについてはサンプルが無いです。この記事ではdocker上でdbtとDagsterを動かす実装したのでその際の注意点などを記載します。

前提条件と必要な環境

環境
macOS 13.5.1
Docker Desktop 4.19.0
dbt 1.6.1
Dagster 1.4.10

プロジェクト構成

.
├── Dockerfile_dagster
├── Dockerfile_user_code
├── README.md
├── dagster.yaml
├── docker-compose.yml
├── jaffle_shop
│   ├── LICENSE
│   ├── README.md
│   ├── dbt_project.yml
│   ├── models
│   ├── profiles.yml
│   └── seeds
├── repo.py
└── workspace.yaml

DockerにDagsterをデプロイする際の考慮点

ベースは公式のMulti-container Docker deployment の通りです。

公式サンプル

dbtを動かすために以下のライブラリを追加しています。

  • dagster_dbt
  • dbt (今回はdbt-postgresを使用)
Dockerfile_user_code
FROM python:3.10-slim

RUN pip install \
    dagster \
    dagster-postgres \
-    dagster-docker
+    dagster-docker \
+    dagster_dbt \
+    dbt-postgres

# Add repository code
WORKDIR /opt/dagster/app

- COPY repo.py /opt/dagster/app
+ # COPY repo.py /opt/dagster/app

# Run dagster gRPC server on port 4000
EXPOSE 4000

# CMD allows this to be overridden from run launchers or executors that want
# to run other commands against your repository
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"]

また、エントリーポイントとなるrepo.pyは開発時には頻繁に変更することが予想されるので、COPYではなくマウントするようにします

Dockerfile_dagsterは変更無しです。

docker-compose.ymlの変更点は下記です。

  • dbtが操作するDWHサンプルとしてpostgresコンテナを追加
  • user_codeコンテナ
    • dagster_dbtがdbt projectをロードの度にパースするようにDAGSTER_DBT_PARSE_PROJECT_ON_LOADの環境変数を設定
    • エントリーポイントのrepo.pyとdbt projectのdirをマウント
docker-compose.yml
version: "3.7"
services:
  # dbtが操作するデータウェアハウス
+  postgres_dwh:
+    image: postgres:15
+    container_name: postgres_dwh
+    hostname: postgres_dwh
+    ports:
+      - 5432:5432
+    volumes:
+      - dwh:/var/lib/postgresql/data
+    environment:
+      - POSTGRES_USER=postgres
+      - POSTGRES_DB=dwh
+      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
+    networks:
+      - docker_example_network

  # 以下dagsterのコンテナ
  docker_example_postgresql:
    image: postgres:15
    container_name: docker_example_postgresql
    environment:
      POSTGRES_USER: "postgres_user"
      POSTGRES_PASSWORD: "postgres_password"
      POSTGRES_DB: "postgres_db"
    networks:
      - docker_example_network
    volumes:
      - postgres_volume:/var/lib/postgresql/data

  docker_example_user_code:
    build:
      context: .
      dockerfile: ./Dockerfile_user_code
    container_name: docker_example_user_code
    image: docker_example_user_code_image
    restart: always
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "docker_example_user_code_image"
      DAGSTER_DBT_PARSE_PROJECT_ON_LOAD: 1
    networks:
      - docker_example_network
+    volumes:
+      - ./repo.py:/opt/dagster/app/repo.py
+      - ./jaffle_shop:/opt/dagster/app/jaffle_shop

  docker_example_webserver:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-webserver
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: docker_example_webserver
    expose:
      - "3000"
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
    volumes: # Make docker client accessible so we can terminate containers from the webserver
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_example_network
    depends_on:
      - docker_example_postgresql
      - docker_example_user_code

  docker_example_daemon:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-daemon
      - run
    container_name: docker_example_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
    volumes: # Make docker client accessible so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_example_network
    depends_on:
      - docker_example_postgresql
      - docker_example_user_code

networks:
  docker_example_network:
    driver: bridge
    name: docker_example_network

volumes:
  dwh:
  postgres_volume:
    name: dagster_postgres_volume

dagster.ymlにもマウントの情報を記載します。
ここが記載されていないとrunした時に No such file or directory で怒られてしまいます。
記述はなぜか絶対パスしか受け付けないので注意しましょう。

dagster.yml
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
    network: docker_example_network
    container_kwargs:
      volumes: # Make docker client accessible to any launched containers as well
        - /var/run/docker.sock:/var/run/docker.sock
        - /tmp/io_manager_storage:/tmp/io_manager_storage
+        - /absolute_path/dagster_docker_dbt_sample/repo.py:/opt/dagster/app/repo.py
+        - /absolute_path/dagster_docker_dbt_sample/jaffle_shop:/opt/dagster/app/jaffle_shop

# 略

dagster-dbtの実装

dbtのプロジェクトはサンプルとして、jaffle_shopを使用しました

repo.pyの中身を変更し、dbtに統合させていきます。

ほぼ公式のチュートリアルの通りです。

repo.py
import os
from pathlib import Path

from dagster import Definitions, OpExecutionContext, ScheduleDefinition
from dagster_dbt import DbtCliResource, build_schedule_from_dbt_selection, dbt_assets

dbt_project_dir = Path(__file__).joinpath("..", "jaffle_shop").resolve()

dbt = DbtCliResource(
    project_dir=os.fspath(dbt_project_dir), profiles_dir=os.fspath(dbt_project_dir)
)

# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at runtime.
# Otherwise, we expect a manifest to be present in the project's target directory.
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
    dbt_parse_invocation = dbt.cli(["parse"], manifest={}).wait()
    dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
else:
    dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")


@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
    [jaffle_shop_dbt_assets],
    job_name="all_assets_daily_job",
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",
)

defs = Definitions(
    assets=[jaffle_shop_dbt_assets],
    schedules=[daily_dbt_assets_schedule],
    resources={
        "dbt": DbtCliResource(project_dir=os.fspath(dbt_project_dir)),
    },
)

そのままだと味気ないので、build_schedule_from_dbt_selectionでスケジュールも定義しました。

立ち上げと動作確認

docker compose build
docker compose up -d

localhost:3000 にアクセスし、dbtの各modelがdagsterのassetsになっていることと
runできることを確認。

dbt assets

実行もできている。
※ executerとしてDockerRunLauncher を指定しているので、実際に実行するとuser_codeとは別の(同一のimage)コンテナが立ち上がりその中でdbtが実行されます。

job run

まとめと次のステップ

この記事ではDocker上にマルチコンテナとしてDagsterとdbtをデプロイする方法についてまとめました
Dagsterとdbtの組み合わせは、データエンジニアリングとデータ分析の連携を非常に効率的にする強力な手段です。Dockerを使用することで、環境依存の問題を排除し、より簡単にデプロイとスケーリングが可能です。

今回は簡単な例を取り上げましたが、これを基に以下のような拡張が考えられます。

  • より高度なDagsterの機能(例:エラーハンドリング、監視、通知)を利用する
  • 複数のdbtプロジェクトやその他のデータソースとの統合
    以上のような拡張を行うことで、さらに堅牢で効率的なデータパイプラインを構築することが可能です。

記事中では1つのdbtプロジェクトをデプロイしていますが、user_codeコンテナを増やすことで複数のdbtプロジェクトをDagsterの異なるlocationとして配置することも可能です。
※その場合はworkspace.ymlに2つ目以降のgrpc_server(ホストとポート)を記載します。workspace.ymldagster.ymlもマウントさせた方が取り回しがしやすいかもしれません。

Nextはk8sへのデプロイと同時にdbtを統合する方法を調べていこうと思っています。

何か質問やフィードバックがあれば、気軽にコメントや問い合わせを頂ければと思います。


参考

https://docs.dagster.io/deployment/guides/docker#multi-container-docker-deployment

https://docs.dagster.io/integrations/dbt/using-dbt-with-dagster/load-dbt-models

Discussion