🐙🐙🐙オーケストレーションツールDagster入門してみた(Airflow編)🐙🐙🐙
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#13です。
🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙の続きで、Airflow連携を触ってみました。
tl;dr
- DagsterでAirflowのDAGを実行できる(こともある)よ
- DagsterでAirflowのOperatorを実行できる(こともある)よ
- DagterのJobからAirflowのDAGを作ることもできるよ
Dagsterとは
Dagster is an orchestrator that's designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.
です。詳しくは、🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙や公式サイトを参照してください。
Apache Airflowとは
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.
です。詳しくは公式サイトや、Data Pipelines with Apache Airflowなどを参照してください。
DagsterとAirflowの連携
DagsterとApache Airflowは同じ分野(ワークフローツールやオーケストレーションツール)の製品で、競合にあたります。
(Dagster側は比較記事を出していたりします)
DagsterとAirflowを組み合わせるための機能がいくつか(Dagster側に)あります。このAirflow連携の機能により、AirflowからDagsterの移行が容易になりそうです。また、移行ではなく新規にDagsterで構築する場合も、Airflowエコシステムの資産を流用できるのは色々便利そうです。
Airflow連携機能としては、具体的には、
- AirflowのDAGをDagsterのJobに変換する
- AirflowのOperatorをDagsterのopにする
- AirflowからDagsterのJobを起動する
- DagsterのJobをAirflowのDAGに変換する
の四種類があるようです。
(組み合わせ的には、「DagsterからAirflowのDAGを起動する」、「DagsterのOpをAirflowのOperatorにする」も考えられそうですが、これはなさそう)
この記事では、このうち
- AirflowのDAGをDagsterのJobに変換する
- AirflowのOperatorをDagsterのopにする
- AirflowからDagsterのJobを起動する
の三つを試してみました(あと一つの「DagsterのJobをAirflowのDAGに変換する」は力尽きました)。
Using Dagster with Airflow Example
概要
このリポジトリのexamples/with_airflowディレクトリでは、DagsterとAirflowの連携の例がいくつか提供されています。具体的には、
-
AirflowのOperatorをDagsterのopにする
- SimpleHttpOperatorでGoogleにリクエスト
- AirflowのDAGをDagsterのJobに変換する
の例が含まれています。それに加え、AirflowのDAGをDagsterのJobに変換する例として、dagster-airflow-migration-exampleからのDAGを取り込みも含まれています。こちらのリポジトリには、
など、Airflowの色々な機能を使ったDAGが含まれています。DagsterでAirflowのDAGを使う場合に、Airlfowの機能を使える事(or使えない事)の確認に使えそうです。
なお、dbt連携のexampleではexampleの使い方や意味を紹介した、チュートリアルが公開されていますが、Airflow連携のexampleでは特にチュートリアルはなさそうです。
準備
exampleを動かしてみます。Ubuntu 20.04 (Windows10のWSL2上)で試しました。
dagsterコマンドをインストールします。
pip install dagster==1.1.5
exampleのコードを取得し、依存パッケージをインストールします
dagster project from-example --name tutorial_airflow --example with_airflow
cd tutorial_airflow
pip install -e ".[dev]"
DagsterのWebUI(dagit)を起動します
dagit
私の環境では、いくつかのDAG(airflow_taskflow_dag等)に関連してエラーメッセージが出ましたが、dagitの起動自体はできました。
AirflowのDAGをDagsterのJobに変換する例を試してみる
いくつか例がありますが、まずは一番簡単そうなDAGを試しみます。
このPythonコードは
- 四つのBashOperator
- それと依存関係にある、一つのDummyOperator
からなる完全なDAGで、Airflowで実際に動作させることも出来ます。
(Airflow提供のdocker-compose・Airflow2.4.3で確認)
このDAGをDagsterで動かしてみます。先ほどの準備のコマンドの結果
- DAGのコード
-
DAGをDagsterのjobにする設定
-
make_dagster_job_from_airflow_dag(
とRepositoryの設定
-
が含まれていますので、左上のハンバーガメニューからairflow_simple_dagを選ぶとJobを表示することが出来ます。同じDAGなので当たり前ですが、AirflowのGraph Viewと同じ依存関係になっていますね。
Launchpadタブを選び、「Lauch Run」を押すとこのDAGをDagsterで実行することが出来ます。
このDAGのTaskでは、AirflowのJinja2マクロ(exuecution_dateやtask_instance_key_str)をechoコマンドで出力しています。画面下の「Raw Compute Log」を選ぶとTask(に対応するOp)が、日時やTaskの名前を出力していることを確認できます。
AirflowのOperatorをDagsterのopにする
一度dagitを停止し。airflow_operator_to_op.pyを指定して起動します(repository.pyに含まれていないため、単にdagitでは処理対象に含まれない)。
dagit -f airflow_operator_to_op.py
SimpleHttpOperatorに対応するOpを一つだけあるJobが登録されているはずです。
Launchpadから実行します。StatusがSucessになり、HTTPリクエストしているログが記録されています。
exampleそのままではリクエスト結果が(多分)確認できないので、airflow_operator_to_opにreturn_outputパラメータを加えて再度実行してみます。
dagster_op = airflow_operator_to_op(http_task, connections=connections, return_output=True)
今度の実行では、「Handled output "result" using IO manager "io_manager"」のログが、保存先のディレクトリと共に記録されるようになります。ログで記載されているファイルを開くと、HTTPリクエストの結果を確認できます。
(先頭に何かバイナリが入っている原因は不明)
head -n1 /home/notrogue/project/dagster/tutorial_airflow/with_airflow/tmpxn3syf1r/storage/dd508c7b-c69a-49f1-a06b-a37409410403/http_task/result
���9X�9<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="ja"><head><meta content="Google 画像検索 ウェブ上の画像を縦横無尽に検索" name="description"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google 画像検索</title><script nonce="d0EAv5ItbNyQxiat4pelKg">(function(){window.google={kEI:'P1KYY87hBMn_hwOp4YDwCw',kEXPI:'0,1359409,6058,207,4804,2316,383,246,5,1129120,1197715,380775,16112,28687,22430,1362,12318,17581,4998,13226,3849,10623,22740,6675,1278,2742,149,1103,840,2197,4100,3514,606,2023,2297,14670,3227,2845,7,33218,553,4464,3783,9359,3,346,230,6459,149,13975,4,1528,2304,34770,7357,13658,4437,16786,5821,2536,4094,17,4035,3,3541,2,39041,2,3110,2,14022,6248,1398,18093,5679,1020,2378,28744,4568,6259,23416,1254,5835,14967,4333,7484,445,2,2,1,23825,10962,7381,15969,874,19633,7,1922,5784,16410,9364,9543,4832,26077,427,106,5690,7,3077,2932,8324,14,82,3890,751,13384,1502,679,1342,280,781,997,3852,1125,1747,2039,4265,70,3,785,81,246,454,1505,1284,2412,722,1020,813,1514,2346,84,852,565,568,988,970,418,706,441,399,96,42,385,1033,42,291,2669,1167,481,402,1029,3,261,524,2,216,96,2318,135,515,447,342,246,950,1,1442,483,294,92,4,6,845,857,301,18,127,2,537,2304,383,71,3,442,97,598,551,918,339,95,64,700,579,1086,61,917,73,201,5,342,461,716,130,88,20,139,323,303,130,108,115,10,78,355,10,3,3,144,35,4,327,769,276,236,254,120,82,255,3,501,42,1,529,42,12,2065,1047,571,8,218,360,195,239,18,114,4,99,36,135,29,297,12,105,138,789,3,710,1,650,502,38,358,250,1337,211,56,398,118,261,1254,956,525,838,3,254,83,2,114,251,7,88,45,44,89,1686,5273225,6273,8798849,3311,141,795,19735,1,303,44,2759,41,403,2,1,2,151,6,1,5,1,5,1,3,2,1,3,2,2,1,5,2,1,2,2,2,3,155,2,4,26,6,2,23946881,512,18,4041612,1964,16672,3406,5595,11,3835,4590,311,950,1429',kBL:'oABN'};google.sn='imghp';google.kHL='ja';})();(function(){
AirflowからDagsterのJobを起動する
おそらくexampleに含まれていないので、自前で頑張ります。以下の流れで、AirflowからDagsterのJobを起動します。
- dagster-airflowをインストールした、Airflowクラスターの準備
- Airflow Connectionを設定
- DagsterOperatorを呼び出すDAGを作成
- Dagster(dagit)を起動
- (3)で作成したDAGをAirflowからTrigger
起動対象のJobは何でも良いですが、ここでは先ほど使ったairflow_operator_to_op.pyを起動してみます。
Airflowクラスターの準備
Airlfowが提供しているDockerイメージとdocker-composeを使います(dagster-airflowがインストール出来れば環境は選ばないと思います)。
デフォルトのコンテナイメージには、必要なパッケージが抜けていますので、追加したDockerfileを作ります。
FROM apache/airflow:2.4.3
RUN pip install apache-airflow dagster-airflow apache-airflow-providers-docker
docker-compose.yamlを変更し、作成したコンテナイメージを使うようにします。
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
build: .
(以下略)
コンテナビルドして、Airflowクラスタを起動します。
docker-compose build
docker-compose up
Airflow Connection
AdminタブのConnectionから、新しいConnectionを設定します。
- Dagster URLに設定しているIP Addressは、AirflowのコンテナからDagster(Dagit)にアクセスするためのIPアドレスです。環境に合わせて適当に変えてください
- 空白にしている部分はDagster Cloudにリクエストする時に必要な情報です(多分)。self-hostingの場合は無くても動きました
DagsterOperatorを呼び出すDAGを作成
from airflow import models
from dagster_airflow import DagsterOperator
from airflow.utils.dates import days_ago
args = {
"start_date": days_ago(2),
}
simple_dag = models.DAG(dag_id="dagster", default_args=args, schedule_interval=None)
DagsterOperator(
task_id="sink_task",
dag=simple_dag,
repository_name='__repository__my_http_job',
repostitory_location_name='airflow_operator_to_op.py',
job_name='my_http_job',
user_token="aaa",
)
- repository_name、repostitory_location_nameはDagster(Dagit)のDeployment->Definitionsで確認した値です(下図)
- user_tokenはDagster Cloudを呼ぶ場合のみ必要ですが、設定しないとエラーが発生したため適当な値を入れています
DagsterOperatorの詳細はコード読んでください。
Dagster(dagit)を起動
Dagster(dagit)からairflow_operator_to_opを起動する時と同じ手順ですが、同じホストで起動しない場合はhオプションを設定する点は注意が必要です。
dagit -f airflow_operator_to_op.py -h 0.0.0.0
起動前はRunが空です。
(3)で作成したDAGをAirflowからTrigger
AirflowのWebUI(http://localhost:8080)からDAGのpauseを解除し、DAGをTriggerします。
うまくいくと、Dagster(dagit)でRunが追加されているはずです。
Discussion