🎡

Pythonのastモジュールを活用してAirflowのDAG依存関係をDAG Docsに表示する

2022/05/04に公開

AirflowでExternalTaskSensorを使って他のDAGの完了を待ち受けるような構成の場合、Web UIにDAGの依存関係が表示されると障害対応の時などに便利です。Web UIへのドキュメントの表示はDAG Docsに記述すれば良いのですが、DAG更新のたびに手入力するのも面倒だし更新漏れも発生します。
なので、自動で依存関係を抽出してDAG Docsに表示する仕組みを考えて実装してみました。以下のように表示されるようになります。

仕組み

dagsディレクトリに以下のような依存関係を定義したJSONファイルを保存しておいて、DAGファイルから読み込んでMarkdownでいい感じ表示するだけです。

dependencies.json
{
  "dag_21": {
    "upstream": [
      "dag_11"
    ],
    "downstream": []
  },
  "dag_11": {
    "upstream": [],
    "downstream": [
      "dag_31",
      "dag_21"
    ]
  },
  "dag_31": {
    "upstream": [
      "dag_11"
    ],
    "downstream": []
  }
}

DAG側ではヘルパー関数を使って、JSONをロードしてMarkdownに整形した文字列をdoc_mdに設定します。

dag_11.py
DEPENDENCY_FILE = f'{os.path.dirname(os.path.abspath(__file__))}/dependencies.json'
def print_dependencies(dag_id):
    with open(DEPENDENCY_FILE, 'r+') as f:
        dependencies = json.load(f)
    upstream_dag_list = "\n".join(["- " + dag for dag in dependencies[dag_id]["upstream"]])
    downstream_dag_list = "\n".join(["- " + dag for dag in dependencies[dag_id]["downstream"]])
    return dedent('''\
    ### Dependencies
    #### Upstream DAGs
    {upstream_dag_list}
    #### Downstream DAGs
    {downstream_dag_list}
    '''
    ).format(upstream_dag_list=upstream_dag_list, downstream_dag_list=downstream_dag_list)
    
with DAG(
    'dag_11',
    ...
) as dag:
    ...
    dag.doc_md = print_dependencies(dag.dag_id)
    ...

肝心のdependencies.jsonの作成・更新方法ですが、こちらはPythonのastモジュールでDAGファイルをパースしてExternalTaskSensorの定義からDAGの依存関係を抽出しています。

dag_depndency.py
import ast
...

DEPENDENCY_FILE = f'{os.path.dirname(os.path.abspath(__file__))}/dependencies.json'

class DependencyAnalyzer(ast.NodeVisitor):
    def __init__(self, filename):
        super().__init__()
        self.__dependencies = set()
        self.current_dag = ''
        self.filename = filename

    # DAGファイルのコードをパースして依存DAGをself.__dependenciesに設定する
    def analyze(self):
        with open(self.filename, 'r') as f:
            node = ast.parse(f.read())
        self.visit(node)

    # ExternalTaskSensorの呼び出しから引数external_dag_idの値を抽出する
    def visit_Call(self, node: ast.Call):
        func = node.func
        if isinstance(func, ast.Name):
            if func.id == 'ExternalTaskSensor':
                for kwarg in node.keywords:
                    if kwarg.arg == 'external_dag_id':
                        self.__dependencies.add(kwarg.value.value)
            elif func.id == 'DAG':
                self.current_dag = node.args[0].value

    @property
    def dependencies(self):
        return (self.current_dag, self.__dependencies)


def update_dependency(dag_id, dependencies):
    if os.path.exists(DEPENDENCY_FILE):
        with open(DEPENDENCY_FILE, 'r+') as f:
            dependencies_master_json = json.load(f)
    else:
        dependencies_master_json = {}

    # 対象のDAGのupstreamを設定・更新
    deps = dependencies_master_json.setdefault(dag_id, {'upstream': [], 'downstream': []})
    deps['upstream'] = list(dependencies)

    # upstreamのDAGのdownstreamに対象のDAGを設定
    for dependent_dag in dependencies:
        deps = dependencies_master_json.setdefault(dependent_dag, {'upstream': [], 'downstream': []})
        downstream = set(deps['downstream'])
        downstream.add(dag_id)
        deps['downstream'] = list(downstream)

    # 対象のDAGが依存していないDAGのdownstreamから対象のDAGを削除
    for dag, deps in dependencies_master_json.items():
        if dag not in dependencies:
            deps['downstream'] = list(set(deps['downstream']) - {dag_id})

    # dependencies.jsonに書き出す
    with open(DEPENDENCY_FILE, 'w+') as f:
        json.dump(dependencies_master_json, f)

if __name__ == '__main__':
    filename = sys.argv[1]  # DAGファイル名
    v = DependencyAnalyzer(filename)
    v.analyze()
    update_dependency(*v.dependencies)

このスクリプトをCI/CDで自動実行するようにしておけばDAGファイルの更新の都度、自動でDAG Docsの依存関係ドキュメントも更新されるようになります。

# CI/CDでdependencies.jsonを更新
$ python dag_dependency.py dag_11.py

今回はupstream DAGとdownstream DAGをDAG Docsに表示するだけでしたが、dependencies.jsonがあればend-to-endでDAGの依存関係を表示することもできるかと思います。そうすることで、問題発生時の影響範囲が一目瞭然となります。シンプルな仕組みながら色々な使い方ができそうです。

Discussion