🙆

Argo WorkflowsでEmbulkを動かしてみる

2022/06/26に公開

これは何か

今更だがArgo Workflowsのキャッチアップをすべく、ローカル環境でArgo Workflowsを使ってembulkを動かして見たので、そこで得た知見をまとめていく。意外と日本語で体系的にまとまっているドキュメントが少なくしんどかった。

Argo Workflowsとは何か

Kubernetes上で動作するワークフローエンジン。コンテナを用いてジョブを記述でき、ワークフローを定義できる。環境依存性がある処理をコンテナごとに管理できるため、ソフトウェア構成が簡素化できるらしい。基本的な使い方については割愛。

前提

ローカルでkubernetesクラスタのセットアップはしておく。

やりたいこと

とりあえずembulkでローカルにあるcsvファイルを読み取って、内容をフィルターして標準出力に出すところまでやってみた。

準備

Argo Workflowsのインストール

# Namespaceの作成
kubectl create ns argo
# 作成したNamespaceにArgoをインストールする
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/stable/manifests/quick-start-postgres.yaml

成功したら以下のpodが立ち上がる。

NAME                                   READY   STATUS    RESTARTS   AGE
argo-server-74d5fbc96f-9gc2n           1/1     Running   0          2m2s
minio-58977b4b48-m9c5l                 1/1     Running   0          2m1s
postgres-6b5c55f477-pbbv8              1/1     Running   0          2m1s
workflow-controller-7fb47d49bb-t2mrb   1/1     Running   0          2m

Argo CLIのインストール

ここのページを見つつ、最新版のCLIをインストールする。

$ argo version
argo: v3.3.6
  BuildDate: 2022-05-26T01:07:24Z
  GitCommit: 2b428be8001a9d5d232dbd52d7e902812107eb28
  GitTreeState: clean
  GitTag: v3.3.6
  GoVersion: go1.17.10
  Compiler: gc
  Platform: darwin/amd64

実装

ディレクトリ構成は以下。

.
├── Dockerfile
├── argo
│   ├── workflow.yaml
│   └── workflow_template.yaml
└── data
    └── customer.csv.gz

DockerfileにEmbulkの実行環境を記述する。フィルタリングに必要なpluginも同時にインストールした。

Dockerfile
FROM openjdk:8-jre-alpine
ARG VERSION

RUN mkdir -p /root/.embulk/bin \
    && wget -q https://dl.embulk.org/embulk-${VERSION}.jar -O /root/.embulk/bin/embulk \
    && chmod +x /root/.embulk/bin/embulk
ENV PATH=$PATH:/root/.embulk/bin
RUN apk add --no-cache libc6-compat && embulk gem install embulk-filter-column

DockerfileはイメージとしてArgoから呼び出せるように登録しておく。2022年6月段階では0.9.24がstableなのでそれを使っている。

docker build . -t embulk:v0.1 --build-arg VERSION=0.9.24

Embulkを実行する Templateは以下のように記述した。Argo Workflowはこのように処理をテンプレ化させておいて、各所で再利用することが可能らしい。

workflow_template.yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: embulk-template
spec:
  entrypoint: embulk
  templates:
    - name: embulk
      inputs:
        artifacts:
        - name: embulk-config
          path: /input/config.yml
      container:
        image: embulk:v0.1
        command: [java]
        args:  ["-jar", "/root/.embulk/bin/embulk", "run", "/input/config.yml"]
        volumeMounts:
        - name: data
          mountPath: /data
      volumes:
      - name: data
        configmap:
          name: customer-data

dataはConfigmapで渡した。

kubectl -n argo create configmap customer-data --from-file=data/customer.csv.gz

embulkを実行する本体の処理は以下。embulkのconfigはとりあえずartifactsとして渡す。これも外部ファイルから呼び出せないか後で検討したい。
workflowTemplateRefで作成したテンプレートを参照することができる。

workflow.yml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: csv-to-stdout-sample
spec:
  entrypoint: embulk
  arguments:
    artifacts:
    - name: embulk-config
      raw:
        data: |
          in:
            type: file
            path_prefix: 'data/'
            decoders:
            - {type: gzip}
            parser:
              charset: UTF-8
              newline: CRLF
              type: csv
              delimiter: ','
              quote: '"'
              escape: '"'
              null_string: 'NULL'
              skip_header_lines: 1
              columns:
              - {name: customer_id, type: string}
              - {name: customer_name, type: string}
              - {name: gender_cd, type: string}
              - {name: gender, type: string}
              - {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
              - {name: age, type: long}
              - {name: postal_cd, type: string}
              - {name: address, type: string}
              - {name: application_store_cd, type: string}
              - {name: application_date, type: string}
              - {name: status_cd, type: string}
          filters:
            - type: column
              columns: 
                - {name: customer_id, type: string}
                - {name: gender_cd, type: string}
                - {name: gender, type: string}
                - {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
                - {name: age, type: long}
                - {name: postal_cd, type: string}
                - {name: application_store_cd, type: string}
                - {name: application_date, type: string}
                - {name: status_cd, type: string}
          out: 
            type: stdout       
  workflowTemplateRef:
    name: embulk-template

最後に、ワークフローを登録して処理が正しく動くか確かめる。

# テンプレートを登録
argo -n argo template create argo/workflow_template.yaml
# ジョブのsubmit
argo -n argo submit argo/workflow.yaml

submitに成功していれば、ArgoのUIで確かめることができる。

# UIを立ち上げ
argo server

※最初はログインを求められるので、トークンを取得する。

kubectl -n argo exec -it $(kubectl get --no-headers=true pods -n argo -o name -l app=argo-server) -- argo auth token

成功していれば、statusがSucceededになり、ログが出力される。

感想

とりあえずローカルでざっと動かすことはできた。次はクラウドストレージのデータをBigQueryに移送する機能を実装する。

参考

https://aaaanwz.github.io/post/2021/argo-workflows-embulk/
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#add-configmap-data-to-a-volume
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files

Discussion