Kubernetesを利用したバッチ実行環境の構築
こんにちは。エンジニアのSです。主にバックエンドの機能開発を担当しています。
Sprocketにおけるバッチ連携機能のアーキテクチャとその変化を説明することにより、Kubernetesを用いてどのようにバッチ実行環境を構築したのかをご紹介いたします。バッチ連携機能ではお客様が保持するユーザー情報をCSVファイルとしてS3へアップロードすることにより、ユーザー情報をプラットフォームへ登録できます。登録されたユーザー情報を利用してセグメントの作成などに利用できます。
バッチ連携機能をリリースした当初のアーキテクチャ
バッチ連携機能ははじめからEKSをベースとしてシステムを構築しました。gRPCサーバーであるPodがメッセージを受信するとバッチを実行する仕組みです。メッセージにはCSVのデータ定義などお客様毎に異なるパラメータが含まれています。バッチはメッセージのパラメータに基づいて動作するようになっています。CronJobがgRPCを経由してバッチを定期実行することにより、ユーザー情報を保持しているAmazon DynamoDBのテーブルへCSVの内容が書き込まれます。
バッチ連携システムの問題点
リリース当初は順調に先述のアーキテクチャが機能していました。しかしながら、バッチ連携機能を利用するお客様が増えるに従っていくつかの問題が発生しました。
gRPCサーバーのメモリリーク
gRPCサーバーにてメモリリークが頻発するようになりました。gRPCによって起動されるバッチはAlpakkaを用いて実装されていました。AlpakkaはAkka Streamsをベースとして実装されたストリーム指向プログラミングのDSLを提供するライブラリです。Alpakkaは少ない記述量にて複雑なストリーミング処理を実装できますが、メモリリークのような問題が発生した場合、その原因を特定することが困難です。そのため、メモリリークを改善するためのコード修正が難しいという問題がありました。
バッチの実行時間が短い
gRPCサーバーの稼働している期間と比較してバッチの実行時間が短いという問題がありました。バッチの実行間隔はお客様毎に1時間1回であり、ファイルがアップロードされていない場合はバッチが短時間で終了します。そのため、利用するお客様が一定以上増えない限り、クラスタのリソースを無駄にしてしまいます。バッチ連携を利用されるお客様は一定以上の規模のサイトを運営している事が多く、思ったように機能の利用率が延びませんでした。
CronJobを作成する工数が継続的に発生する
お客様の数だけCronJobを作成する必要があるため、CronJobのリソースを作成する工数が継続的に発生してしまいます。1つのCronJobにてすべてのお客様のバッチを実行する方法もありますが、バッチ実行が失敗した場合のエラーハンドリングが煩雑になるリスクがありました。
問題の解決策
これらの問題を解決するため、アーキテクチャの変更を検討しました。
まず、CronJobを作成する工数が継続的に発生しないよう、Amazon MSKを経由してバッチを起動する仕組みを考えました。CronJobがお客様を識別するキーをメッセージとしてAmazon MSKへ送信します。次にConsumerとなるPodが受信したメッセージに含まれるキーからお客様毎に設定されたパラメータをAmazon Aurora Serverlessから参照します。そのパラメータを元にPodがバッチを起動するよう前段の処理を変更しました。
Podからバッチを起動させる仕組みとしてなにかしらのWorkflowを導入することを検討しました。
メモリリークが発生する条件としてバッチを一度実行したサーバーが再度、バッチを実行することによりメモリリークが発生していました。そのため、Workflowから動的にJobを作成することにより、新規に作成されたPodからバッチを実行することが可能となります。また、必要なタイミングでのみリソースが作成されるため、バッチの実行期間外もサーバーが起動することはありません。
Workflowの選定
アーキテクチャの構想が固まった時点にて、動的にJobを生成するためのWorkflowを選定しました。求められる要件としてKubernetesにてJobの作成が行えることに加えて、Jobの実行結果を保持する状態マシンとしての機能が挙げられます。
AWS Step Functions
AWS Step Functionsはラムダなどのサーバレスアプリケーションを順次実行するサービスです。フローの状態管理やエラーハンドリング、ジョブの再実行なども行えます。さらにAmazon EKSからJobの実行も可能です。しかしながら、選定していた2021年1月時点ではprivateのEKSクラスタに対応していなかったため、採用を断念しました。
Apache Airflow
Apache AirflowはWorkflowの構築とスケジューリングを行うプラットフォームです。PythonにてDAGを記述することにより、Workflowの登録が可能です。登録されているDAGへ実行したい時間を登録しておく他、REST APIからもDAGを実行できます。AWSにてApache Airflowを利用するにはMWAAを使用するのが最も簡単です。チームとしては静的型付け言語での開発へシフトしている中、動的言語であるPythonを採用しにくいことに加え、DAGの開発環境を構築しづらい点が懸念として挙げられました。
Argo Workflows
Argo WorkflowsはKubernetesのJobをオーケストレーションを行うワークフローエンジンです。CRDと付随するリソースをデプロイすることにより、Kubernetesクラスタへインストールできます。WorkflowはCRDとして定義され、それに基づきJobが逐次実行されます。Workflowの設定に対してテストを行う場合、コードベースのユニットテストではなくWorkflowを実際に起動する必要があります。Workflowを修正する度、手動によるE2Eテストを行うコストがかかることは避けたいです。
Stateless
StatelessはGo言語にて記述された状態マシンです。Workflowの経過を状態として表現することにより、Workflowを構築できます。データベースの永続化にも対応しており、状態遷移に応じてデータベースへ状態を書き込むことができます。今まで紹介した他のWorkflowと異なり、AWSのサービスやKubernetesのリソースを必要としません。拡張性と自由度の高さから今回はStatelessを採用しました。
最終的なアーキテクチャ
ConsumerとなるPodがStatelessを用いて各Jobの実行状態を管理します。Jobの作成にはclient-goを使用しています。ProducerとなるCronJobとConsumerとなるPodはそれぞれGo言語にて実装しています。バッチを実行するJobは引き続きAlpakkaを使用しました。各言語のモデルを共有するため、Protocol Buffersによりエンコードされたデータを用いてバッチのパラメータを共有しています。
まとめ
今回はSprocketにおけるバッチ連携機能のアーキテクチャをご説明致しました。その結果、Kubernetesを用いたバッチの実行環境を構築できました。新しいアーキテクチャに基づいた実装は完了しており、古いアーキテクチャにより動作している環境を移行している段階です。
Sprocketで働きませんか?
弊社ではカジュアル面談を実施しております。
ご興味を持たれましたら、こちらからご応募お待ちしております。
Discussion