社内の問題を解決するためにワークフローエンジンを開発した話
要約
既存の ETL パイプラインの管理を効率化するため、YAMLでワークフローを定義し、実行できるツールを開発しました。
なぜ Airflow や Prefect を使わないのか?
Airflow や Prefect は素晴らしいツールですが、これらのツールでワークフロー(DAG)を定義するには、Python でコードを書く必要があります。
私が関わっている環境では、すでに数十万行のシェルスクリプトや Perl スクリプトのコードがあります。この上に Python のレイヤーを重ねれば、さらに保守が難しくなってしまいます。また、既存のコードを Python に書き換えることも現実的ではありません。
そのため、プログラムを書かずに、ノーコード(YAML ファイル)でワークフローを定義でき、既存のプログラムをそのまま利用できる軽量なツールを開発することにしました。
Dagu の仕組み
- Dagu は、1 つの実行ファイルだけで構成されており、実行履歴のデータはファイルシステムに JSON として保存されるため、DBMS は必要ありません。
- Dagu は、YAML で定義されたワークフローを実行します。既存のプログラムを変更する必要はありません。
Quick start
1. インストール
dagu バイナリを Releases page からダウンロードし、パス ($PATH) の通ったディレクトリ(例: /usr/local/bin
)に配置してください。
2. サンプルの DAG ファイルのダウンロード
サンプルの DAG ファイル を example.yaml
という名前で現在のフォルダにダウンロードしてください。
3. Web UI の起動、DAG の実行
dagu server
コマンドを実行し、http://localhost:8080
にアクセスすると Web UI が表示されます。そして、現在のフォルダにある example.yaml
を選択し、Start
ボタンで DAG を実行することができます。
Dagu コマンドの使い方
-
dagu start [--params=<params>] <file>
- ワークフローの実行 -
dagu status <job file>
- ワークフロー実行状況の表示 -
dagu retry --req=<request-id> <file>
- ワークフローの再実行 -
dagu stop <file>
- ワークフロー実行のキャンセル -
dagu dry [--params=<params>] <file>
- 試験実行 -
dagu server
- Web UI の起動
ユーザーインターフェース
Web UI を起動するには、dagu server
コマンドを実行します。デフォルトで http://127.0.0.1:8000
の URL で Web UI を表示できます。
-
DAGs: 全体の DAG 実行状況
-
Detail: 対象 DAG の詳細な実行ステータス、ログ
-
Timeline: 対象 DAG のタイムライン(各ステップの実行時間グラフ)
-
History: 過去の実行履歴
横軸に日付、縦軸にパイプラインの各ステップの実行結果が表示されます。JP1 のジョブネット実行履歴のような見やすい表示にしています。縦軸を選択すると、その下に実行時間やログなどの詳細が表示されます。
ワークフロー (DAG) の定義方法
最小限の設定
最もシンプルな定義は次のとおりです。2つの Python スクリプトを順番に実行するだけです。
name: minimal configuration # DAG's name
steps: # Steps inside the DAG
- name: step 1 # Step's name (should be unique within the file)
command: python main_1.py # Command and arguments to execute
- name: step 2
command: python main_2.py
depends:
- step 1 # [optional] Name of the step to depend on
環境変数の使用
env
フィールドで環境変数を定義し、ファイル全体で使用できます。
name: example
env:
SOME_DIR: ${HOME}/batch
steps:
- name: some task in some dir
dir: ${SOME_DIR}
command: python main.py
パラメータの使用
params
フィールドで DAG 全体に対するパラメータを定義し、ファイル全体で使用できます。定義したパラメータは、シェルスクリプトと同様に $1, $2 といった変数で参照できます。パラメータには、コマンド置換や環境変数を使用することもできます。また、パラメータは、start
コマンドの --params
パラメータの値で上書きすることができます。リトライ時は初回実行時に指定されたパラメータが再利用されます。
name: example
params: param1 param2
steps:
- name: some task with parameters
command: python main.py $1 $2
コマンド置換の使用
各フィールドの値ではコマンド置換を使用できます。バッククォート (`
) で囲まれた文字列は、コマンドとして評価され、標準出力の値で置き換えられます。
name: example
env:
TODAY: "`date '+%Y%m%d'`"
steps:
- name: hello
command: "echo hello, today is ${TODAY}"
条件分岐
特定の条件では、タスクを実行したくない場合があります。そのために precondition
フィールドを使用できます。例えば、次のタスクは毎月1日のみに実行されます。
name: example
steps:
- name: A monthly task
command: monthly.sh
preconditions:
- condition: "`date '+%d'`"
expected: "01"
あるタスクがスキップされたとしても、後続のタスクを継続したい場合、 continueOn
フィールドを使用できます:
name: example
steps:
- name: A monthly task
command: monthly.sh
preconditions:
- condition: "`date '+%d'`"
expected: "01"
continueOn:
skipped: true
ステートハンドラ
タスクが失敗、成功したときに何らかのアクションを実行するには、 handlerOn
フィールドを使用できます。たとえば slack に通知をしたり、クリーンアップ処理を行うことができます。
name: example
steps:
- name: A task
command: main.sh
handlerOn:
failure:
command: notify_error.sh
exit:
command: cleanup.sh
繰り返しタスク
あるタスクを一定間隔で繰り返したい場合は、repeatPolicy
フィールドを使用できます。繰り返しタスクを安全に終了するには、 stop
コマンドを使用します。stop
コマンドは、繰り返しタスクが実行中だった場合、完全に終了するのを待ってから終了します(最大待ち時間は MaxCleanUpTimeSec
で設定可能です)。
name: example
steps:
- name: A task
command: main.sh
repeatPolicy:
repeat: true
intervalSec: 60
使用できるフィールドの一覧
現在、以下のフィールドのすべての設定が DAG 設定ファイルの中で使用できます。これらの設定を組み合わせることによって、ワークフローの細かい制御が可能です。
共通の設定は、グローバルの設定ファイル ~/.dagu/config.yaml
にまとめておくことができます。
name: all configuration # DAG's name
description: run a DAG # DAG's description
env: # Environment variables
LOG_DIR: ${HOME}/logs
PATH: /usr/local/bin:${PATH}
logDir: ${LOG_DIR} # Log directory to write standard output
histRetentionDays: 3 # Execution history retention days (not for log files)
delaySec: 1 # Interval seconds between steps
maxActiveRuns: 1 # Max parallel number of running step
params: param1 param2 # Default parameters for the DAG that can be referred to by $1, $2, and so on
preconditions: # Precondisions for whether the DAG is allowed to run
- condition: "`echo 1`" # Command or variables to evaluate
expected: "1" # Expected value for the condition
mailOn:
failure: true # Send a mail when the DAG failed
success: true # Send a mail when the DAG finished
MaxCleanUpTimeSec: 300 # The maximum amount of time to wait after sending a TERM signal to running steps before killing them
handlerOn: # Handler on Success, Failure, Cancel, Exit
success:
command: "echo succeed" # Command to execute when the DAG execution succeed
failure:
command: "echo failed" # Command to execute when the DAG execution failed
cancel:
command: "echo canceled" # Command to execute when the DAG execution canceled
exit:
command: "echo finished" # Command to execute when the DAG execution finished
steps:
- name: som task # Step's name
description: some task # Step's description
dir: ${HOME}/logs # Working directory
command: python main.py $1 # Command and parameters
mailOn:
failure: true # Send a mail when the step failed
success: true # Send a mail when the step finished
continueOn:
failed: true # Continue to the next regardless of the step failed or not
skipped: true # Continue to the next regardless the preconditions are met or not
retryPolicy: # Retry policy for the step
limit: 2 # Retry up to 2 times when the step failed
repeatPolicy: # Repeat policy for the step
repeat: true # Boolean whether to repeat this step
intervalSec: 60 # Interval time to repeat the step in seconds
preconditions: # Precondisions for whether the step is allowed to run
- condition: "`echo 1`" # Command or variables to evaluate
expected: "1" # Expected Value for the condition
開発について
Dagu は開発を始めたばかりですが、実践で利用しながら、バグ修正、機能追加、改善をしていきたいと思います。
どなたでもお気軽にPR,Issueいただけると嬉しいです。
Discussion