📝

Airflowでdbt Coreを動かした話

2022/12/21に公開

この記事はdbt Advent Calendar 2022の12月21日の記事です。

概要

私の所属しているチームではAirflowでジョブを動かしています。新たにdbtのジョブをAirflow実行する際にチームの前提事項をもとにどのように動かしたらいいのか検討した結果を記載します。

前提

サマリ

  • Airflowはオンプレミスで実行しており、柔軟なリソース増減はできない
  • チームは複数のプロジェクトで構成されており、Airflowを共用
  • チームでAirflowの基本機能は提供するが、実際のコンピューティングリソースはプロジェクトが用意する
  • dbt Cloudは使えない(もし、使えればdbt Cloud Operatorが存在するため、以下の議論すべて不要になったかも)

Airflow

私の所属するチームではAirflowのSaaSサービスを使用できないため、オンプレミスにAirflowを構築しています。

このAirflowを複数のプロジェクトで共有しているため、あるプロジェクトが高負荷の処理を行った場合に他プロジェクトに影響がないようにする必要がありました。
dbt Cloudが使えれば実際の処理はすべてdbt Cloudでやってくれるため、特段の考慮は不要でしたが、dbt Coreを使う前提なのでワーカーリソースは各プロジェクトで準備してもらうことにしました。
その方法として、DockerOperatorを使うことにし、各プロジェクトが準備したサーバーにてDockerイメージを実行してもらうことにしました。
今回はオンプレなのでDockerOperator一択でしたが、クラウド上であれば例えばAWSならばBatchOperatorECSOperatorも選択肢に入っていたと思います。

雑になりますが以下がイメージ図です。

検討事項

ベースのDockerイメージの作成

DockerOperatorで実行するDockerイメージはチームで統一したいため、共通のDockerイメージを探しました。
Docker Hubに公式のイメージがありましたが、DEPRECATEDとなっており、ghcr.ioに最新版があるとのことでした。
ここからダウンロードしてもよいのですが、ソースコード上にDockerfileも公開されていたため、このDockerfileをチームの環境に合わせて編集したものを使用しました。
ビルド時はBUILD KITを使っているのでBUILD KITを指定しないとエラーになりました。

DOCKER_BUILDKIT=1 docker build -t 198.168.100.50/my_project/dbt_base:v1.2.3 --target dbt-postgres .

実行用Dockerイメージの作成

公式のDockerfileをみるとWORK_DIRが/usr/app/dbt/に設定されているため、WORK_DIR以下に保存するように記述。my_project以下に実際のdbtのコード、後述に記載したprofiles.ymlをベースイメージに追加しています。

FROM 198.168.100.50/my_project/dbt_base:v1.2.3
ARG APP_PASS="/usr/app/dbt/"

ADD ./my_project $APP_PASS
COPY ./profiles.yml $APP_PASS

接続情報の受け渡し

接続情報はソースコードには記載せず、AirflowのConnectionsに記載し、DockerOperator実行時にVariableから情報を取得し環境変数に設定します。

profiles.ymlの設定

前述したprofiles.ymlのprodにはユーザ名とパスワードは環境変数を参照するようにしました。ローカルの開発ではprofiles.ymlは使用せずデフォルトのprofiles.ymlを使って開発者が自由に設定してもらっています。

config:
  send_anonymous_usage_stats: false
my_project:
  outputs:
    prod:
      type: postgres
      threads: 1
      host: 192.168.100.150
      port: 5432
      user: "{{ env_var('MY_PROJECT_USERNAME') }}"
      password: "{{ env_var('MY_PROJECT_PASSWORD') }}"
      dbname: my_app
      schema: my_function

DockerOperatorの設定

以下のことを注意して記載しました。

  • AirflowのConnectionsを取得しDockerイメージ実行時に環境変数に設定
  • targetをprodに設定
  • profileをDockerイメージに入れたものファイルを参照するように変更
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('MY_PROJECT_CONNECTION') 

DockerOperator(
        task_id="execute_dbt_docker",
        container_name=f"dbt_test",
        image="198.168.100.50/my_project/my_app:v1.0",
        force_pull=True,
        auto_remove=True,
        environment=dict(
            MY_PROJECT_USERNAME=conn.login,
            MY_PROJECT__PASSWORD=conn.password
        ),
        command=" run --profiles-dir=/usr/app/dbt/ --target=prod ",
        docker_url=f"tcp://198.168.100.100:2375",
        cpus=1,
        mem_limit="1g",
        mount_tmp_dir=False

課題

DockerOperatorで指定したcommandの実行中にエラーが発生した場合はon_failure_callbackでコールバックを受け取り、Slack等への通知ができるのですが、docker_urlで指定したサーバとの通信が失敗したり、imageで指定したコンテナリポジトリからのイメージ取得に失敗した場合はon_failure_callbackでコールバックを拾えないため、どのような形でエラーを検知するのかが課題となっています。

追記

この記事の作成と並行してAirflowコミュニティに質問をしていたところAirflow CommitteeのJarek Potiukさんより以下の回答をいただきました。
https://stackoverflow.com/questions/74609215/how-airflow-dockeroperator-send-notification-when-docker-image-creation-error/74861059#74861059

CustomOperatorを作成してエラーハンドリングを行えとのことで、頑張って実装してみたいと思います。

あとは少し汚くなりそうですが、DAGの最後にtrigger ruleを入れてエラー処理を行うのもありかとは思っています。

Discussion