👋

Argo WorkflowsとEmbulkでBigQueryにデータ転送する

2022/07/08に公開

これは何か

前回の記事でArgo WorkflowsでEmbulkを動かすところまでできたので、実用を意識してArgo WorkflowsとEmbulkで簡易的なデータパイプラインを作ってみる。

やりたいこと


①GCSからcsvファイルをBigQueryにロードする
②失敗したらSlackにエラー通知を出す
③定期実行する
※クラウド上で動かすところは今回はスコープアウト

準備

docker imageを準備する

Dockerfile
FROM openjdk:8-jre-alpine
ARG VERSION

RUN mkdir -p /root/.embulk/bin \
    && wget -q https://dl.embulk.org/embulk-${VERSION}.jar -O /root/.embulk/bin/embulk \
    && chmod +x /root/.embulk/bin/embulk
ENV PATH=$PATH:/root/.embulk/bin
RUN apk add --no-cache libc6-compat 
RUN embulk gem install jwt:2.3.0
RUN embulk gem install embulk-filter-column && embulk gem install embulk-input-gcs && embulk gem install embulk-output-bigquery

docker imageの作成は前回の記事と同じ。
今回embulk0.9系を使ったため、embulk-output-bigqueryのインストールでエラーになったが、以下の記事を参考に回避した。
https://qiita.com/RRY77/items/9a5e91a287e07f92f3ec

GCS, BigQueryの準備

Cloud StorageにCSVファイルを用意する。今回はtest-sasakky-bucketというバケット名の中に、dataというディレクトリを作ってファイルをアップロードした。
また、BigQueryにデータセットとテーブルを用意する。

secretの準備

GCSからのファイル取得とBigQueryへのデータロードのできるサービスアカウントを用意し、シークレットキーをkubernetes secretとして作成する。
また、slackのwebhook URLもsecretとして作成しておく。

実装

embulkでデータ転送を実行するコンテナとSlack通知を実行するコンテナはそれぞれWorkflow Templateとして作成する。

embulk_template.yml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: embulk-template
spec:
  entrypoint: embulk
  templates:
    - name: embulk
      inputs:
        artifacts:
        - name: embulk-config
          path: /input/config.yml
      container:
        image: embulk:v0.1
        command: [java]
        args:  ["-jar", "/root/.embulk/bin/embulk", "run", "/input/config.yml"]
        volumeMounts:
        - name: credentials-volume
          mountPath: "/run/secrets/gcp-secret"
          readOnly: true
          subpath: gcp-secret
        resources:
          limits:
            memory: "4Gi"
            cpu: "2"
      volumes:
        - name: credentials-volume
          secret:
            secretName: volume-secret
            items:
            - key: gcp-secret
              path: gcp-secret
notification_template.yml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: slack
spec:
  templates:
  - name: failed-notification-to-slack
    volumes:
    - name: credentials-volume
      secret:
        secretName: credentials-volume
    container:
      image: curlimages/curl
      command: [sh, -c]
      args: [
        "curl -X POST --data-urlencode 'payload={
          \"text\": \"{{workflow.name}} finished\",
          \"blocks\": [
            {
              \"type\": \"section\",
              \"text\": {
                \"type\": \"mrkdwn\",
                \"text\": \"Workflow {{workflow.name}} {{workflow.status}}\",
              }
            }
          ]
        }'
        $WEBHOOKURL"
      ]
      env:
      - name: WEBHOOKURL
        valueFrom:
          secretKeyRef:
            name: credentials-volume
            key: webhook-url

それぞれのtemplateを実行する処理はCron Workflowとして作成する。onExitで失敗時に実行するWorkflowが指定できる。

embulk.yml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: csv-from-gcs-to-bq
spec:
  schedule: "*/10 * * * *" # 10分ごとに実行
  workflowSpec:
    entrypoint: embulk
    onExit: exit-handler
    templates:
    - name: embulk
      steps:
        - - name: call-embulk-template
            templateRef:
              name: embulk-template
              template: embulk
            arguments:
              artifacts:
              - name: embulk-config
                raw:
                  data: |
                    in:
                      type: gcs
                      bucket: test-sasakky-bucket
                      path_prefix: "data/"
                      auth_method: json_key
                      json_keyfile: /run/secrets/gcp-secret
                      decoders:
                      - {type: gzip}
                      parser:
                        charset: UTF-8
                        newline: CRLF
                        type: csv
                        delimiter: ','
                        quote: '"'
                        escape: '"'
                        null_string: 'NULL'
                        skip_header_lines: 1
                        columns:
                        - {name: customer_id, type: string}
                        - {name: customer_name, type: string}
                        - {name: gender_cd, type: string}
                        - {name: gender, type: string}
                        - {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
                        - {name: age, type: long}
                        - {name: postal_cd, type: string}
                        - {name: address, type: string}
                        - {name: application_store_cd, type: string}
                        - {name: application_date, type: string}
                        - {name: status_cd, type: string}
                    filters:
                      - type: column
                        columns: 
                          - {name: customer_id, type: string}
                          - {name: gender_cd, type: string}
                          - {name: gender, type: string}
                          - {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
                          - {name: age, type: long}
                          - {name: postal_cd, type: string}
                          - {name: application_store_cd, type: string}
                          - {name: application_date, type: string}
                          - {name: status_cd, type: string}
                    out: 
                      type: bigquery
                      mode: replace
                      auth_method: json_key
                      json_keyfile: /run/secrets/gcp-secret
                      dataset: raw
                      table: test
                      auto_create_table: true
                      location: us-central1 

    - name: exit-handler
      steps:
        - - name: slack-notification
            when: "{{workflow.status}} != Succeeded"
            templateRef:
              name: slack   
              template: failed-notification-to-slack

結果

成功したためslackのワークフローはスキップされる。失敗した場合はslackに通知が飛ぶ。

Discussion