Closed5

Kubeflow Pipelines で S3 にあるCSVを取得する

d3d3

Pipeline の骨子

@dsl.pipeline(
    name="Ingest data",
    description="Ingest data from S3."
)
def ingest_pipeline(csv_path: str):
  # TODO: define ContainerOp to accsss S3 bucket

kfp.compiler.Compiler().compile(
    pipeline_func=ingest_pipeline,
    package_path="ingest_pipeline.yaml"
)
d3d3

予め S3 にアクセスするための認証情報を定義したオブジェクトを K8s へ apply しておく

# S3 のシークレット情報
# ACCESS_KEY, SECRET_ACCESS_KEY は base64に変換したものを指定する
# ex) `echo -n $AWS_ACCESS_KEY_ID | base64`
apiVersion: v1
kind: Secret
metadata:
  name: aws-secret
  namespace: ***
type: Opaque
data:
  AWS_ACCESS_KEY_ID: *****
  AWS_SECRET_ACCESS_KEY: *****
d3d3

とりあえず 指定した CSV ファイルを S3 から取得して表示する

def prepare(bucket_name: str, input_file_name: str):
  from io import StringIO

  import boto3
  import pandas as pd

  client = boto3.client("s3")
  obj = client.get_object(Bucket=bucket_name, Key=input_file_name)
  content = obj["Body"].read().decode("utf-8")

  df = pd.read_csv(StringIO(content))
  df.head()
from kfp.aws import use_aws_secret

prepare_op = comp.create_component_from_func(
    prepare,
    packages_to_install=["boto3", "pandas"]
)

@dsl.pipeline(
    name="Ingest data",
    description="Ingest data from S3."
)
def ingest_pipeline(bucket_name: str, csv_path: str):
  prepare_op(bucket_name=bucket_name, input_file_name=csv_path).apply(
    use_aws_secret(
        'aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'
    )
  )
kfp.compiler.Compiler().compile(
    pipeline_func=ingest_pipeline,
    package_path="ingest_pipeline.yaml"
)
d3d3

出力された ingest_pipeline.yaml を Kubeflow Dashboard の Pipeline から実行してみる

Create Run の際に パラメータを指定できるようになっている

d3d3

データが S3 から取得できている 🎉


このスクラップは2022/06/14にクローズされました