🐖

EMR ServerlessをStepFunctionsに組み込みマイクロサービスのデータパイプラインを構築した

2023/02/19に公開

概要

実務でマイクロサービスの一部にAmazon EMR Serverlessをつかったデータパイプラインを作りました。
使う際のTipsをこちらにまとめます。

この記事で書かないこと

内容は実務でAmazon EMR Serverlessを使ってみた中で、他でも汎用できそうなTipsにしぼります。

実務のプロジェクトの詳細、パイプライン全体の仕様、Spark(PySpark)の処理内容等は言及しません。

またAmazon EMR ServerlessのApplication、Jobを作って動かすまでの手順は他に記事があるので割愛します。

EMR Serverlessとは

Amazon EMR Serverlessは2022年6月にGAされたフルマネージドのの分散処理サービスです。
必要なタイミングで必要なリソースが割り当てられるため、運用管理にコストがかかりません。

https://aws.amazon.com/jp/about-aws/whats-new/2022/06/amazon-emr-serverless-generally-available/

今回のユースケースと選定理由

1週間〜2週間に一回のペースで、約2億件ほどのデータを定期的に言語判定処理してプロダクトにデリバリーする必要がありました。

言語判定は所属している会社の他のデータ処理でPythonのlangdetectを使用していたため、言語はPythonを踏襲したところ、2億件のデータを直列で処理すると十数時間かかりました。

短時間で処理するには並列処理する必要がありますが、管理コストをなるべくかけたくなかったことから、Amazon EMR Serverlessを使ってみることにしました。

結果として他のサービスも併用して、データの件数を2億→4000万程度に削ってからEMR Serverlessで処理するようにしたため、単純比較はできないですが、処理時間は30分にまで抑えることができました。(料金は概算で一回600円程度)

併用したサービス

Amazon EMR Serverlessと合わせて以下のサービスを使用しました。

  • StepFunctions
  • Lambda
  • S3

※パイプライン全体でいうと他のサービスも使っていますが、関連が強いものに絞っています。

Pythonのライブラリを使用するため仮想環境をアップロードする

今回AWS EMR Serverlessを使用する目的は、言語判定であるため、Pythonのlangdetectの利用が
必須でしたが、標準のimageにはinstallされていません。

ライブラリを反映するには以下の公式で展開されている通り、venv-packで仮想環境を出力し、S3を経由してJobが読み取れるように設定する必要があります。

https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html

また、Application作成時のArchitectureの設定に合わせ、仮想環境を生成してください。

今回はarm64で設定したのでlinux/amd64 amazonlinux:2のimage上で仮想環境を出力し、S3に配備しました。

FROM --platform=linux/amd64 amazonlinux:2 AS base

RUN yum install -y python3

ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install \
    great_expectations==0.15.6 \
    venv-pack==0.2.0 && \
    python3 -m pip install langdetect

RUN mkdir /output && venv-pack -o /output/pyspark_venv.tar.gz

FROM scratch AS export
COPY --from=base /output/pyspark_venv.tar.gz /
# Pyspark job setting

--conf spark.archives=s3://bucket/script/pyspark_ge.tar.gz#environment 
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python 
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python 
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python 

仮想環境のアップロード以外にも、ApplicationのCustom Image Settingから、ワーカーノードに利用するimageをECRより指定して差し替えることができます。

https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/application-custom-image.html

StepFunctionsにAWS EMR Serverlessを組み込む

パイプラインはStepFunctionsで以下のように構築しました。

EMRのJobの実行は非同期であるため、実行と完了判断のアクションを分けています。

  • JobStart(Lambda Invoke)・・・Jobの実行
  • JobCheck(Lambda Invoke)・・・Jobのステータスの取得
  • IsEMRServerless(Choice state)・・・取得したステータスを判断して完了(失敗・キャンセル)するまでループさせる
  • JobWait(Waite state)・・・Job完了までの不要なActionのループを抑止するため一定時間待機

プロジェクト開始時点の2023年1月時点で、StepFunctionsのアクションとしてサポートされているのはEMR(EC2)とEMR on EKSのみだったため、Lambdaから実行しました。

Lambda上からpythonのboto3でJobを実行します。

def start_job(version):
    job_driver = {
        "sparkSubmit": {
            "entryPoint": f"s3://bucket/script/entrypoint.py",
            "entryPointArguments": [version],
            "sparkSubmitParameters": 
                f"--conf spark.archives=s3://bucket/script/pyspark_venv.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python",
        },
    }
    
    client = boto3.client("emr-serverless", region_name="ap-northeast-1")
    
    response = client.start_job_run(
        applicationId="APP_ID",
        executionRoleArn="arn:aws:XXXXXXXXXXXXXXX",
        jobDriver=job_driver,
        name="langdetect-job",
    )
    return response

Jobの状態も同様にboto3で取得します。

def check_job(job_run_id):
    boto3.client("emr-serverless", region_name="ap-northeast-1")
    
    response = get_client().get_job_run(
        applicationId="APP_ID", jobRunId=job_run_id
    )
    return response

実行 ・ 取得には権限が必要なため、policyで付与してください。

{
    "Statement": [
        {
            "Action": "emr-serverless:StartJobRun",
            "Effect": "Allow",
            "Resource": "arn:aws:iam::XXXXXXXXXXXXXXXXX",
            "Sid": ""
        }
    ],
    "Version": "2012-10-17"
}
{
    "Statement": [
        {
            "Action": "emr-serverless:GetJobRun",
            "Effect": "Allow",
            "Resource": "arn:aws:iam::XXXXXXXXXXXXXXXXX",
            "Sid": ""
        }
    ],
    "Version": "2012-10-17"
}

boto3でのEMR Serverlessの操作には、1.26.46以降のバージョンが必要です。

Lambdaのboto3は2022年2月現在1.20.32のため、レイヤー指定やライブラリの参照先の読み替えにより適宜新しいバージョンを参照する様に設定してください。

※2023年2月現在はStepFunctionsのAcionでEMR Serverlessがサポートされてます

参考

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_emr_serverless/

Discussion