🦔

Digdagの機能紹介

2022/08/20に公開

ワークフローエンジンの機能は色々あるので網羅的にまとめたいと常々思っていた。Digdag vs Airflowなどの比較記事はあるが機能ベースで紹介されることは少ないというのがモチベーション。
そんな中で実践的データ基盤への処方箋を読んだところ非常に良い表があったのでそれをベースにDigdagの機能を紹介していく。

data-syohousen-workflow.jpg
実践的データ基盤への処方箋 第2章 16節より

Digdag用語

この記事内で用いるDigdagの用語を解説しておく。(実質Conceptsの翻訳)
Digdagの概要はいろんな良い記事があるのでそこを参照していただきたい。

  • task
    • Digdagにおけるジョブの最小単位
    • ファイルをダウンロードするとかSQLを実行するとか一つの処理だが、冪等性が期待される
  • workflow
    • 1つ1つのtaskを意味のある単位にまとめたもの
      • ファイルをS3からダウンロードし、整形し、DBにインポートするといった一連のフロー
    • Digdagにおいては1ファイル1ワークフロー
      • .digファイル(実質yaml)で記述される
  • session
    • workflowを実行するための計画
    • session_timeという「workflowが実行される時間」が実質的なIDになる
      • ここでの「workflowが実行される時間」とは実際に実行される時間とは異なるということが、ややこしいポイントでありつつもDigdagにおける最も重要な概念になっている
    • 例えば8時台のログを集計するようなworkflowを考えたとき、cronを使って実現するなら9時に実行を開始し、スクリプトの中で「現在時刻-1時間して更にhourly単位で時刻を丸める」みたいなことがあるあるだと思われる
      • ただし再実行が面倒な上に何かしらの原因で実行が10時になってしまったら8時台のログは集計されない
    • Digdagであればsession_timeを9:00で与えることでそのsession_timeをtask中で使えるので、実際に実行される時刻を気にすることなくworkflowを構成することができる
      • つまり「workflowが実行される(のを期待された)時刻」に依存する処理をなんと実際に実行される時間を気にせずにかけるのである!Digdag凄い
      • 指定の仕方は例えばhourly>: 10:00みたいに書くと9:10にsession_timeが9:00のsessionが実行される
      • この9:00のsessionを再実行する際も、session_timeは9:00のままなのでいつでも8時台のログを再集計できる
  • attempt
    • sessionの実際の実行を表す
    • 特に何もしなかったら1sessionにつき1attempt
    • sessionを再実行したりすると増えていく
      • sessionのステータス==最新のattemptのステータスと考えて良さそう
  • operator
    • taskの実行内容を規定するもので、task定義にはoperatorが必要
    • sh>のように書かれるもので、これはシェルスクリプトを実行するoperatorを意味している
      • 例えばsh>: echo "hello world"のように書くことでecho "hello world"というシェルスクリプトを実行できる

おそらく一番難しい概念であり一番大事な概念がsessionとattemptであり、何度も書くがworkflowが実行される(のを期待された)時刻と実際に実行された時刻を分けるというのが肝である。

Digdagの機能

以下の機能を実現するためにDigdagコマンドもしくはWebUIを利用する場合がある。基本的にWebUIの機能は必要最低限であり、コマンドからしかできないことも多い。
ただし以下で紹介されるような基本的なコマンドは、コマンドが無くてもREST APIを使うことで代替できる。
また以下ではlocal modeの機能は含まない。

起動時刻制御

時刻を指定して起動できる

今すぐあるsession_timeで実行するということはコマンドからできる。WebUIからは現時点ではできない。
またある時刻にあるsession_timeで実行するということはできないはず。

柔軟な指定ができる

毎分、毎時、毎日、毎週、毎月という単位だったりcronの記法でスケジュールを指定できる

起動順序制御

taskの起動順序はworkflowで書いた通りに上から実行される。
またworkflow間でも依存関係を定義できる。デフォルトの挙動は以下だが、オプションで変えられる。

依存先の状態 挙動
成功 そのまま続きのtaskを実行
実行中 終了するまで待機
実行されていない 依存先のworkflowを実行し、終了するまで待機
失敗 自身も失敗

再実行・スキップ

再実行はコマンドとWebUIからできる。スキップはコマンドからのみ。

アラート

異常終了をキャッチしてなにかしたり長時間実行が発生したらなにかしたりできる。メールを飛ばす機能はあるが、その他チャットツールなどに飛ばしたい場合はプラグインを探すか、自前で通知処理を書く必要がある。

状態管理

sessionのステータスに関してこちらから見えるのは実行中、正常終了、異常終了、キャンセルぐらい。個々のtaskは様々な状態をとる

タイムアウト制御

機能としてはない。一応sla operatorを用いることで無理やり強制終了させることはできる。

同時実行制御

同時に複数のtaskを実行できたり、同時実行数を設定できる。

リモート実行

機能としてはない。原理的にはsshして何かを実行するみたいなシェルをtaskとして書くことはできる。特に何も設定しないと実際の処理はagentが起動しているサーバーのローカルで実行される。
次のトピックであるコンテナ実行をサポートしていたり、Digdag自体を多数並べてHA構成をとることでリモート実行したいユースケースをカバーしているように思える。

コンテナ実行

実際の処理をDockerやECSで実行することができる。そもそもDigdag自体をコンテナ上で動かすというプラクティスもいくつかある。

処理のグループ化

処理=taskとすればworkflowが処理のグループ化に相当する。workflow単位で実行、再実行、スキップできる。
workflowのうちある特定のtaskだけ実行するなどはできないが、再実行時に失敗したtaskのみ再実行といったことはできる。

処理への引数指定

operatorには引数を渡すことができる。session_timeなどDigdagが用意している変数があったり、_exportなどによって自分で定義した環境変数のようなものを渡せる。

バックフィル

コマンドからバックフィルできる

条件分岐

_checkや_errorといった予約語をworkflow定義に用いることで、終了状態に応じた条件分岐ができる
他にもif operatorを用いることでも条件分岐でき、Javascriptを使って簡単な式を条件に書くことができる。

ログ保持

configを書いて起動時に読ませることで、ログをローカルやS3やGCSに保存できるようになる。

稼働情報保持

WebUIから1つ1つのsessionがどれくらい時間がかかったかはわかるが、実行時間の変化や正常終了の数といった統計情報は見ることができない。メトリクスを取得することはできるので、そこから自前で加工すればある程度は統計情報を得ることができる。

API

Language APIと呼ばれているPython APIRuby APIがある。これらはtask内でのみ利用することができ、環境変数を書き換えたり、子タスクを動的に生成したりすることができる。
REST APIもあり、これはworkflow自体を実行したり、sessionの情報を取得したり、workflowの設定自体を上書きしたりすることができる。

ビューワー

WebUIがあり、処理の状態、実行順序、実行結果などを確認できる。再実行やworkflowの編集・削除といったwrite操作も行うことができる。

ユーザー管理

機能としてはないので、自前でなにかしら用意する必要がある。ユーザー管理がないのでWebUIへのログイン機能もない(アクセスできれば誰でも見れる)。

プラグイン

プラグインによって、自由にoperatorをつくることができる。例えばslack pluginを入れることで、slack>というoperatorが使えるようになる。

まとめ

改めてまとめるとDigdagはちょうどよい機能がまとまっていることがわかった。おそらく要件的な課題として考えられるのは、WebUIの機能不足(特にユーザー管理)やタイムアウト制御といった部分になると思われるが、逆にその辺りが許容できれば良い選択になると思っている。
他のワークフローエンジンでも上記のようなトピックごとのまとめがあれば、すごく参考になると思うので誰か書いてほしい。
また改めてにはなるが、間違いがあれば是非指摘していただきたい。

Discussion