🐱

社内の問題を解決するためにワークフローエンジンを開発した話

2022/04/23に公開

要約

既存の ETL パイプラインの管理を効率化するため、YAMLでワークフローを定義し、実行できるツールを開発しました。

https://github.com/yohamta/dagu

画面UI

なぜ Airflow や Prefect を使わないのか?

Airflow や Prefect は素晴らしいツールですが、これらのツールでワークフロー(DAG)を定義するには、Python でコードを書く必要があります。

私が関わっている環境では、すでに数十万行のシェルスクリプトや Perl スクリプトのコードがあります。この上に Python のレイヤーを重ねれば、さらに保守が難しくなってしまいます。また、既存のコードを Python に書き換えることも現実的ではありません。

そのため、プログラムを書かずに、ノーコード(YAML ファイル)でワークフローを定義でき、既存のプログラムをそのまま利用できる軽量なツールを開発することにしました。

Dagu の仕組み

  • Dagu は、1 つの実行ファイルだけで構成されており、実行履歴のデータはファイルシステムに JSON として保存されるため、DBMS は必要ありません。
  • Dagu は、YAML で定義されたワークフローを実行します。既存のプログラムを変更する必要はありません。

Quick start

1. インストール

dagu バイナリを Releases page からダウンロードし、パス ($PATH) の通ったディレクトリ(例: /usr/local/bin)に配置してください。

2. サンプルの DAG ファイルのダウンロード

サンプルの DAG ファイルexample.yaml という名前で現在のフォルダにダウンロードしてください。

3. Web UI の起動、DAG の実行

dagu server コマンドを実行し、http://localhost:8080 にアクセスすると Web UI が表示されます。そして、現在のフォルダにある example.yaml を選択し、Start ボタンで DAG を実行することができます。

Dagu コマンドの使い方

  • dagu start [--params=<params>] <file> - ワークフローの実行
  • dagu status <job file> - ワークフロー実行状況の表示
  • dagu retry --req=<request-id> <file> - ワークフローの再実行
  • dagu stop <file> - ワークフロー実行のキャンセル
  • dagu dry [--params=<params>] <file> - 試験実行
  • dagu server - Web UI の起動

ユーザーインターフェース

Web UI を起動するには、dagu server コマンドを実行します。デフォルトで http://127.0.0.1:8000 の URL で Web UI を表示できます。

  • DAGs: 全体の DAG 実行状況

    DAGs

  • Detail: 対象 DAG の詳細な実行ステータス、ログ

    Detail

  • Timeline: 対象 DAG のタイムライン(各ステップの実行時間グラフ)

    Timeline

  • History: 過去の実行履歴
    横軸に日付、縦軸にパイプラインの各ステップの実行結果が表示されます。JP1 のジョブネット実行履歴のような見やすい表示にしています。縦軸を選択すると、その下に実行時間やログなどの詳細が表示されます。

    History

ワークフロー (DAG) の定義方法

最小限の設定

最もシンプルな定義は次のとおりです。2つの Python スクリプトを順番に実行するだけです。

name: minimal configuration          # DAG's name
steps:                               # Steps inside the DAG
  - name: step 1                     # Step's name (should be unique within the file)
    command: python main_1.py        # Command and arguments to execute
  - name: step 2
    command: python main_2.py
    depends:
      - step 1                       # [optional] Name of the step to depend on

環境変数の使用

env フィールドで環境変数を定義し、ファイル全体で使用できます。

name: example
env:
  SOME_DIR: ${HOME}/batch
steps:
  - name: some task in some dir
    dir: ${SOME_DIR}
    command: python main.py

パラメータの使用

params フィールドで DAG 全体に対するパラメータを定義し、ファイル全体で使用できます。定義したパラメータは、シェルスクリプトと同様に $1, $2 といった変数で参照できます。パラメータには、コマンド置換や環境変数を使用することもできます。また、パラメータは、start コマンドの --params パラメータの値で上書きすることができます。リトライ時は初回実行時に指定されたパラメータが再利用されます。

name: example
params: param1 param2
steps:
  - name: some task with parameters
    command: python main.py $1 $2

コマンド置換の使用

各フィールドの値ではコマンド置換を使用できます。バッククォート (`) で囲まれた文字列は、コマンドとして評価され、標準出力の値で置き換えられます。

name: example         
env:
  TODAY: "`date '+%Y%m%d'`"
steps:                               
  - name: hello
    command: "echo hello, today is ${TODAY}"

条件分岐

特定の条件では、タスクを実行したくない場合があります。そのために precondition フィールドを使用できます。例えば、次のタスクは毎月1日のみに実行されます。

name: example
steps:
  - name: A monthly task
    command: monthly.sh
    preconditions:
      - condition: "`date '+%d'`"
        expected: "01"

あるタスクがスキップされたとしても、後続のタスクを継続したい場合、 continueOn フィールドを使用できます:

name: example
steps:
  - name: A monthly task
    command: monthly.sh
    preconditions:
      - condition: "`date '+%d'`"
        expected: "01"
    continueOn:
      skipped: true

ステートハンドラ

タスクが失敗、成功したときに何らかのアクションを実行するには、 handlerOn フィールドを使用できます。たとえば slack に通知をしたり、クリーンアップ処理を行うことができます。

name: example
steps:
  - name: A task
    command: main.sh
handlerOn:
  failure:
    command: notify_error.sh
  exit:
    command: cleanup.sh

繰り返しタスク

あるタスクを一定間隔で繰り返したい場合は、repeatPolicy フィールドを使用できます。繰り返しタスクを安全に終了するには、 stop コマンドを使用します。stop コマンドは、繰り返しタスクが実行中だった場合、完全に終了するのを待ってから終了します(最大待ち時間は MaxCleanUpTimeSec で設定可能です)。

name: example
steps:
  - name: A task
    command: main.sh
    repeatPolicy:
      repeat: true
      intervalSec: 60

使用できるフィールドの一覧

現在、以下のフィールドのすべての設定が DAG 設定ファイルの中で使用できます。これらの設定を組み合わせることによって、ワークフローの細かい制御が可能です。

共通の設定は、グローバルの設定ファイル ~/.dagu/config.yaml にまとめておくことができます。

name: all configuration              # DAG's name
description: run a DAG               # DAG's description
env:                                 # Environment variables
  LOG_DIR: ${HOME}/logs
  PATH: /usr/local/bin:${PATH}
logDir: ${LOG_DIR}                   # Log directory to write standard output
histRetentionDays: 3                 # Execution history retention days (not for log files)
delaySec: 1                          # Interval seconds between steps
maxActiveRuns: 1                     # Max parallel number of running step
params: param1 param2                # Default parameters for the DAG that can be referred to by $1, $2, and so on
preconditions:                       # Precondisions for whether the DAG is allowed to run
  - condition: "`echo 1`"            # Command or variables to evaluate
    expected: "1"                    # Expected value for the condition
mailOn:
  failure: true                      # Send a mail when the DAG failed
  success: true                      # Send a mail when the DAG finished
MaxCleanUpTimeSec: 300               # The maximum amount of time to wait after sending a TERM signal to running steps before killing them
handlerOn:                           # Handler on Success, Failure, Cancel, Exit
  success:                           
    command: "echo succeed"          # Command to execute when the DAG execution succeed
  failure:                           
    command: "echo failed"           # Command to execute when the DAG execution failed
  cancel:                            
    command: "echo canceled"         # Command to execute when the DAG execution canceled
  exit:                              
    command: "echo finished"         # Command to execute when the DAG execution finished
steps:
  - name: som task                   # Step's name
    description: some task           # Step's description
    dir: ${HOME}/logs                # Working directory
    command: python main.py $1       # Command and parameters
    mailOn:
      failure: true                  # Send a mail when the step failed
      success: true                  # Send a mail when the step finished
    continueOn:
      failed: true                   # Continue to the next regardless of the step failed or not
      skipped: true                  # Continue to the next regardless the preconditions are met or not 
    retryPolicy:                     # Retry policy for the step
      limit: 2                       # Retry up to 2 times when the step failed
    repeatPolicy:                    # Repeat policy for the step
      repeat: true                   # Boolean whether to repeat this step
      intervalSec: 60                # Interval time to repeat the step in seconds
    preconditions:                   # Precondisions for whether the step is allowed to run
      - condition: "`echo 1`"        # Command or variables to evaluate
        expected: "1"                # Expected Value for the condition

開発について

Dagu は開発を始めたばかりですが、実践で利用しながら、バグ修正、機能追加、改善をしていきたいと思います。

どなたでもお気軽にPR,Issueいただけると嬉しいです。

https://github.com/yohamta/dagu/

Discussion