EMR ServerlessをStepFunctionsに組み込みマイクロサービスのデータパイプラインを構築した
概要
実務でマイクロサービスの一部にAmazon EMR Serverlessをつかったデータパイプラインを作りました。
使う際のTipsをこちらにまとめます。
この記事で書かないこと
内容は実務でAmazon EMR Serverlessを使ってみた中で、他でも汎用できそうなTipsにしぼります。
実務のプロジェクトの詳細、パイプライン全体の仕様、Spark(PySpark)の処理内容等は言及しません。
またAmazon EMR ServerlessのApplication、Jobを作って動かすまでの手順は他に記事があるので割愛します。
EMR Serverlessとは
Amazon EMR Serverlessは2022年6月にGAされたフルマネージドのの分散処理サービスです。
必要なタイミングで必要なリソースが割り当てられるため、運用管理にコストがかかりません。
今回のユースケースと選定理由
1週間〜2週間に一回のペースで、約2億件ほどのデータを定期的に言語判定処理してプロダクトにデリバリーする必要がありました。
言語判定は所属している会社の他のデータ処理でPythonのlangdetectを使用していたため、言語はPythonを踏襲したところ、2億件のデータを直列で処理すると十数時間かかりました。
短時間で処理するには並列処理する必要がありますが、管理コストをなるべくかけたくなかったことから、Amazon EMR Serverlessを使ってみることにしました。
結果として他のサービスも併用して、データの件数を2億→4000万程度に削ってからEMR Serverlessで処理するようにしたため、単純比較はできないですが、処理時間は30分にまで抑えることができました。(料金は概算で一回600円程度)
併用したサービス
Amazon EMR Serverlessと合わせて以下のサービスを使用しました。
- StepFunctions
- Lambda
- S3
※パイプライン全体でいうと他のサービスも使っていますが、関連が強いものに絞っています。
Pythonのライブラリを使用するため仮想環境をアップロードする
今回AWS EMR Serverlessを使用する目的は、言語判定であるため、Pythonのlangdetectの利用が
必須でしたが、標準のimageにはinstallされていません。
ライブラリを反映するには以下の公式で展開されている通り、venv-packで仮想環境を出力し、S3を経由してJobが読み取れるように設定する必要があります。
また、Application作成時のArchitectureの設定に合わせ、仮想環境を生成してください。
今回はarm64で設定したのでlinux/amd64 amazonlinux:2
のimage上で仮想環境を出力し、S3に配備しました。
FROM amazonlinux:2 AS base
RUN yum install -y python3
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN python3 -m pip install --upgrade pip && \
python3 -m pip install \
great_expectations==0.15.6 \
venv-pack==0.2.0 && \
python3 -m pip install langdetect
RUN mkdir /output && venv-pack -o /output/pyspark_venv.tar.gz
FROM scratch AS export
COPY /output/pyspark_venv.tar.gz /
# Pyspark job setting
--conf spark.archives=s3://bucket/script/pyspark_ge.tar.gz#environment
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
仮想環境のアップロード以外にも、ApplicationのCustom Image Settingから、ワーカーノードに利用するimageをECRより指定して差し替えることができます。
StepFunctionsにAWS EMR Serverlessを組み込む
パイプラインはStepFunctionsで以下のように構築しました。
EMRのJobの実行は非同期であるため、実行と完了判断のアクションを分けています。
- JobStart(Lambda Invoke)・・・Jobの実行
- JobCheck(Lambda Invoke)・・・Jobのステータスの取得
- IsEMRServerless(Choice state)・・・取得したステータスを判断して完了(失敗・キャンセル)するまでループさせる
- JobWait(Waite state)・・・Job完了までの不要なActionのループを抑止するため一定時間待機
プロジェクト開始時点の2023年1月時点で、StepFunctionsのアクションとしてサポートされているのはEMR(EC2)とEMR on EKSのみだったため、Lambdaから実行しました。
Lambda上からpythonのboto3でJobを実行します。
def start_job(version):
job_driver = {
"sparkSubmit": {
"entryPoint": f"s3://bucket/script/entrypoint.py",
"entryPointArguments": [version],
"sparkSubmitParameters":
f"--conf spark.archives=s3://bucket/script/pyspark_venv.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python",
},
}
client = boto3.client("emr-serverless", region_name="ap-northeast-1")
response = client.start_job_run(
applicationId="APP_ID",
executionRoleArn="arn:aws:XXXXXXXXXXXXXXX",
jobDriver=job_driver,
name="langdetect-job",
)
return response
Jobの状態も同様にboto3で取得します。
def check_job(job_run_id):
boto3.client("emr-serverless", region_name="ap-northeast-1")
response = get_client().get_job_run(
applicationId="APP_ID", jobRunId=job_run_id
)
return response
実行 ・ 取得には権限が必要なため、policyで付与してください。
{
"Statement": [
{
"Action": "emr-serverless:StartJobRun",
"Effect": "Allow",
"Resource": "arn:aws:iam::XXXXXXXXXXXXXXXXX",
"Sid": ""
}
],
"Version": "2012-10-17"
}
{
"Statement": [
{
"Action": "emr-serverless:GetJobRun",
"Effect": "Allow",
"Resource": "arn:aws:iam::XXXXXXXXXXXXXXXXX",
"Sid": ""
}
],
"Version": "2012-10-17"
}
boto3でのEMR Serverlessの操作には、1.26.46以降のバージョンが必要です。
Lambdaのboto3は2022年2月現在1.20.32のため、レイヤー指定やライブラリの参照先の読み替えにより適宜新しいバージョンを参照する様に設定してください。
※2023年2月現在はStepFunctionsのAcionでEMR Serverlessがサポートされてます
参考
Discussion