🦴

AirflowのKubernetesPodOperatorを試してみる

2022/04/30に公開
3

KubernetesPodOperatorとは?

  • Kubernetes Cluster上でPodを作り実行できるAirflowのOperator。
  • Kubernetes Executorとは異なる。

どうやって動くの?

事前準備

簡易的なPodを作ってみる

作成したいPodのyamlイメージ

apiVersion: v1
kind: Pod
metadata:
  labels:
    foo: bar
  name: hello-pod-work
spec:
  containers:
  - image: debian
    name: hello-pod-work

DAGサンプル

  • main.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

default_args = {
    'owner': '467',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

with DAG(
    'test_kubernetes_pod_operator_work',
    default_args=default_args,
    description='KubernetesPodOperatorを試す',
    tags=["work"],
) as dag:
    dag.doc_md = """
    KubernetesPodOperatorを試す
    """
    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=False,
        in_cluster=False,
    )
  • DAG実行後,Podは削除されている
$ k get pod
No resources found in default namespace.

Secretをmountしてみる

  • 事前に2つのsecretを準備しておく
$ k create secret generic airflow-secrets --from-literal=sql_alchemy_conn=hoge
secret/airflow-secrets created
$ k create secret generic airflow-secrets-2 --from-literal=sql_alchemy_conn2=hoge2
secret/airflow-secrets-2 created

作成したいPodのyamlイメージ

apiVersion: v1
kind: Pod
metadata:
  labels:
    foo: bar
  name: hello-pod-work
spec:
  containers:
  - image: debian
    name: hello-pod-work
    volumeMounts:
    - mountPath: /etc/sql_conn
      name: hoge
      readOnly: true
    env:
    - name: SQL_CONN
      valueFrom:
        secretKeyRef:
          name: airflow-secrets
          key: sql_alchemy_conn
    envFrom:
    - secretRef:
        name: airflow-secrets-2
  volumes:
  - name: hoge
    secret:
      secretName: airflow-secrets

DAGサンプル

  • main.py
    • 追加or変更した部分のみ記載
from kubernetes_pod_operator_work.dags import config_storage_apis
・・・
    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        secrets=[
            config_storage_apis.secret_file(),
            config_storage_apis.secret_env(),
            config_storage_apis.secret_all_keys()],
	do_xcom_push=False,
        in_cluster=False,
    )
  • config_storage_apis.py
from airflow.kubernetes.secret import Secret

def secret_file():
    return Secret(
        deploy_type='volume', deploy_target='/etc/sql_conn',
        secret='airflow-secrets'
        )

def secret_env():
    return Secret(
        deploy_type='env', deploy_target='SQL_CONN',
        secret='airflow-secrets', key='sql_alchemy_conn'
	)

def secret_all_keys():
    return Secret(
        deploy_type='env', deploy_target=None,
        secret='airflow-secrets-2'
	)

PersistentVolumeClaim(PVC)をmountしてみる

  • 事前にSTATUSBoundになっているpvcを準備しておく
$ k get pvc
NAME                  STATUS    VOLUME           CAPACITY   ACCESS MODES   STORAGECLASS    AGE
my-pvc                Bound     my-pv-hostpath   1Gi        RWO            manual          3s
$ k get pv
NAME             CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM            STORAGECLASS    REASON   AGE
my-pv-hostpath   1Gi        RWO            Retain           Bound       default/my-pvc   manual                   2m24s

作成したいPodのyamlイメージ

apiVersion: v1
kind: Pod
metadata:
  labels:
    foo: bar
  name: hello-pod-work
spec:
  containers:
  - image: debian
    name: hello-pod-work
    volumeMounts:
    - name: test-volume
      mountPath: /root/mount_file
  volumes:
  - name: test-volume
    persistentVolumeClaim:
        claimName: my-pvc

DAGサンプル

  • main.py
    • 追加or変更した部分のみ記載
    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        volumes=[config_storage_apis.volume()],
        volume_mounts=[config_storage_apis.volume_mount()],
        do_xcom_push=False,
        in_cluster=False,
    )
  • config_storage_apis.py
    • 追加or変更した部分のみ記載
from kubernetes.client import models as k8s

def volume_mount():
    return k8s.V1VolumeMount(
        mount_path='/root/mount_file', name='test-volume',
        read_only=True, sub_path=None, sub_path_expr=None,
        mount_propagation=None
        )

def volume():
    return k8s.V1Volume(
        name='test-volume',
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name='my-pvc'),
        )

Configmapをmountしてみる

  • 事前にconfigmapを準備しておく
$ k create configmap test-configmap-1 --from-literal=key1=value1
configmap/test-configmap-1 created
$ k create configmap test-configmap-2 --from-literal=key2=value2
configmap/test-configmap-2 created

作成したいPodのyamlイメージ

apiVersion: v1
kind: Pod
metadata:
  labels:
    foo: bar
  name: hello-pod-work
spec:
  containers:
  - image: debian
    name: hello-pod-work
    envFrom:
    - configMapRef:
        name: test-configmap-1
    - configMapRef:
        name: test-configmap-2

DAGサンプル

  • main.py
    • 追加or変更した部分のみ記載
    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        env_from=config_storage_apis.configmaps(),
        do_xcom_push=False,
        in_cluster=False,
    )
  • config_storage_apis.py
    • 追加or変更した部分のみ記載
def configmaps():
    return [
        k8s.V1EnvFromSource(
		config_map_ref=k8s.V1ConfigMapEnvSource(
			name='test-configmap-1')
		),
        k8s.V1EnvFromSource(
		config_map_ref=k8s.V1ConfigMapEnvSource(
			name='test-configmap-2')
		),
        ]

Pod内に複数のコンテナをつくる

作成したいPodのyamlイメージ

apiVersion: v1
kind: Pod
metadata:
  labels:
    run: share-pod
  name: share-pod
spec:
  containers:
  - image: busybox
    name: container1
  - image: busybox
    name: container2

DAGサンプル

  • 方法はfull_pod_specパラメータを使用する方法とpod_template_fileパラメータを使用する方法がある。
  • main.py
    • 追加or変更した部分のみ記載
c1 = k8s.V1Container(
    name="container1",
    image="busybox",
)
c2 = k8s.V1Container(
    name="container2",
    image="busybox",
)

p = k8s.V1Pod(
    api_version="v1",
    kind="Pod",
    metadata=k8s.V1ObjectMeta(
        namespace="default",
        name="share-pod"
    ),
    spec=k8s.V1PodSpec(
        restart_policy='Never',
        containers=[c1, c2],
    )
)

pod_template_file = """
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: share-pod
  name: share-pod
spec:
  containers:
  - image: busybox
    name: container1
  - image: busybox
    name: container2
  restartPolicy: Always
"""

・・・
    # full_pod_specパラメータを使用する方法
    k2 = KubernetesPodOperator(
        full_pod_spec=p,
        task_id="hello-pod-work2",
        do_xcom_push=False,
        in_cluster=False,
    )
    
    # pod_template_fileパラメータを使用する方法
    k3 = KubernetesPodOperator(
        namespace='default',
        pod_template_file=pod_template_file,
        task_id="hello-pod-work3",
        do_xcom_push=False,
        in_cluster=False,
    )

他にもKubernetesOperatorでできること

2022.04.27確認 個人的に使うものの列挙

  • Podへの[ports,NodeSelector,Affinity,ServiceAcountName,SecurityContext,initContainer]の設定

実行結果をXComに入れたいときは/airflow/xcom/return.jsonへ結果をリダイレクトする

    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
        cmds=[
		"sh",
		"-c",
		"mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
    )

XComへのPushは自動に作られるサイドカーコンテナを通してPushされる。

  • auditlogを見るとサイドカーコンテナが作られていることを確認できる
    • k get eventでも確認できる
$ cat /etc/kubernetes/audit/logs/audit.log  | grep hello-pod-work | grep ResponseComplete | grep create | jq '.requestObject.involvedObject'
{
  "kind": "Pod",
  "namespace": "default",
  "name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
  "uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
  "apiVersion": "v1",
  "resourceVersion": "11120850",
  "fieldPath": "spec.containers{base}"
}
・・・
{
  "kind": "Pod",
  "namespace": "default",
  "name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
  "uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
  "apiVersion": "v1",
  "resourceVersion": "11120850",
  "fieldPath": "spec.containers{airflow-xcom-sidecar}"
}
  • ちなみに、do_xcom_push=Trueにもかかわらず/airflow/xcom/return.jsonへ結果がリダイレクトされていない場合,自動に作られるサイドカーコンテナが/airflow/xcom/return.json Not Foundでエラーが発生する。

実行するとXcomに実行結果がPushされていることを確認できる

debugしたいときはdry_run()を使う

  • k run hoge --dry-run=client -o yamlを実行した結果と同じことをAirflowからも確認できる。
  • 上記DAGにdry_runを追加して出力してみる。

DAGサンプル

  • main.py
    • 追加or変更した部分のみ記載
・・・
from airflow.operators.python_operator import PythonOperator
・・・
    k = KubernetesPodOperator(
        namespace='default',
        name="hello-pod-work",
        image="debian",
	# Xcomにpushする場合、pushした値を/airflow/xcom/return.jsonへリダイレクトする
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
    )
 
        def print_dry_run(*kwargs):
        print(k.dry_run())
    
    o = PythonOperator(
        task_id='print_dry_run',
        python_callable=print_dry_run,
        dag=dag,
    )
    
    [k, o]

実行されるyamlが確認できる

  • task_id,print_dry_runのlogを見ると確認できる。
    • apiVersionapi_versionとなっていたりrestartPolicyrestart_policyとなっていたりと、そのままコピーしてyamlファイルとして実行ということはできない。
[2022-04-26 03:36:31,817] {logging_mixin.py:104} INFO - api_version: v1
kind: Pod
metadata:
  labels:
    airflow_version: 2.1.0
    foo: bar
    kubernetes_pod_operator: 'True'
  name: hello-dry-run-work.afd4862b480a46b1b2a37f24d7627375
  namespace: default
spec:
  containers:
  - command:
    - sh
    - -c
    - mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
    image: debian
    name: base
    volume_mounts:
    - mount_path: /airflow/xcom
      name: xcom
  # KubernetesPodOperator内で`do_xcom_push=True`をすると、Sidercarコンテナが作られる。
  - command:
    - sh
    - -c
    - trap "exit 0" INT; while true; do sleep 1; done;
    image: alpine
    name: airflow-xcom-sidecar
    resources:
      requests:
        cpu: 1m
    volume_mounts:
    - mount_path: /airflow/xcom
      name: xcom
  host_network: false
  restart_policy: Never
  volumes:
  - name: xcom

まとめ

ドキュメントとコードを見つつ色々とできることを探ってみました。環境のコンテナ化さえできれば、割と何でもAirflow上で動くのすごいなあと思いました。作業環境上にk8s環境があるうちにまた色々探ってみたいです。

参考

20230104追記

TaskFlowAPI利用バージョンも書いてみました。https://467tn.com/post/content7/#taskkubernetesを試してみる

Discussion

Ry1eRy1e

ConfigMapを環境変数としてではなく、ファイルとしてマウントする方法をご存知でしょうか。

白菜白菜

お返事遅くなりすみません。
以下を定義し、

volume = k8s.V1VolumeMount(
    mount_path='/root/mount_file', name='test-volume',
    read_only=True, sub_path=None
)
volume_mount = k8s.V1Volume(
    name='test-volume',
    config_map=k8s.V1ConfigMapVolumeSource(
    name='{{configmapのname}}'),
)

KubernetesPodOperatorの引数に下を追加すれば

volumes=[volume],
volume_mounts=[volume_mount],

できるかと思います!
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1KeyToPath.md

Ry1eRy1e

ありがとうございます!
typoかと思いますが、volumeとvolume_mountの変数名を逆にすると動きました!
大変助かりました🙇‍♂️