Closed5
Kubeflow Pipelines で S3 にあるCSVを取得する
Pipeline の骨子
@dsl.pipeline(
name="Ingest data",
description="Ingest data from S3."
)
def ingest_pipeline(csv_path: str):
# TODO: define ContainerOp to accsss S3 bucket
kfp.compiler.Compiler().compile(
pipeline_func=ingest_pipeline,
package_path="ingest_pipeline.yaml"
)
予め S3 にアクセスするための認証情報を定義したオブジェクトを K8s へ apply しておく
# S3 のシークレット情報
# ACCESS_KEY, SECRET_ACCESS_KEY は base64に変換したものを指定する
# ex) `echo -n $AWS_ACCESS_KEY_ID | base64`
apiVersion: v1
kind: Secret
metadata:
name: aws-secret
namespace: ***
type: Opaque
data:
AWS_ACCESS_KEY_ID: *****
AWS_SECRET_ACCESS_KEY: *****
とりあえず 指定した CSV ファイルを S3 から取得して表示する
def prepare(bucket_name: str, input_file_name: str):
from io import StringIO
import boto3
import pandas as pd
client = boto3.client("s3")
obj = client.get_object(Bucket=bucket_name, Key=input_file_name)
content = obj["Body"].read().decode("utf-8")
df = pd.read_csv(StringIO(content))
df.head()
from kfp.aws import use_aws_secret
prepare_op = comp.create_component_from_func(
prepare,
packages_to_install=["boto3", "pandas"]
)
@dsl.pipeline(
name="Ingest data",
description="Ingest data from S3."
)
def ingest_pipeline(bucket_name: str, csv_path: str):
prepare_op(bucket_name=bucket_name, input_file_name=csv_path).apply(
use_aws_secret(
'aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'
)
)
kfp.compiler.Compiler().compile(
pipeline_func=ingest_pipeline,
package_path="ingest_pipeline.yaml"
)
出力された ingest_pipeline.yaml
を Kubeflow Dashboard の Pipeline から実行してみる
Create Run
の際に パラメータを指定できるようになっている
データが S3 から取得できている 🎉
このスクラップは2022/06/14にクローズされました