📌

Argo WorkflowsでstepのOutputsを使って並行処理をしてみる

2021/03/09に公開

Argo Workflowで step A のoutputsで出力したListを後続の step B で並列処理(単一の step B の内部で並列化するのではなく、Listの個々の値毎に step B を並列処理)したいといったケースを実現したかったので調べてみました。

結論

Argo Workflowsのgithubにsampleが沢山掲載されており、その中にありました。

argo-workflows/examples/loops-param-result.yaml

以下はgithubのサンプルからそのまま拝借しています。 withParam にoutputsで出力したListを渡すことで展開してくれるようです(この場合は {{item}} に個々の値が入り、seconds というパラメータの値として渡される)。
展開された値毎にWorkflowが生成されてそれぞれが並行処理で走るため、outputsのListに100件データがあれば100個のworkflowが並行で走る感じになります。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: sleep
        template: sleep-n-sec
        arguments:
          parameters:
          - name: seconds
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}" # ここでoutputsのリストを展開している

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)
  - name: sleep-n-sec
    inputs:
      parameters:
      - name: seconds # ここに展開された値が渡されてくる
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]

動かしてみよう

minikubeで動かしてみました。
pod操作を行うためdefaultのServiceAccountで上記のYAMLを実行すると権限不足でエラーとなるため、以下のように適当なServiceAccountを用意します。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-wf-sa
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-wf-clusterrolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: admin # 本来は必要最低限の権限を付与すること!
subjects:
  - kind: ServiceAccount
    name: argo-wf-sa
    namespace: default

上記をapplyしたら、Argo WorkflowのGUIを表示して以下を実行します。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result
  namespace: default
spec:
  serviceAccountName: argo-wf-sa # 作ったSAを指定
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: sleep
        template: sleep-n-sec
        arguments:
          parameters:
          - name: seconds
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)
  - name: sleep-n-sec
    inputs:
      parameters:
      - name: seconds
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]

generate ステップが終わると、 sleep ステップが並行で実行されます。よさそう。

その他

argo-workflows/examples にはその他にも沢山の便利なサンプルが載っているので、是非参考にしてみてください。

今回取り上げたloop関連では以下のようなものもあります。

Discussion