🦴
AirflowのKubernetesPodOperatorを試してみる
KubernetesPodOperatorとは?
- Kubernetes Cluster上でPodを作り実行できるAirflowのOperator。
- Kubernetes Executorとは異なる。
どうやって動くの?
- Operator内でkubernetes-clientを使用して動いている。
- Clusterの設定は
airflow.cfg
内にあるconfig_file
パラメータを使う。- 設定されていない場合は
~/.kube/config
- 設定されていない場合は
- YAMLの定義は
airflow.cfg
内にあるpod_template_file
パラメータにも定義できる。
事前準備
簡易的なPodを作ってみる
作成したいPodのyamlイメージ
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
DAGサンプル
- main.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
default_args = {
'owner': '467',
'depends_on_past': False,
'start_date': days_ago(2),
}
with DAG(
'test_kubernetes_pod_operator_work',
default_args=default_args,
description='KubernetesPodOperatorを試す',
tags=["work"],
) as dag:
dag.doc_md = """
KubernetesPodOperatorを試す
"""
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=False,
in_cluster=False,
)
- DAG実行後,Podは削除されている
$ k get pod
No resources found in default namespace.
Secretをmountしてみる
- 事前に2つのsecretを準備しておく
$ k create secret generic airflow-secrets --from-literal=sql_alchemy_conn=hoge
secret/airflow-secrets created
$ k create secret generic airflow-secrets-2 --from-literal=sql_alchemy_conn2=hoge2
secret/airflow-secrets-2 created
作成したいPodのyamlイメージ
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
volumeMounts:
- mountPath: /etc/sql_conn
name: hoge
readOnly: true
env:
- name: SQL_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
envFrom:
- secretRef:
name: airflow-secrets-2
volumes:
- name: hoge
secret:
secretName: airflow-secrets
DAGサンプル
- main.py
- 追加or変更した部分のみ記載
from kubernetes_pod_operator_work.dags import config_storage_apis
・・・
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
secrets=[
config_storage_apis.secret_file(),
config_storage_apis.secret_env(),
config_storage_apis.secret_all_keys()],
do_xcom_push=False,
in_cluster=False,
)
- config_storage_apis.py
from airflow.kubernetes.secret import Secret
def secret_file():
return Secret(
deploy_type='volume', deploy_target='/etc/sql_conn',
secret='airflow-secrets'
)
def secret_env():
return Secret(
deploy_type='env', deploy_target='SQL_CONN',
secret='airflow-secrets', key='sql_alchemy_conn'
)
def secret_all_keys():
return Secret(
deploy_type='env', deploy_target=None,
secret='airflow-secrets-2'
)
PersistentVolumeClaim(PVC)をmountしてみる
- 事前に
STATUS
がBound
になっているpvcを準備しておく
$ k get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
my-pvc Bound my-pv-hostpath 1Gi RWO manual 3s
$ k get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
my-pv-hostpath 1Gi RWO Retain Bound default/my-pvc manual 2m24s
作成したいPodのyamlイメージ
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
volumeMounts:
- name: test-volume
mountPath: /root/mount_file
volumes:
- name: test-volume
persistentVolumeClaim:
claimName: my-pvc
DAGサンプル
- main.py
- 追加or変更した部分のみ記載
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
volumes=[config_storage_apis.volume()],
volume_mounts=[config_storage_apis.volume_mount()],
do_xcom_push=False,
in_cluster=False,
)
- config_storage_apis.py
- 追加or変更した部分のみ記載
from kubernetes.client import models as k8s
def volume_mount():
return k8s.V1VolumeMount(
mount_path='/root/mount_file', name='test-volume',
read_only=True, sub_path=None, sub_path_expr=None,
mount_propagation=None
)
def volume():
return k8s.V1Volume(
name='test-volume',
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name='my-pvc'),
)
Configmapをmountしてみる
- 事前にconfigmapを準備しておく
$ k create configmap test-configmap-1 --from-literal=key1=value1
configmap/test-configmap-1 created
$ k create configmap test-configmap-2 --from-literal=key2=value2
configmap/test-configmap-2 created
作成したいPodのyamlイメージ
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
envFrom:
- configMapRef:
name: test-configmap-1
- configMapRef:
name: test-configmap-2
DAGサンプル
- main.py
- 追加or変更した部分のみ記載
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
env_from=config_storage_apis.configmaps(),
do_xcom_push=False,
in_cluster=False,
)
- config_storage_apis.py
- 追加or変更した部分のみ記載
def configmaps():
return [
k8s.V1EnvFromSource(
config_map_ref=k8s.V1ConfigMapEnvSource(
name='test-configmap-1')
),
k8s.V1EnvFromSource(
config_map_ref=k8s.V1ConfigMapEnvSource(
name='test-configmap-2')
),
]
Pod内に複数のコンテナをつくる
作成したいPodのyamlイメージ
apiVersion: v1
kind: Pod
metadata:
labels:
run: share-pod
name: share-pod
spec:
containers:
- image: busybox
name: container1
- image: busybox
name: container2
DAGサンプル
- 方法は
full_pod_spec
パラメータを使用する方法とpod_template_file
パラメータを使用する方法がある。 - main.py
- 追加or変更した部分のみ記載
c1 = k8s.V1Container(
name="container1",
image="busybox",
)
c2 = k8s.V1Container(
name="container2",
image="busybox",
)
p = k8s.V1Pod(
api_version="v1",
kind="Pod",
metadata=k8s.V1ObjectMeta(
namespace="default",
name="share-pod"
),
spec=k8s.V1PodSpec(
restart_policy='Never',
containers=[c1, c2],
)
)
pod_template_file = """
apiVersion: v1
kind: Pod
metadata:
labels:
run: share-pod
name: share-pod
spec:
containers:
- image: busybox
name: container1
- image: busybox
name: container2
restartPolicy: Always
"""
・・・
# full_pod_specパラメータを使用する方法
k2 = KubernetesPodOperator(
full_pod_spec=p,
task_id="hello-pod-work2",
do_xcom_push=False,
in_cluster=False,
)
# pod_template_fileパラメータを使用する方法
k3 = KubernetesPodOperator(
namespace='default',
pod_template_file=pod_template_file,
task_id="hello-pod-work3",
do_xcom_push=False,
in_cluster=False,
)
他にもKubernetesOperatorでできること
2022.04.27確認 個人的に使うものの列挙
- Podへの[ports,NodeSelector,Affinity,ServiceAcountName,SecurityContext,initContainer]の設定
/airflow/xcom/return.json
へ結果をリダイレクトする
実行結果をXComに入れたいときは k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
cmds=[
"sh",
"-c",
"mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
)
XComへのPushは自動に作られるサイドカーコンテナを通してPushされる。
- auditlogを見るとサイドカーコンテナが作られていることを確認できる
-
k get event
でも確認できる
-
$ cat /etc/kubernetes/audit/logs/audit.log | grep hello-pod-work | grep ResponseComplete | grep create | jq '.requestObject.involvedObject'
{
"kind": "Pod",
"namespace": "default",
"name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
"uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
"apiVersion": "v1",
"resourceVersion": "11120850",
"fieldPath": "spec.containers{base}"
}
・・・
{
"kind": "Pod",
"namespace": "default",
"name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
"uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
"apiVersion": "v1",
"resourceVersion": "11120850",
"fieldPath": "spec.containers{airflow-xcom-sidecar}"
}
- ちなみに、
do_xcom_push=True
にもかかわらず/airflow/xcom/return.json
へ結果がリダイレクトされていない場合,自動に作られるサイドカーコンテナが/airflow/xcom/return.json Not Found
でエラーが発生する。
実行するとXcomに実行結果がPushされていることを確認できる
debugしたいときはdry_run()を使う
-
k run hoge --dry-run=client -o yaml
を実行した結果と同じことをAirflowからも確認できる。 - 上記DAGにdry_runを追加して出力してみる。
DAGサンプル
- main.py
- 追加or変更した部分のみ記載
・・・
from airflow.operators.python_operator import PythonOperator
・・・
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
# Xcomにpushする場合、pushした値を/airflow/xcom/return.jsonへリダイレクトする
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
)
def print_dry_run(*kwargs):
print(k.dry_run())
o = PythonOperator(
task_id='print_dry_run',
python_callable=print_dry_run,
dag=dag,
)
[k, o]
実行されるyamlが確認できる
- task_id,print_dry_runのlogを見ると確認できる。
-
apiVersion
がapi_version
となっていたりrestartPolicy
がrestart_policy
となっていたりと、そのままコピーしてyamlファイルとして実行ということはできない。
-
[2022-04-26 03:36:31,817] {logging_mixin.py:104} INFO - api_version: v1
kind: Pod
metadata:
labels:
airflow_version: 2.1.0
foo: bar
kubernetes_pod_operator: 'True'
name: hello-dry-run-work.afd4862b480a46b1b2a37f24d7627375
namespace: default
spec:
containers:
- command:
- sh
- -c
- mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
image: debian
name: base
volume_mounts:
- mount_path: /airflow/xcom
name: xcom
# KubernetesPodOperator内で`do_xcom_push=True`をすると、Sidercarコンテナが作られる。
- command:
- sh
- -c
- trap "exit 0" INT; while true; do sleep 1; done;
image: alpine
name: airflow-xcom-sidecar
resources:
requests:
cpu: 1m
volume_mounts:
- mount_path: /airflow/xcom
name: xcom
host_network: false
restart_policy: Never
volumes:
- name: xcom
まとめ
ドキュメントとコードを見つつ色々とできることを探ってみました。環境のコンテナ化さえできれば、割と何でもAirflow上で動くのすごいなあと思いました。作業環境上にk8s環境があるうちにまた色々探ってみたいです。
参考
20230104追記
TaskFlowAPI利用バージョンも書いてみました。https://467tn.com/post/content7/#taskkubernetesを試してみる
Discussion
ConfigMapを環境変数としてではなく、ファイルとしてマウントする方法をご存知でしょうか。
お返事遅くなりすみません。
以下を定義し、
KubernetesPodOperatorの引数に下を追加すれば
できるかと思います!
ありがとうございます!
typoかと思いますが、volumeとvolume_mountの変数名を逆にすると動きました!
大変助かりました🙇♂️