📌

AirflowのCustom XCom Backend触ってみた

2022/12/18に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#18です。

tl;dr

  • Airflowでは、Task Instance間のやり取りにXComを使えるよ
  • デフォルトでは、AirflowはXComのデータをメタデータデータベースに保存するよ
  • Custom XCom Backendを使うと、メタデータデータベース以外の場所に保存したり、間に処理を挟んだりできるよ

XComとは

公式ドキュメント曰く、

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

です。Airflowに組み込まれているTask間のデータやり取りの機能で、

でデータをやり取りします。XComの使用例は、例えばAirflowに同梱されているexample_xcom.pyをご参考してください。

Airflow 1.10.12以前、および、それ以降のデフォルトの設定(BaseXCom)では、XComはメタデータデータベースに保存されます(コードで言うとここら辺)。

Custom XCom Backendとは

Airflow 1.10.12リリースされた、XComの保存をカスタマイズ出来るようにする機能です。
AirflowのXComに対応する、PrefectのResultsや、DagsterのAssetsは保存先をカスタマイズできるので、それに追随した機能とも言えそうです。

XComの保存をカスタマイズする動機としては、

  • データのサイズ制限
    • デフォルトではメタデータデータベース(RDB)に保存しますが、この時のカラムの型はSQLAlchemyのPickleTypeです。例えばMySQLではサイズ制限があります
  • 性能やコスト
    • メタデータデータベースより安いストレージを使いたい、逆に、メタデータデータベースより性能が良いストレージを使いたい等
  • バックアップ戦略
  • audit情報の記録
  • データの保存期間のコントロール
    • デフォルトのメタデータデータベースでは、ずっとデータを保存しつづけるはずです

などが考えられそうです。

触ってみる

Airflowが提供している開発用のdocker-composeで試してみます。Ubuntu 20.04 (Windows10のWSL2上)、Airflow2.5.0で試しました。

手順としては、

  1. Custom XCom Backendを定義するPythonファイルを作成
  2. 作成したCustom XCom Backendを使用するように、Airflowの設定(docker-compose.yaml)を変更
  3. 動作確認のプログラムを動かす
  4. Task Instanceのログやメタデータデータベースを確認

の流れで行います。

簡単なダミーのBackendと、ファイルに保存するBackendを試してみました。

Custom XCom Backendの定義(ダミー)

Custom XCom BacknedではBaseXComを継承したクラスを作り、

  • Task Instanceからデータを受け取り、メタデータデータベース以外への保存と、メタデータデータベースに保存する値の返却を行う、serialize_valueメソッド
    • 今回は使っていませんが、DAG ID・Task ID、XComのキーなども引数で受け取れるようです
  • メタデータデータベースのxcomテーブルから取得した値を受け取り、対応するデータを返すdeserialize_valueメソッド

の二つのメソッドを定義します。
Custom XCom Backendでもメタデータデータベースを使わないわけではなく、XComに関するメタデータを保存する点は注意が必要そうです。

設定を確認する意味で、適当な値を返すダミーのCustom XCom Backnedを記載し、dags/backend.pyに保存します。

from airflow.models.xcom import BaseXCom
class DummyBuckend(BaseXCom):
    @staticmethod
    def serialize_value(value):
        return BaseXCom.serialize_value("abc")

    @staticmethod
    def deserialize_value(result):
        return "31"

Airflowの設定

docker-compose.yamlの、x-airflow-common・environmentにAIRFLOW__CORE__XCOM_BACKENDを追加します。

  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # 追加部分 
    AIRFLOW__CORE__XCOM_BACKEND: 'backend.DummyBackend'

コンテナを起動します。

docker-compose up

動作確認のプログラムを動かす

適当にXComに出し入れするDAGを作り、dagsディレクトリの下にxcom_test.pyという名前で記録します。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG(
    'xcom_test',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    t1 = BashOperator(
        task_id='t1',
        bash_command='echo t1',
        do_xcom_push=True
    )

    t2 = BashOperator(
        task_id='t2',
        bash_command='echo {{ ti.xcom_pull(task_ids="t1") }}',
    )
    t1 >> t2


    push = PythonOperator(
        task_id="push",
        python_callable=lambda:3,
        do_xcom_push=True
    )

    pull = PythonOperator(
        task_id="pull",
        python_callable=lambda ti:print(ti.xcom_pull(task_ids="push")),
    )


    push >> pull

動かしてみる

Airflow WebUI(http://localhost:8080)にアクセスし、xcom_testのpauseを解除・トリガーします。

XComをpullしているTask(t2とpull)のログを見ると、意図通りダミーの値(31)が取得されています。

t2のログ(抜粋)

[2022-12-17, 22:03:11 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo 31']
[2022-12-17, 22:03:11 UTC] {subprocess.py:86} INFO - Output:
[2022-12-17, 22:03:11 UTC] {subprocess.py:93} INFO - 31

pullのログ(抜粋)

[2022-12-17, 22:03:11 UTC] {logging_mixin.py:137} INFO - 31

メタデータデータベースの中も見てみます。

docker exec -it  airflow-postgres-1  psql -h 127.0.0.1 -p 5432 -U airflow
airflow=# SELECT * FROM xcom ORDER BY timestamp DESC LIMIT 2;
 dag_run_id | task_id |     key      |    value     |          timestamp           |  dag_id   |                  run_id                  | map_index
------------+---------+--------------+--------------+------------------------------+-----------+------------------------------------------+-----------
         58 | t2      | return_value | \x2261626322 | 2022-12-17 22:03:11.39103+00 | xcom_test | manual__2022-12-17T22:03:09.075052+00:00 |        -1
         58 | t1      | return_value | \x2261626322 | 2022-12-17 22:03:10.1942+00  | xcom_test | manual__2022-12-17T22:03:09.075052+00:00 |        -1
(2 rows)

格納する値(serialize_valueの戻り値)が文字列の場合、文字列の値ががbytea型で入っています(Airflowの処理で言うとここらへん)。キャストするとserialize_valueで設定した値が入っていることを確認できます。

airflow=# SELECT value,encode(value, 'escape')  FROM xcom ORDER BY timestamp DESC LIMIT 2;
    value     | encode
--------------+--------
 \x2261626322 | "abc"
 \x2261626322 | "abc"
(2 rows)

Custom XCom Backendの定義(ファイルに保存)

DummyBuckendはあまりに適当なので、もう少し本番っぽい処理のBackendとして、ファイルシステムに保存するBackendを作ってみます。

このBackendでは、

  • xcom_push(serialize_value)では、UUIDでファイル名を決め、そこにPickle化したデータを保存
  • xcom_pull(deserialize_value)では、引数(メタデータデータベースの値)から取得対象のファイル名を決めて、ファイルの中身からデータを復元

しています。

なお、Pickleを使っていますが、外部で取得したデータを保存する場合はセキュリティ上の問題で使わないほうが良いかもしれません(詳しくは例えばこの記事)。

from airflow.models.xcom import BaseXCom
import pickle
import uuid
import os


class LocalFSBackend(BaseXCom):
    @staticmethod
    def serialize_value(value):
        key =  'data_' + str(uuid.uuid4())
        print(f'saving  {str(value)} to {key}')
        with open(os.path.join('/opt/airflow/xcom',key), 'wb') as f:
            pickle.dump(value, f)
        return BaseXCom.serialize_value(key)

    @staticmethod
    def deserialize_value(result):
        key = BaseXCom.deserialize_value(result)
        print(f'restoring from  {key}')
        with open(os.path.join('/opt/airflow/xcom', key), 'rb') as f:
            result = pickle.load(f)

        return result

docker-compose.yamlも合わせて変更します

  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # 変更部分
    AIRFLOW__CORE__XCOM_BACKEND: 'backend.LocalFSBackend'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
      # 変更部分
    - ./xcom:/opt/airflow/xcom

保存先のディレクトリを作り(下)、Airflowクラスタを再起動(docker-composeを停止・再度up)します。

mkdir xcom
# 起動していたdocker-composeコマンドを止めた後
docker-compose up

XComをpushしているTask(t1とpush)を見ると、LocalFSBackendが呼ばれ、ファイルに保存している旨のログが記録されています。

t1のログ(抜粋)

[2022-12-17, 22:45:21 UTC] {logging_mixin.py:137} INFO - saving  t1 to data_c202d5a2-65f4-47fe-99ea-bf64d5425366

pushのログ(抜粋)

[2022-12-17, 22:45:21 UTC] {logging_mixin.py:137} INFO - saving  3 to data_1d232c2e-e29f-42f3-aa95-392c4ead6d57

XComをpullしているTask(t2とpull)のログを見ると、意図通りの値(t1とpushがxcom_pushした値)が取得されています。

t2のログ(抜粋)

[2022-12-17, 22:45:22 UTC] {logging_mixin.py:137} INFO - restoring from  data_c202d5a2-65f4-47fe-99ea-bf64d5425366
[2022-12-17, 22:45:22 UTC] {subprocess.py:86} INFO - Output:
[2022-12-17, 22:45:22 UTC] {subprocess.py:93} INFO - t1

pullのログ(抜粋)

[2022-12-17, 22:45:22 UTC] {logging_mixin.py:137} INFO - restoring from  data_1d232c2e-e29f-42f3-aa95-392c4ead6d57
[2022-12-17, 22:45:22 UTC] {logging_mixin.py:137} INFO - 3

メタデータのデータベースを見てると、狙い通りデータそのものではなく、ファイル名(serialize_valueの戻り値)が保存されています。

docker exec -it  airflow-postgres-1  psql -h 127.0.0.1 -p 5432 -U airflow
airflow=# SELECT value,encode(value, 'escape') FROM xcom ORDER BY timestamp DESC LIMIT 2;
                                          value                                           |                   encode
------------------------------------------------------------------------------------------+---------------------------------------------
 \x22646174615f64623666343333302d306435392d343930392d616532612d61663738346466333364643122 | "data_db6f4330-0d59-4909-ae2a-af784df33dd1"
 \x22646174615f63323032643561322d363566342d343766652d393965612d62663634643534323533363622 | "data_c202d5a2-65f4-47fe-99ea-bf64d5425366"
(2 rows)

ファイルを見るためには、pickle化したファイルなので、Pythonからpickle.loadを実行します。

with open('xcom/data_c202d5a2-65f4-47fe-99ea-bf64d5425366', 'rb') as t1_fp, open('xcom/data_1d232c2e-e29f-42f3-aa95-392c4ead6d57', 'rb') as push_fp:
   ...:     print(pickle.load(t1_fp))
   ...:     print(pickle.load(push_fp))
   ...:
t1
3

よさそうですね。

(余談)DAGやTask毎にCustom XCom Backendを使い分けたい時

DAGやTask、XCom毎にCustom XCom Backendを使い分けたい場合もあるかと思います(機密性で場所を変えたり、コスト最適化など)。

DagsterのAsset(IOManager)やPrefectのResultは処理毎に保存先を変更できそうですが、(調べた限り)AirflowのCustom XCom BackendはAirflowクラスター単位の設定なようです(Stackoverflow)。

DAGやTask、XCom毎に変更する場合は、

  • 窓口となるCustom XCom Backendを一つ作成
  • serialize_value・desrialize_valueでDAG ID・Task ID、XComのキーなど応じて、実際の処理を行う、別のCustom XCom Backendに横流し

すると良さそうです。

Discussion