🐙

🐙🐙🐙オーケストレーションツールDagster入門してみた(Airflow編)🐙🐙🐙

2022/12/13に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#13です。
🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙の続きで、Airflow連携を触ってみました。

tl;dr

  • DagsterでAirflowのDAGを実行できる(こともある)よ
  • DagsterでAirflowのOperatorを実行できる(こともある)よ
  • DagterのJobからAirflowのDAGを作ることもできるよ

Dagsterとは

公式サイト曰く

Dagster is an orchestrator that's designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.

です。詳しくは、🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙や公式サイトを参照してください。

Apache Airflowとは

公式サイト曰く

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

です。詳しくは公式サイトや、Data Pipelines with Apache Airflowなどを参照してください。

DagsterとAirflowの連携

DagsterとApache Airflowは同じ分野(ワークフローツールやオーケストレーションツール)の製品で、競合にあたります。
Dagster側は比較記事を出していたりします)

DagsterとAirflowを組み合わせるための機能がいくつか(Dagster側に)あります。このAirflow連携の機能により、AirflowからDagsterの移行が容易になりそうです。また、移行ではなく新規にDagsterで構築する場合も、Airflowエコシステムの資産を流用できるのは色々便利そうです。

Airflow連携機能としては、具体的には、

  • AirflowのDAGをDagsterのJobに変換する
  • AirflowのOperatorDagsterのopにする
  • AirflowからDagsterのJobを起動する
  • DagsterのJobをAirflowのDAGに変換する

の四種類があるようです。
(組み合わせ的には、「DagsterからAirflowのDAGを起動する」、「DagsterのOpをAirflowのOperatorにする」も考えられそうですが、これはなさそう)

この記事では、このうち

の三つを試してみました(あと一つの「DagsterのJobをAirflowのDAGに変換する」は力尽きました)。

Using Dagster with Airflow Example

概要

https://github.com/dagster-io/dagster/tree/1.1.6/examples/with_airflow

このリポジトリのexamples/with_airflowディレクトリでは、DagsterとAirflowの連携の例がいくつか提供されています。具体的には、

の例が含まれています。それに加え、AirflowのDAGをDagsterのJobに変換する例として、dagster-airflow-migration-exampleからのDAGを取り込みも含まれています。こちらのリポジトリには、

など、Airflowの色々な機能を使ったDAGが含まれています。DagsterでAirflowのDAGを使う場合に、Airlfowの機能を使える事(or使えない事)の確認に使えそうです。

なお、dbt連携のexampleではexampleの使い方や意味を紹介した、チュートリアルが公開されていますが、Airflow連携のexampleでは特にチュートリアルはなさそうです。

準備

exampleを動かしてみます。Ubuntu 20.04 (Windows10のWSL2上)で試しました。

dagsterコマンドをインストールします。

pip install dagster==1.1.5

exampleのコードを取得し、依存パッケージをインストールします

dagster project from-example --name tutorial_airflow --example with_airflow
cd tutorial_airflow
pip install -e ".[dev]"

DagsterのWebUI(dagit)を起動します

dagit

私の環境では、いくつかのDAG(airflow_taskflow_dag等)に関連してエラーメッセージが出ましたが、dagitの起動自体はできました。

AirflowのDAGをDagsterのJobに変換する例を試してみる

いくつか例がありますが、まずは一番簡単そうなDAGを試しみます。

このPythonコードは

  • 四つのBashOperator
  • それと依存関係にある、一つのDummyOperator

からなる完全なDAGで、Airflowで実際に動作させることも出来ます。
Airflow提供のdocker-compose・Airflow2.4.3で確認)

このDAGをDagsterで動かしてみます。先ほどの準備のコマンドの結果

が含まれていますので、左上のハンバーガメニューからairflow_simple_dagを選ぶとJobを表示することが出来ます。同じDAGなので当たり前ですが、AirflowのGraph Viewと同じ依存関係になっていますね。

Launchpadタブを選び、「Lauch Run」を押すとこのDAGをDagsterで実行することが出来ます。

このDAGのTaskでは、AirflowのJinja2マクロ(exuecution_dateやtask_instance_key_str)をechoコマンドで出力しています。画面下の「Raw Compute Log」を選ぶとTask(に対応するOp)が、日時やTaskの名前を出力していることを確認できます。

AirflowのOperatorをDagsterのopにする

一度dagitを停止し。airflow_operator_to_op.pyを指定して起動します(repository.pyに含まれていないため、単にdagitでは処理対象に含まれない)。

dagit -f airflow_operator_to_op.py

SimpleHttpOperatorに対応するOpを一つだけあるJobが登録されているはずです。

Launchpadから実行します。StatusがSucessになり、HTTPリクエストしているログが記録されています。

exampleそのままではリクエスト結果が(多分)確認できないので、airflow_operator_to_opreturn_outputパラメータを加えて再度実行してみます。

dagster_op = airflow_operator_to_op(http_task, connections=connections, return_output=True)

今度の実行では、「Handled output "result" using IO manager "io_manager"」のログが、保存先のディレクトリと共に記録されるようになります。ログで記載されているファイルを開くと、HTTPリクエストの結果を確認できます。
(先頭に何かバイナリが入っている原因は不明)

head -n1  /home/notrogue/project/dagster/tutorial_airflow/with_airflow/tmpxn3syf1r/storage/dd508c7b-c69a-49f1-a06b-a37409410403/http_task/result
���9X�9<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="ja"><head><meta content="Google &#30011;&#20687;&#26908;&#32034; &#12454;&#12455;&#12502;&#19978;&#12398;&#30011;&#20687;&#12434;&#32294;&#27178;&#28961;&#23613;&#12395;&#26908;&#32034;" name="description"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google &#30011;&#20687;&#26908;&#32034;</title><script nonce="d0EAv5ItbNyQxiat4pelKg">(function(){window.google={kEI:'P1KYY87hBMn_hwOp4YDwCw',kEXPI:'0,1359409,6058,207,4804,2316,383,246,5,1129120,1197715,380775,16112,28687,22430,1362,12318,17581,4998,13226,3849,10623,22740,6675,1278,2742,149,1103,840,2197,4100,3514,606,2023,2297,14670,3227,2845,7,33218,553,4464,3783,9359,3,346,230,6459,149,13975,4,1528,2304,34770,7357,13658,4437,16786,5821,2536,4094,17,4035,3,3541,2,39041,2,3110,2,14022,6248,1398,18093,5679,1020,2378,28744,4568,6259,23416,1254,5835,14967,4333,7484,445,2,2,1,23825,10962,7381,15969,874,19633,7,1922,5784,16410,9364,9543,4832,26077,427,106,5690,7,3077,2932,8324,14,82,3890,751,13384,1502,679,1342,280,781,997,3852,1125,1747,2039,4265,70,3,785,81,246,454,1505,1284,2412,722,1020,813,1514,2346,84,852,565,568,988,970,418,706,441,399,96,42,385,1033,42,291,2669,1167,481,402,1029,3,261,524,2,216,96,2318,135,515,447,342,246,950,1,1442,483,294,92,4,6,845,857,301,18,127,2,537,2304,383,71,3,442,97,598,551,918,339,95,64,700,579,1086,61,917,73,201,5,342,461,716,130,88,20,139,323,303,130,108,115,10,78,355,10,3,3,144,35,4,327,769,276,236,254,120,82,255,3,501,42,1,529,42,12,2065,1047,571,8,218,360,195,239,18,114,4,99,36,135,29,297,12,105,138,789,3,710,1,650,502,38,358,250,1337,211,56,398,118,261,1254,956,525,838,3,254,83,2,114,251,7,88,45,44,89,1686,5273225,6273,8798849,3311,141,795,19735,1,303,44,2759,41,403,2,1,2,151,6,1,5,1,5,1,3,2,1,3,2,2,1,5,2,1,2,2,2,3,155,2,4,26,6,2,23946881,512,18,4041612,1964,16672,3406,5595,11,3835,4590,311,950,1429',kBL:'oABN'};google.sn='imghp';google.kHL='ja';})();(function(){

AirflowからDagsterのJobを起動する

おそらくexampleに含まれていないので、自前で頑張ります。以下の流れで、AirflowからDagsterのJobを起動します。

  1. dagster-airflowをインストールした、Airflowクラスターの準備
  2. Airflow Connectionを設定
  3. DagsterOperatorを呼び出すDAGを作成
  4. Dagster(dagit)を起動
  5. (3)で作成したDAGをAirflowからTrigger

起動対象のJobは何でも良いですが、ここでは先ほど使ったairflow_operator_to_op.pyを起動してみます。

Airflowクラスターの準備

Airlfowが提供しているDockerイメージとdocker-composeを使います(dagster-airflowがインストール出来れば環境は選ばないと思います)。

デフォルトのコンテナイメージには、必要なパッケージが抜けていますので、追加したDockerfileを作ります。

FROM apache/airflow:2.4.3
RUN pip install apache-airflow dagster-airflow apache-airflow-providers-docker

docker-compose.yamlを変更し、作成したコンテナイメージを使うようにします。

version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  build: .
(以下略)

コンテナビルドして、Airflowクラスタを起動します。

docker-compose build
docker-compose up

Airflow Connection

AdminタブのConnectionから、新しいConnectionを設定します。

  • Dagster URLに設定しているIP Addressは、AirflowのコンテナからDagster(Dagit)にアクセスするためのIPアドレスです。環境に合わせて適当に変えてください
  • 空白にしている部分はDagster Cloudにリクエストする時に必要な情報です(多分)。self-hostingの場合は無くても動きました

DagsterOperatorを呼び出すDAGを作成

from airflow import models
from dagster_airflow import DagsterOperator
from airflow.utils.dates import days_ago

args = {
    "start_date": days_ago(2),
}

simple_dag = models.DAG(dag_id="dagster", default_args=args, schedule_interval=None)

DagsterOperator(
    task_id="sink_task",
    dag=simple_dag,
    repository_name='__repository__my_http_job',
    repostitory_location_name='airflow_operator_to_op.py',
    job_name='my_http_job',
    user_token="aaa",
)
  • repository_name、repostitory_location_nameはDagster(Dagit)のDeployment->Definitionsで確認した値です(下図)
  • user_tokenはDagster Cloudを呼ぶ場合のみ必要ですが、設定しないとエラーが発生したため適当な値を入れています

DagsterOperatorの詳細はコード読んでください。

Dagster(dagit)を起動

Dagster(dagit)からairflow_operator_to_opを起動する時と同じ手順ですが、同じホストで起動しない場合はhオプションを設定する点は注意が必要です。

dagit -f airflow_operator_to_op.py  -h 0.0.0.0

起動前はRunが空です。

(3)で作成したDAGをAirflowからTrigger

AirflowのWebUI(http://localhost:8080)からDAGのpauseを解除し、DAGをTriggerします。

うまくいくと、Dagster(dagit)でRunが追加されているはずです。

Discussion