🔖

削除済みDAGがAirflow Web UIに表示され続ける仕様への対処方法 on Cloud Composer

2021/09/30に公開

Cloud ComposerでDAGファイルを削除してもWeb UIにはDAGが残る

https://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag_in_your_environment

Cloud ComposerでDAGを削除するには、Cloud Storageバケットのdagsディレクトリから対象のDAGファイル(.py)を削除します。これでDAG自体は環境から削除されて実行することはできなくなりますが、AirflowのWeb UIには表示され続けます。
長らく運用していると、Web UI上に実際に使用しているDAGと削除されたDAGが入り混じって表示されて使い勝手が悪くなります。

そこで、Web UIからDAGを自動で削除する方法を紹介します。

手動で削除する

そもそも、DAGを頻繁に削除しないような環境では自動化するより都度手動で削除した方が良いと思います。手動削除は冒頭のリンク先に記載の公式の手順でやれば良いです。つまり、以下の手順。

# Cloud StorageバケットのdagsディレクトリからDAGファイルを削除する
gcloud composer environments storage dags delete \
    --environment $ENVIRONMENT_NAME \
    --location $LOCATION \
    DAG_NAME.py

# Airflow Web UIからDAGを削除する
gcloud composer environments run $ENVIRONMENT_NAME \
    --location $LOCATION \
    delete_dag -- DAG_NAME

最初のコマンドの代わりにCloud Storageバケットから直接DAGファイルを削除しても同じです。

上記コマンドを都度実行するのが面倒な場合は自動化します。

自動で削除する

Web UIから自動でDAGを削除することで、運用の効率化だけでなく消し忘れや誤削除を防ぐことができます。

自動削除の仕組みを構築した背景

自分が運用に携わっている環境では、GitでDAGファイルをバージョン管理しています。また、GitHub上のmasterブランチとCloud Storageバケットのdagsディレクトリを自動で同期しています。GitHub上でmasterマージしたタイミングでGitHub Actionsにより自動同期するCDの仕組みを取り入れています。その為、リポジトリからDAGファイルを削除するとCloud Storageバケットからも削除されます。ただ、冒頭でも記載した通りWeb UIからも該当のDAGを削除したい場合は、別途削除処理が必要になります。そこで、Web UIからの削除もCDの流れに取り込むことにしました。

GitHub Actionsを使って、削除されたDAGを見つけ出しWeb UIから削除する処理は煩雑になります。自分が考えたのは、変更のコミットを含むブランチとmasterブランチとのdiffを取って削除されたDAGを見つけ出す方法です。この方法は、masterブランチを取得するためにdepth=0で全履歴をfetchする必要があり煩雑かつ、コミット履歴が多いリポジトリでは処理に時間が掛かります。

そこで、Gitによる差分抽出ではなく、Cloud ComposerとAirflow DBにあるメタ情報を使って差分抽出する方法を考えました。

自動削除の方法は以下になります。

  1. 実際に使用しているDAGの一覧をgcloudコマンドで取得する
  2. Web UIに表示されているDAGの一覧をAirflow DBのdagテーブルから取得する
  3. 以上の1と2の差分を取って削除されたDAGをWeb UIからも削除する

上記の処理をAirflowのDAGで定義しておいて、masterマージの時にGitHub Actionsからトリガー(trigger_dag)します。

処理をDAGで実装する理由は、Cloud Composer環境からだと上記2.のAirflow DBへのアクセスが簡単だからです。

DAGの実装

上記1〜3を実行するDAGの実装は以下のようになります。
タスクはPythonOperatorで実装しています。1.の処理はsubprocess.run()でgcloud composer ... list_dagsを実行して現在使用しているDAGの一覧を取得します。2.の処理ではSQLAlchemyを使って、Airflow DBにクエリSELECT dag_id FROM dag;を投げてWeb UI上に表示されるDAG一覧を取得します。最後に3.の処理ではsubprocess.run()でgcloud composer ... delete_dagを実行して差分のDAGを削除します。

from datetime import timedelta
import os
import subprocess

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators import PythonOperator
from sqlalchemy import create_engine


def get_remote_dags(composer_env, composer_loc):
    proc = subprocess.run(
        ['gcloud', 'composer', 'environments', 'run', composer_env,
         '--location', composer_loc, 'list_dags'],
        check=True,
        encoding='utf-8',
        stdout=subprocess.PIPE)
    ids = proc.stdout.split('-------------------------------------------------------------------')[2].splitlines()

    return {dag_id for dag_id in ids if dag_id != ''}


def get_dags_from_airflow_db():
    airflow_db_conn = BaseHook.get_connection('airflow_db')
    user = airflow_db_conn.login
    password = airflow_db_conn.password
    host = airflow_db_conn.host
    database = airflow_db_conn.schema
    engine = create_engine(
        f'mysql+mysqldb://{user}:{password}@{host}/{database}?charset=utf8')
    result = engine.execute('SELECT dag_id FROM dag;')

    return {row[0] for row in result}


def remove_unused_dags(**context):
    # 1. 現在使用しているDAGの一覧を取得
    dags_in_use = get_remote_dags(context['composer_env'], context['composer_loc'])
    # 2. Web UIに表示されているDAGの一覧を取得
    dags_in_db = get_dags_from_airflow_db()
    

    # 3. 1.と2.の差分を取得してWeb UIから不要なDAGを削除
    dags_to_remove = dags_in_db - dags_in_use
    for dag_id in dags_to_remove:
        proc = subprocess.run(
            ['gcloud', 'composer', 'environments', 'run', context['composer_env'],
             '--location', context['composer_loc'], 'delete_dag', '--', dag_id],
            check=True,
            encoding='utf-8',
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        if proc.stdout:
            print(proc.stdout)
        if proc.stderr:
            print(proc.stderr)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with  DAG(
    'clean_up_web_ui',
    default_args=default_args,
    schedule_interval=None,
) as dag:
    _ = PythonOperator(
        task_id='remove_unused_dags',
        python_callable=remove_unused_dags,
        op_kwargs={
            'composer_env': os.environ['COMPOSER_ENVIRONMENT'],
	    'composer_loc': os.environ['COMPOSER_LOCATION'],
        },
        provide_context=True
    )

get_dags_from_airflow_db()の処理を見てわかる通り、Cloud ComposerにはデフォルトでAirflow DBへの接続情報がairflow_dbというConnectionとして登録されています。そのため、簡単にWeb UIに表示されているDAGの一覧を取得することができます。

masterマージの時にGitHub Actionsで、DAGファイルをCloud Storageバケットのdagsディレクトリに同期した後、このDAGをトリガーするようにすればWeb UIが常に正しいDAGの一覧を表示するようになります。


Airflowは運用していて不便なところが結構あります。特にCI/CDの仕組みを取り入れようとするとかなり自前の開発が必要になると思います。日々Airflowの運用で奮闘している皆さんにとって、この記事の内容が少しでも参考になれば幸いです。

Discussion