Argo WorkflowsでEmbulkを動かしてみる
これは何か
今更だがArgo Workflowsのキャッチアップをすべく、ローカル環境でArgo Workflowsを使ってembulkを動かして見たので、そこで得た知見をまとめていく。意外と日本語で体系的にまとまっているドキュメントが少なくしんどかった。
Argo Workflowsとは何か
Kubernetes上で動作するワークフローエンジン。コンテナを用いてジョブを記述でき、ワークフローを定義できる。環境依存性がある処理をコンテナごとに管理できるため、ソフトウェア構成が簡素化できるらしい。基本的な使い方については割愛。
前提
ローカルでkubernetesクラスタのセットアップはしておく。
やりたいこと
とりあえずembulkでローカルにあるcsvファイルを読み取って、内容をフィルターして標準出力に出すところまでやってみた。
準備
Argo Workflowsのインストール
# Namespaceの作成
kubectl create ns argo
# 作成したNamespaceにArgoをインストールする
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/stable/manifests/quick-start-postgres.yaml
成功したら以下のpodが立ち上がる。
NAME READY STATUS RESTARTS AGE
argo-server-74d5fbc96f-9gc2n 1/1 Running 0 2m2s
minio-58977b4b48-m9c5l 1/1 Running 0 2m1s
postgres-6b5c55f477-pbbv8 1/1 Running 0 2m1s
workflow-controller-7fb47d49bb-t2mrb 1/1 Running 0 2m
Argo CLIのインストール
ここのページを見つつ、最新版のCLIをインストールする。
$ argo version
argo: v3.3.6
BuildDate: 2022-05-26T01:07:24Z
GitCommit: 2b428be8001a9d5d232dbd52d7e902812107eb28
GitTreeState: clean
GitTag: v3.3.6
GoVersion: go1.17.10
Compiler: gc
Platform: darwin/amd64
実装
ディレクトリ構成は以下。
.
├── Dockerfile
├── argo
│ ├── workflow.yaml
│ └── workflow_template.yaml
└── data
└── customer.csv.gz
DockerfileにEmbulkの実行環境を記述する。フィルタリングに必要なpluginも同時にインストールした。
FROM openjdk:8-jre-alpine
ARG VERSION
RUN mkdir -p /root/.embulk/bin \
&& wget -q https://dl.embulk.org/embulk-${VERSION}.jar -O /root/.embulk/bin/embulk \
&& chmod +x /root/.embulk/bin/embulk
ENV PATH=$PATH:/root/.embulk/bin
RUN apk add --no-cache libc6-compat && embulk gem install embulk-filter-column
DockerfileはイメージとしてArgoから呼び出せるように登録しておく。2022年6月段階では0.9.24がstableなのでそれを使っている。
docker build . -t embulk:v0.1 --build-arg VERSION=0.9.24
Embulkを実行する Templateは以下のように記述した。Argo Workflowはこのように処理をテンプレ化させておいて、各所で再利用することが可能らしい。
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: embulk-template
spec:
entrypoint: embulk
templates:
- name: embulk
inputs:
artifacts:
- name: embulk-config
path: /input/config.yml
container:
image: embulk:v0.1
command: [java]
args: ["-jar", "/root/.embulk/bin/embulk", "run", "/input/config.yml"]
volumeMounts:
- name: data
mountPath: /data
volumes:
- name: data
configmap:
name: customer-data
dataはConfigmapで渡した。
kubectl -n argo create configmap customer-data --from-file=data/customer.csv.gz
embulkを実行する本体の処理は以下。embulkのconfigはとりあえずartifactsとして渡す。これも外部ファイルから呼び出せないか後で検討したい。
workflowTemplateRef
で作成したテンプレートを参照することができる。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: csv-to-stdout-sample
spec:
entrypoint: embulk
arguments:
artifacts:
- name: embulk-config
raw:
data: |
in:
type: file
path_prefix: 'data/'
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
skip_header_lines: 1
columns:
- {name: customer_id, type: string}
- {name: customer_name, type: string}
- {name: gender_cd, type: string}
- {name: gender, type: string}
- {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
- {name: age, type: long}
- {name: postal_cd, type: string}
- {name: address, type: string}
- {name: application_store_cd, type: string}
- {name: application_date, type: string}
- {name: status_cd, type: string}
filters:
- type: column
columns:
- {name: customer_id, type: string}
- {name: gender_cd, type: string}
- {name: gender, type: string}
- {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
- {name: age, type: long}
- {name: postal_cd, type: string}
- {name: application_store_cd, type: string}
- {name: application_date, type: string}
- {name: status_cd, type: string}
out:
type: stdout
workflowTemplateRef:
name: embulk-template
最後に、ワークフローを登録して処理が正しく動くか確かめる。
# テンプレートを登録
argo -n argo template create argo/workflow_template.yaml
# ジョブのsubmit
argo -n argo submit argo/workflow.yaml
submitに成功していれば、ArgoのUIで確かめることができる。
# UIを立ち上げ
argo server
※最初はログインを求められるので、トークンを取得する。
kubectl -n argo exec -it $(kubectl get --no-headers=true pods -n argo -o name -l app=argo-server) -- argo auth token
成功していれば、statusがSucceededになり、ログが出力される。
感想
とりあえずローカルでざっと動かすことはできた。次はクラウドストレージのデータをBigQueryに移送する機能を実装する。
参考
Discussion