👋
Argo WorkflowsとEmbulkでBigQueryにデータ転送する
これは何か
前回の記事でArgo WorkflowsでEmbulkを動かすところまでできたので、実用を意識してArgo WorkflowsとEmbulkで簡易的なデータパイプラインを作ってみる。
やりたいこと
①GCSからcsvファイルをBigQueryにロードする
②失敗したらSlackにエラー通知を出す
③定期実行する
※クラウド上で動かすところは今回はスコープアウト
準備
docker imageを準備する
Dockerfile
FROM openjdk:8-jre-alpine
ARG VERSION
RUN mkdir -p /root/.embulk/bin \
&& wget -q https://dl.embulk.org/embulk-${VERSION}.jar -O /root/.embulk/bin/embulk \
&& chmod +x /root/.embulk/bin/embulk
ENV PATH=$PATH:/root/.embulk/bin
RUN apk add --no-cache libc6-compat
RUN embulk gem install jwt:2.3.0
RUN embulk gem install embulk-filter-column && embulk gem install embulk-input-gcs && embulk gem install embulk-output-bigquery
docker imageの作成は前回の記事と同じ。
今回embulk0.9系を使ったため、embulk-output-bigqueryのインストールでエラーになったが、以下の記事を参考に回避した。
GCS, BigQueryの準備
Cloud StorageにCSVファイルを用意する。今回はtest-sasakky-bucketというバケット名の中に、dataというディレクトリを作ってファイルをアップロードした。
また、BigQueryにデータセットとテーブルを用意する。
secretの準備
GCSからのファイル取得とBigQueryへのデータロードのできるサービスアカウントを用意し、シークレットキーをkubernetes secretとして作成する。
また、slackのwebhook URLもsecretとして作成しておく。
実装
embulkでデータ転送を実行するコンテナとSlack通知を実行するコンテナはそれぞれWorkflow Templateとして作成する。
embulk_template.yml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: embulk-template
spec:
entrypoint: embulk
templates:
- name: embulk
inputs:
artifacts:
- name: embulk-config
path: /input/config.yml
container:
image: embulk:v0.1
command: [java]
args: ["-jar", "/root/.embulk/bin/embulk", "run", "/input/config.yml"]
volumeMounts:
- name: credentials-volume
mountPath: "/run/secrets/gcp-secret"
readOnly: true
subpath: gcp-secret
resources:
limits:
memory: "4Gi"
cpu: "2"
volumes:
- name: credentials-volume
secret:
secretName: volume-secret
items:
- key: gcp-secret
path: gcp-secret
notification_template.yml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: slack
spec:
templates:
- name: failed-notification-to-slack
volumes:
- name: credentials-volume
secret:
secretName: credentials-volume
container:
image: curlimages/curl
command: [sh, -c]
args: [
"curl -X POST --data-urlencode 'payload={
\"text\": \"{{workflow.name}} finished\",
\"blocks\": [
{
\"type\": \"section\",
\"text\": {
\"type\": \"mrkdwn\",
\"text\": \"Workflow {{workflow.name}} {{workflow.status}}\",
}
}
]
}'
$WEBHOOKURL"
]
env:
- name: WEBHOOKURL
valueFrom:
secretKeyRef:
name: credentials-volume
key: webhook-url
それぞれのtemplateを実行する処理はCron Workflowとして作成する。onExitで失敗時に実行するWorkflowが指定できる。
embulk.yml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: csv-from-gcs-to-bq
spec:
schedule: "*/10 * * * *" # 10分ごとに実行
workflowSpec:
entrypoint: embulk
onExit: exit-handler
templates:
- name: embulk
steps:
- - name: call-embulk-template
templateRef:
name: embulk-template
template: embulk
arguments:
artifacts:
- name: embulk-config
raw:
data: |
in:
type: gcs
bucket: test-sasakky-bucket
path_prefix: "data/"
auth_method: json_key
json_keyfile: /run/secrets/gcp-secret
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
skip_header_lines: 1
columns:
- {name: customer_id, type: string}
- {name: customer_name, type: string}
- {name: gender_cd, type: string}
- {name: gender, type: string}
- {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
- {name: age, type: long}
- {name: postal_cd, type: string}
- {name: address, type: string}
- {name: application_store_cd, type: string}
- {name: application_date, type: string}
- {name: status_cd, type: string}
filters:
- type: column
columns:
- {name: customer_id, type: string}
- {name: gender_cd, type: string}
- {name: gender, type: string}
- {name: birth_day, type: timestamp, format: '%Y-%m-%d'}
- {name: age, type: long}
- {name: postal_cd, type: string}
- {name: application_store_cd, type: string}
- {name: application_date, type: string}
- {name: status_cd, type: string}
out:
type: bigquery
mode: replace
auth_method: json_key
json_keyfile: /run/secrets/gcp-secret
dataset: raw
table: test
auto_create_table: true
location: us-central1
- name: exit-handler
steps:
- - name: slack-notification
when: "{{workflow.status}} != Succeeded"
templateRef:
name: slack
template: failed-notification-to-slack
結果
成功したためslackのワークフローはスキップされる。失敗した場合はslackに通知が飛ぶ。
Discussion