Docker上のマルチコンテナでDagsterとdbtを組み合わせて動かす
Docker上のマルチコンテナでDagsterとdbtを組み合わせて動かす
はじめに
この記事の目的と対象読者
この記事は、DagsterとdbtをDocker上で動かす方法を示すことを目的としています。
dockerのマルチコンテナにデプロイすることで、複数のdbtプロジェクトを依存させることなくDagsterに配置することができます。
dbtやDagsterに関する説明はしませんのでご了承ください。
公式ドキュメントとこの記事の違い
公式ドキュメントでは、Dagsterとdbtの統合や、Dagsterをdockerにデプロイする方法については説明されていますが、3つの組み合わせについてはサンプルが無いです。この記事ではdocker上でdbtとDagsterを動かす実装したのでその際の注意点などを記載します。
前提条件と必要な環境
環境
macOS 13.5.1
Docker Desktop 4.19.0
dbt 1.6.1
Dagster 1.4.10
プロジェクト構成
.
├── Dockerfile_dagster
├── Dockerfile_user_code
├── README.md
├── dagster.yaml
├── docker-compose.yml
├── jaffle_shop
│ ├── LICENSE
│ ├── README.md
│ ├── dbt_project.yml
│ ├── models
│ ├── profiles.yml
│ └── seeds
├── repo.py
└── workspace.yaml
DockerにDagsterをデプロイする際の考慮点
ベースは公式のMulti-container Docker deployment の通りです。
dbtを動かすために以下のライブラリを追加しています。
dagster_dbt
-
dbt
(今回はdbt-postgres
を使用)
FROM python:3.10-slim
RUN pip install \
dagster \
dagster-postgres \
- dagster-docker
+ dagster-docker \
+ dagster_dbt \
+ dbt-postgres
# Add repository code
WORKDIR /opt/dagster/app
- COPY repo.py /opt/dagster/app
+ # COPY repo.py /opt/dagster/app
# Run dagster gRPC server on port 4000
EXPOSE 4000
# CMD allows this to be overridden from run launchers or executors that want
# to run other commands against your repository
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"]
また、エントリーポイントとなるrepo.py
は開発時には頻繁に変更することが予想されるので、COPYではなくマウントするようにします
Dockerfile_dagster
は変更無しです。
docker-compose.yml
の変更点は下記です。
- dbtが操作するDWHサンプルとしてpostgresコンテナを追加
- user_codeコンテナ
- dagster_dbtがdbt projectをロードの度にパースするように
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD
の環境変数を設定 - エントリーポイントの
repo.py
とdbt projectのdirをマウント
- dagster_dbtがdbt projectをロードの度にパースするように
version: "3.7"
services:
# dbtが操作するデータウェアハウス
+ postgres_dwh:
+ image: postgres:15
+ container_name: postgres_dwh
+ hostname: postgres_dwh
+ ports:
+ - 5432:5432
+ volumes:
+ - dwh:/var/lib/postgresql/data
+ environment:
+ - POSTGRES_USER=postgres
+ - POSTGRES_DB=dwh
+ - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
+ networks:
+ - docker_example_network
# 以下dagsterのコンテナ
docker_example_postgresql:
image: postgres:15
container_name: docker_example_postgresql
environment:
POSTGRES_USER: "postgres_user"
POSTGRES_PASSWORD: "postgres_password"
POSTGRES_DB: "postgres_db"
networks:
- docker_example_network
volumes:
- postgres_volume:/var/lib/postgresql/data
docker_example_user_code:
build:
context: .
dockerfile: ./Dockerfile_user_code
container_name: docker_example_user_code
image: docker_example_user_code_image
restart: always
environment:
DAGSTER_POSTGRES_USER: "postgres_user"
DAGSTER_POSTGRES_PASSWORD: "postgres_password"
DAGSTER_POSTGRES_DB: "postgres_db"
DAGSTER_CURRENT_IMAGE: "docker_example_user_code_image"
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD: 1
networks:
- docker_example_network
+ volumes:
+ - ./repo.py:/opt/dagster/app/repo.py
+ - ./jaffle_shop:/opt/dagster/app/jaffle_shop
docker_example_webserver:
build:
context: .
dockerfile: ./Dockerfile_dagster
entrypoint:
- dagster-webserver
- -h
- "0.0.0.0"
- -p
- "3000"
- -w
- workspace.yaml
container_name: docker_example_webserver
expose:
- "3000"
ports:
- "3000:3000"
environment:
DAGSTER_POSTGRES_USER: "postgres_user"
DAGSTER_POSTGRES_PASSWORD: "postgres_password"
DAGSTER_POSTGRES_DB: "postgres_db"
volumes: # Make docker client accessible so we can terminate containers from the webserver
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage
networks:
- docker_example_network
depends_on:
- docker_example_postgresql
- docker_example_user_code
docker_example_daemon:
build:
context: .
dockerfile: ./Dockerfile_dagster
entrypoint:
- dagster-daemon
- run
container_name: docker_example_daemon
restart: on-failure
environment:
DAGSTER_POSTGRES_USER: "postgres_user"
DAGSTER_POSTGRES_PASSWORD: "postgres_password"
DAGSTER_POSTGRES_DB: "postgres_db"
volumes: # Make docker client accessible so we can launch containers using host docker
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage
networks:
- docker_example_network
depends_on:
- docker_example_postgresql
- docker_example_user_code
networks:
docker_example_network:
driver: bridge
name: docker_example_network
volumes:
dwh:
postgres_volume:
name: dagster_postgres_volume
dagster.yml
にもマウントの情報を記載します。
ここが記載されていないとrunした時に No such file or directory
で怒られてしまいます。
記述はなぜか絶対パスしか受け付けないので注意しましょう。
scheduler:
module: dagster.core.scheduler
class: DagsterDaemonScheduler
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
run_launcher:
module: dagster_docker
class: DockerRunLauncher
config:
env_vars:
- DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB
network: docker_example_network
container_kwargs:
volumes: # Make docker client accessible to any launched containers as well
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage
+ - /absolute_path/dagster_docker_dbt_sample/repo.py:/opt/dagster/app/repo.py
+ - /absolute_path/dagster_docker_dbt_sample/jaffle_shop:/opt/dagster/app/jaffle_shop
# 略
dagster-dbtの実装
dbtのプロジェクトはサンプルとして、jaffle_shopを使用しました
repo.py
の中身を変更し、dbtに統合させていきます。
ほぼ公式のチュートリアルの通りです。
import os
from pathlib import Path
from dagster import Definitions, OpExecutionContext, ScheduleDefinition
from dagster_dbt import DbtCliResource, build_schedule_from_dbt_selection, dbt_assets
dbt_project_dir = Path(__file__).joinpath("..", "jaffle_shop").resolve()
dbt = DbtCliResource(
project_dir=os.fspath(dbt_project_dir), profiles_dir=os.fspath(dbt_project_dir)
)
# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at runtime.
# Otherwise, we expect a manifest to be present in the project's target directory.
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_parse_invocation = dbt.cli(["parse"], manifest={}).wait()
dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")
else:
dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
[jaffle_shop_dbt_assets],
job_name="all_assets_daily_job",
cron_schedule="0 0 * * *",
dbt_select="fqn:*",
)
defs = Definitions(
assets=[jaffle_shop_dbt_assets],
schedules=[daily_dbt_assets_schedule],
resources={
"dbt": DbtCliResource(project_dir=os.fspath(dbt_project_dir)),
},
)
そのままだと味気ないので、build_schedule_from_dbt_selection
でスケジュールも定義しました。
立ち上げと動作確認
docker compose build
docker compose up -d
localhost:3000
にアクセスし、dbtの各modelがdagsterのassetsになっていることと
runできることを確認。
実行もできている。
※ executerとしてDockerRunLauncher を指定しているので、実際に実行するとuser_codeとは別の(同一のimage)コンテナが立ち上がりその中でdbtが実行されます。
まとめと次のステップ
この記事ではDocker上にマルチコンテナとしてDagsterとdbtをデプロイする方法についてまとめました
Dagsterとdbtの組み合わせは、データエンジニアリングとデータ分析の連携を非常に効率的にする強力な手段です。Dockerを使用することで、環境依存の問題を排除し、より簡単にデプロイとスケーリングが可能です。
今回は簡単な例を取り上げましたが、これを基に以下のような拡張が考えられます。
- より高度なDagsterの機能(例:エラーハンドリング、監視、通知)を利用する
- 複数のdbtプロジェクトやその他のデータソースとの統合
以上のような拡張を行うことで、さらに堅牢で効率的なデータパイプラインを構築することが可能です。
記事中では1つのdbtプロジェクトをデプロイしていますが、user_codeコンテナを増やすことで複数のdbtプロジェクトをDagsterの異なるlocationとして配置することも可能です。
※その場合はworkspace.yml
に2つ目以降のgrpc_server(ホストとポート)を記載します。workspace.yml
とdagster.yml
もマウントさせた方が取り回しがしやすいかもしれません。
Nextはk8sへのデプロイと同時にdbtを統合する方法を調べていこうと思っています。
何か質問やフィードバックがあれば、気軽にコメントや問い合わせを頂ければと思います。
参考
Discussion