re:Invent 2023: PinterestのEKS上Apache Spark活用とData on EKSの最新事例
はじめに
海外の様々な講演を日本語記事に書き起こすことで、隠れた良質な情報をもっと身近なものに。そんなコンセプトで進める本企画で今回取り上げるプレゼンテーションはこちら!
📖 AWS re:Invent 2023 - Data processing at massive scale on Amazon EKS (CON309)
この動画では、AWSのAlex LinesとVara Bonthu、そしてPinterestのSoam Acharyaが、Amazon EKS上でApache Sparkを大規模に実行するための最新のベストプラクティスを紹介します。PinterestがHadoopからEKSへの移行で得た知見や、IP枯渇問題の解決策、Gravitonへの移行によるコスト削減効果など、実践的な話題が満載です。さらに、KarpenterやYuniKornといった最新ツールの活用方法も解説。Data on EKSプロジェクトの詳細も語られる、データエンジニア必見の内容です。
※ 動画から自動生成した記事になります。誤字脱字や誤った内容が記載される可能性がありますので、正確な情報は動画本編をご覧ください。本編
自己紹介とプレゼンテーションの概要
まずは簡単な挙手から始めましょう。今日ここにいる方で、Amazon EKSを使用している人は何人いますか? はい、なるほど。もう一つ質問です。Apache Sparkを現在使用している人は何人いますか? はい、これもなるほど。少し少ないようですね。最後の質問です。現在、Amazon EKS上でApache Sparkを実行している人は何人いますか? はい、素晴らしい。ご協力ありがとうございます。
それでは、始めましょう。私はAlex Linesと申します。AWSでEKSを担当しているSenior Container Specialistです。同僚のVara Bonthuと、PinterestのSoam Acharyaも同席しています。彼らには後ほど自己紹介をしてもらいます。
本日は、データ処理のためのKubernetesについてお話しし、それを踏まえてAWSで構築した「Data on EKS」プロジェクトについてご紹介します。その後、Varaに引き継ぎ、EKS上のオープンソースデータプラットフォームについて簡単に説明してもらいます。そして、Soamが、Pinterestのデータ処理に関する最新化の取り組みについて共有してくれます。最後に、Varaが再び登場し、Apache Sparkを大規模に実行するためのベストプラクティスについてお話しします。
データ処理におけるKubernetesの重要性
さて、データのためのKubernetesについて考えてみましょう。金融シミュレーション、自動運転車、ゲノム研究、機械学習による推薦エンジンには、何か共通点があるでしょうか?まず一つ言えるのは、これらはすべてデータによって支えられているということです。データは、これらのプロセスが価値を生み出すための重要な入力です。これらを動かしているのがデータなのです。もう一つの共通点は、これらすべてが現在Kubernetes上で、特にAmazon EKS上で稼働しているということです。
世界の金融市場を理解するためのシミュレーションを構築しているBridgewaterや、世界中の推定1億5000万台の車両に搭載されている自動運転技術を開発しているMobileye。あるいは、生物医学研究を行っているRocheのような企業や、機械学習による推薦エンジンを構築しているPinterestのような企業。これらはすべて、単にKubernetes上だけでなく、現在Amazon EKS上で稼働しており、しかも大規模に運用されています。
では、なぜデータ処理にKubernetesを使うのでしょうか?主に4つの理由があります。
Kubernetesを使用するメリット:スケーラビリティから標準化まで
1つ目はスケーラビリティです。Kubernetesを使えば、アプリケーションのコンピューティングリソースとメモリを個別にスケールできるので、インフラをアプリケーションのニーズに合わせやすくなります。Horizontal Pod AutoscalerやCluster Autoscalerなどの機能があり、さらにAWSのオープンソースツールであるKarpenterのようなツールを使えば、このオートスケーリングをより効果的に管理できます。
2つ目は、オーケストレーションです。データワークロードに関して言えば、Kubernetesのオーケストレーション機能を使って、ジョブが失敗した際の自動再実行などを管理できます。また、ジョブレベルでCPUやメモリのリソースを指定できるので、アプリケーションに割り当てるリソースをきめ細かく制御できます。
3つ目は、ポータビリティです。Kubernetesで実行するということは、コンテナを使用するということです。コンテナでは、アプリケーションコードと、環境変数やライブラリなどの実行に必要な依存関係がパッケージ化されています。そのため、環境間でアプリケーションを移行する際のジョブ失敗が減り、本番環境へのリリースまでの時間を短縮できます。
このポータビリティのもう1つの利点は、マルチバージョンクラスターを実行できることです。同じクラスター内で複数のバージョンのSparkなどのフレームワークを並行して実行できます。VM型のアーキテクチャでは、別のバージョンを実行するために新しいVMを立ち上げるか(コストが増加)、すべてのジョブを新しいバージョンにアップグレードする(手間がかかる)必要がありますが、コンテナならそういった問題を回避できます。
最後の理由は、標準化です。AWSでは、多くの顧客が大規模にKubernetesを採用しているのを見ています。彼らは通常、「Kubernetesの標準化」と呼ばれる方法で、コンテナ化されたアプリケーションのオーケストレーションにKubernetesをデフォルトのコンピューティングオプションとして採用しています。実際には、中央のグループが管理するKubernetesプラットフォームを構築し、開発者に本番環境へのデプロイに必要なCI/CD、可観測性、ガバナンスなどを提供し、インフラの詳細を抽象化するという形で実現されています。
顧客がこのような利点、例えば市場投入までの時間短縮やコスト削減などを実感し始めると、他のアプリケーションにもこれらの利点を広げたいと考えるようになります。また、essential groupの人材面での投資や構築したツールなどの投資をさらに活用することもできます。これも、データワークロードにおけるKubernetesの採用が拡大している理由の一つです。
Data on EKSプロジェクト:課題解決とオープンソースの取り組み
AWSでは、お客様がAmazon EKS上でこれらのデータワークロードを構築するのを支援する中で、お客様が直面する一般的な課題のセットに気づきました。これらは非常にコンピューティング集約型のワークロードであり、多くの場合、非常に変動の激しいスケーリングパターンを持っています。バッチ処理を考えると、時には1,000ノード以上に急速にスケールアップし、その後すぐにスケールダウンする必要があります。クラスターはそれをサポートできるように構成される必要があります。
もう一つの課題は、高可用性アプリケーションの構築です。これらはステートフルなアプリケーションであり、災害復旧はステートレスなアプリケーションとは異なる対応が必要になります。また、数千のノードと潜在的に数百万のマイクロサービスに対する可観測性の設定も課題です。これらと並行して、適切なネットワーク構成、コンピューティングとストレージのオプション、そしてバッチスケジューリングのオプションに関して、多くの決定をお客様と一緒に行い、EKS上でスケーラブルなワークロードを構築できるようにしました。
これらの問題をお客様と共に解決する中で、私たちはData on EKSプロジェクトの構築と立ち上げを決定しました。Data on EKSは、お客様がAmazon EKS上に最新のデータプラットフォームを構築するのを支援するための、私たちが「ブループリント」と呼ぶものの集合体です。このプロジェクトでは、環境にデプロイするものを視覚化できるリファレンスアーキテクチャを含むこれらのブループリントをリリースしました。また、AWSリソースのスピンアップやオープンソースツールのインストール、そしてこれらのアプリケーションを環境で実行するために必要な前提条件をセットアップするのに役立つInfrastructure as Code(IAC)テンプレートも提供しています。これはAWSのベストプラクティスに従って行われ、それらはIACテンプレートに組み込まれています。
最後に、テスト用のサンプルコードを提供しています。Apache Sparkの場合、TPCDSベンチマークとして知られる、広く使用されているベンチマーキングフレームワークを提供しています。これにより、これらのリソースとツールをクラスターにインストールするだけでなく、現在のソリューションに対してテストするためのサンプルコードも利用でき、要件を満たしていることを確認できます。
Data on EKSはパブリックなオープンソースプロジェクトです。ここに画面のスクリーンショットがありますが、私たちはこれらのブループリントをすべてウェブサイトで公開しています。私たちの目標は、このナレッジをコミュニティーにできるだけ広く共有することです。Kubernetesがオープンソースであり、Kubernetesだけでなくデータツールに関しても活発なオープンソースコミュニティーがあることを認識しています。私たちは、この2つが一緒に進化していく過程をサポートし続けたいと考えています。私たちは、お客様の間で最も一般的だと思われるユースケースに対してブループリントを構築しています。これにより、できるだけ有益で、できるだけ多くの価値を提供することができます。
主に、機械学習に関するものが中心です。トレーニングからサービング、さらには機械学習オペレーションまでのエンドツーエンドのプロセスをカバーしています。また、データ処理も重要で、主にApache Sparkを使用しています。これが今日お話しするテーマです。それでは、Varaに引き継ぎます。彼がEKS上のオープンソースデータプラットフォームについて少しお話しします。
Pinterestのデータ処理インフラストラクチャ:概要と移行の理由
ありがとう、Alex。皆さん、こんにちは。私はVara Bonthuと申します。主にオープンソース技術を扱うPrincipal Solutions Architectで、データ分析とKubernetesを専門としています。今日、皆さんにお見せしたい重要なポイントは、Kubernetes上でオープンソースデータプラットフォームを実際に稼働させる方法です。 このスライドをご覧いただくと、Kubernetes内のデータプラットフォームの様々な段階が示されています。データ取り込みから始まり、ストリーミングワークロードのデータ取り込みにはKafkaが最も一般的です。しかし、今回の話題に関連して、特にデータ処理に注目していただきたいと思います。
データ処理の分野では、Apache Spark、Flink、pandas、dask、beamなど、多くのツールがあります。これらのツールはすべて、バッチ処理とストリーム処理の両方を含むデータ処理に使用されます。しかし、私たちのお客様の大多数は、バッチ処理と一部のストリーム処理の両方にApache Sparkを好んで使用しています。では、Apache Sparkについて少し紹介しましょう。
Apache Spark、そして皆さんの多くがApache Sparkを使用していると言われましたが、これは分散処理エンジンです。主に複数のノードを持つ分散システムで、テラバイトからペタバイト規模のデータを処理するために使用されます。Spark SQL、MLlib、Streaming、GraphXなど、様々な種類のデータを処理するためのライブラリが付属しています。Sparkはスタンドアロンマシンで実行することができます。
MacやWindowsマシンなどのローカル環境で主にスクリプトを書いたり、小規模なデータセットを処理したりすることができます。しかし、テラバイトやペタバイト規模のデータを処理する場合は、YARNやMesosなどのリソースマネージャーを使用する必要があります。2018年、SparkコミュニティはリソースマネージャーとしてのKubernetesのサポートを追加しました。これにより、Kubernetesをリソースマネージャーとして使用し、分散方式でKubernetes上でSparkジョブを実行できるようになりました。
では、SparkとKubernetesがどのように連携して動作するかを説明しましょう。スライドに示すように、コントロールプレーンとデータプレーンがあります。ユーザーがコントロールプレーンにSparkジョブを送信すると、スケジューラーがSparkコアとヘッドレスサービスを含むポッドをスケジュールします。これがデータプレーンに作成されるSparkドライバーです。次に、ドライバーがAPIサーバーに複数のエグゼキューターの作成を要求します。スケジューラーはそれらのエグゼキューターをデータプレーンに作成します。これらのエグゼキューターは、ご覧のヘッドレスサービスを使用してドライバーポッドに接続されます。これがKubernetes上のSparkのエンドツーエンドの通信の仕組みです。
では、Pinterestのモダナイゼーションの取り組みについて話すSoamに引き継ぎます。ありがとうございました。
PinterestのEKS上のSparkプラットフォーム「Moka」の詳細
皆さん、こんにちは。まず、AlexとVaraに感謝します。EKSを使った私たちの取り組みと、これまでのモダナイゼーションの道のりについて話す機会を与えてくれてありがとうございます。まず、Pinterestとは何か、どのようなサービスなのかを簡単に説明します。Pinterestは、ソーシャルメディアの要素を持つビジュアル検索・発見エンジンです。Wall Streetは時々私たちをソーシャルメディアと見なしますが、私たちはそうは考えていません。むしろビジュアル検索と発見の側面が強いと感じています。直近の四半期報告では、月間アクティブユーザー数(MAU)は4億8200万人でした。私たちのモットーは、「人々がPinterestを使って、自分の愛する人生を創造するためのインスピレーションを見つける」ことです。
私たちは、ピンとボードという概念で作業します。つまり、気に入った資産、ウェブサイト、画像を見つけ、それらをピン留めし、これらのピンをボードで整理します。これがPinterestの概要です。これが実際にどのようにデータ処理に変換されるかについて、非常に主観的な概要をお見せします。ユーザーはアプリやウェブブラウザを通じてピンとボードを操作します。また、商品カタログなどのサードパーティデータのソースもあります。
これらのデータは、フロントエンドAPIサービスに流れ込み、そこからさらに下流では、ログ情報をストリーミングでリアルタイムに取得するための大規模なKafkaクラスターがあります。このデータは、ストリーミングインフラストラクチャに送られたり、例えばユーザーのボードやピンのメンバーシップを追跡するデータベースに送られたりします。データサービスがこれを担当し、そしてこれらのデータベースからのダンプやApache Kafkaの出力も、巨大なS3データレイクに流れ込みます。その下流には、バッチ処理やストリーミング処理が大量に行われるビッグデータプラットフォームがあります。そこで稼働している機械学習プラットフォームと連携して、シグナル、モデル、レコメンデーション、そして膨大なデータウェアハウスのセットを出力しています。
バッチ処理プラットフォームをさらに詳しく見ていくと、現在、私たちはHadoopをかなり大規模に使用しています。簡単な数字を挙げると、Hadoopクラスターはクラウド上に約14,000のEC2インスタンスを持ち、最大のクラスターは2,500以上のノードを有しています。
1日に約70,000のHadoopジョブを実行し、毎日数百ペタバイトのデータを処理し、S3で管理しているデータは半エクサバイト以上になります。このように、私たちは非常に大規模なHadoopベースを持っています。では、なぜSparkへの移行を検討したのでしょうか?いくつかの理由がありました。Hadoopは確立されたサービスですが、古くなってきていると感じていました。
より cloud native でアジャイルなものに移行したいと考えていました。また、Pinterest内部ではすでにSparkへの移行の勢いがありました。例えば、HiveジョブをSpark SQLに移行したり、多くのMapReduceジョブをSparkに移行してSparkの利点を活かしたりしていました。Sparkのアップグレードや中心的なSparkコードベースの修正にも投資していました。そのため、この部分は当然の選択でした。KubernetesやAmazon EKSのようなcloud nativeソリューションへの移行は、コスト効率を高める大きな機会をもたらすため、非常に魅力的でした。これらを総合すると、開発の反復サイクルを短縮し、生産性を向上させることができます。これらが私たちの主な移行理由でした。
Pinterestの移行タイムラインと得られた成果
私たちのEKS上のSparkプラットフォームは、Mokaと呼んでいます。これは複雑な図ですが、EKS上にSparkを単に導入するだけでは不十分だということを指摘したいと思います。本当に大規模にデータを処理したい場合は、その周りにさまざまなサポートサービスを構築する必要があります。左側から始まるのが、ユーザーがシステムとどのように相互作用するかです。Spinnerと呼ばれるシステムサービスがあり、これはAirflowベースのジョブDAG作成ツールです。これがワークフローを生成します。そして、Archerというサービスがあります。これは、Spinnerから得たDAG定義を取り、一連のEKSクラスターにジョブを投入するために特別に作成したジョブ投入サービスです。
EKS内では、複数のEKSクラスターが稼働しています。これらのクラスターには、VaraとAlexが話していたSparkオペレーターを装備する必要がありました。スケジューリングには、YuniKornを使用しています。これは、HadoopのYARNに最も近いアナログだと判断したためです。さらに、ジョブが実行される際には、イメージをプルする必要があり、そこでAmazon ECRが活躍します。Hiveを廃止してSpark SQLを実行していますが、テーブル定義が格納されているため、Hiveメタストアはまだ必要です。Jupyterや Spark SQL UIの送信、より細かなアクセス制御やセキュリティを提供する他のサービスも開発中です。
右上には、リモートシャッフルサービスもあります。ご存知の通り、Spark自体には外部シャッフルサービスがありますが、EKSクラスター内でのオートスケーリングなどの可能性を開くために、別のサービスに移行する必要があると感じました。これは現在も投資を続けているプロジェクトです。ジョブが実行されると、メトリクスとログが生成されるので、それらを消費するサービスがあります。ユーザーはジョブの実行中にライブのSpark UIに接続したいと思うでしょうから、そのためのイングレスサービスがあります。ジョブが終了すると、ログを確認したいと思うでしょう。そこでSpark History Serverの出番です。
移行のタイムラインについて簡単に説明します。昨年は、様々なSparkランタイムソリューションを評価し、最終的にEKSに決定しました。今年の前半は、EKSとEKS上のSparkの設定を製品化し、選定することに集中しました。
運用方法、ロギングとメトリクスの導入に焦点を当て、最初のジョブセットをHadoopからMokaに移行しました。これは約10%に相当し、現在は一部のスケーリングの機会を検討し、さらに20%のジョブを移行しようとしています。年の前半はSpark Scalaジョブのみを移行しましたが、後半はScalaとSpark SQLの両方を組み合わせて移行に取り組んでいます。
来年は、Pinterestの他のサービスとの統合、特にPinterestサービスメッシュとの統合に重点を置く予定です。PySpark移行の検討や、より細かなアクセス制御の実装、そして開発したコンポーネントの一部をオープンソース化することも計画しています。コミュニティにニーズがあると考えており、私たちが解決している問題は他の人も直面するはずです。これらについてもっと知りたい方がいれば、喜んで共有させていただきます。
EKS上でSparkを実行する際のベストプラクティス:ネットワーキングとストレージ
では、これまでのプロジェクトから得られた成果と学びについてまとめてみましょう。 大きな成果の一つは、Gravitonへの移行とそれに伴う大幅なコスト削減です。これまで私たちのHadoopクラスターはIntelやAMDベースのインスタンスで動作していましたが、現在EKSクラスターはGravitonで稼働しています。また、標準的なKubernetes APIに深いレベルでアクセスできるようになったことで、互換性の問題を心配することなく、さまざまなオープンソースコンポーネントを組み合わせて実行できるようになりました。
Kubernetesクラスターのコントロールプレーンの管理を心配する必要がなくなり、それは全て対応されています。そして、OTELやFluent BitのようなAWSフレーバーのオープンソースコンポーネントも非常に役立ちました。AWSと協力して、Pinterestの環境内での最適な展開方法を理解できたからです。価格面も素晴らしく、EKSのオーバーヘッドは最小限で、コストの大部分はEC2インスタンスから発生しています。私たちは常にコスト削減の機会を探しているので、これは嬉しい誤算でした。
そして、講演全体で触れたように、Data on EKSとEKS Blueprintsは、クラスターの展開を容易にするために、私たちのTerraformデプロイメントインフラストラクチャに組み込んでいます。そして、もちろん、AWSの様々な方々のサポートと協力は素晴らしく、今後もさらに協力関係を深めていきたいと考えています。実際のサイジングについては、現在、合計で約600ノードのクラスターが数個あります。年末までに約1,000ノードに到達したいと考えており、来年はさらに野心的なクラスターサイジング計画があります。
学びとしては、EKSの展開には特にネットワーキングやEC2 APIへの負荷など、独自の課題があることがわかりました。これらの問題を解決するために、AWSチームと慎重に協力してきました。コントロールプレーンログについては、Pinterestのインフラストラクチャにエクスポートするより良い方法を見つけたいと考えています。また、私たちは長年自前のHadoopクラスターを運用してきたため、様々なHadoopパッチを組み合わせてプラットフォームに展開することに慣れていました。
しかし、EKSを所有していないため、その贅沢はできず、EKSのKubernetesリリーススケジュールに従う必要があります。定期的なEKSアップグレードも考慮すべき点で、時間制限がある中でEKSバージョンの移行を行わなければなりません。Sparkに関しては、多くの話ができますが、簡潔に述べます。移行の目標の一つは、ユーザーにとって全ての詳細を隠蔽または透過的にすることでした。つまり、内部顧客の観点からは、単にAirflow DAGを書くだけで、それがHadoopに行くのかMokaに行くのかは重要ではありません。そのため、ジョブの設定を慎重にキャプチャし、それらがSpark on EKS環境に確実に変換されるようにする必要がありました。また、インスタンスタイプとアーキテクチャをAMDからGravitonに変更しているため、ネイティブライブラリにも注意を払う必要があります。
私たちは、それらのネイティブライブラリを Graviton にも確実に移行できるようにする必要があります。
大きな考慮点は、Hadoop から離れることで、当たり前だと思っていた多くの機能を失うことです。Hadoop は YARN スケジューラー、ログ集約、メトリクスのエクスポート、独自のファイルシステム、そして UI を提供しています。EKS に移行するにあたり、代替手段を見つけるか、ゼロからカスタムソリューションを書く必要がありました。さらに、私たちが運用している規模が大きいため、これらの代替手段の中には荒削りなものもあり、私たちが慣れ親しんでいる規模で確実に動作させるには相当な作業が必要でした。
全体として、EKS への移行は Pinterest のデータエンジニアリングとインフラストラクチャにとって変革的なものだと言えるでしょう。今後は、EKS 上の Spark での成功に勇気づけられ、ストリーミングプラットフォームに使用している Flink や、ストレージ、キャッシング、キーバリューストアの側面でより多く使用している TiDB についても同様のことを行いたいと考えています。最終的には、他のワークロードも EKS に移行することになるでしょう。ご清聴ありがとうございました。では、Vara にお返しします。
EKS上でのSparkワークロードのスケーリング:コンピューティングとオートスケーリング
はい、ありがとう Soam。多くの方が EKS 上で Spark を実行していると言われました。一つお聞きしたいのですが、 大規模に Spark を実行する際に IP 枯渇の問題に直面した方はどのくらいいらっしゃいますか?ライトで見えづらいのですが、何人かの方がいらっしゃるようですね。
これから、EKS 上で Spark ワークロードをスケールさせるためのベストプラクティスについて説明します。これらのベストプラクティスに従うことで、一般的な問題を回避できます。また、ワークロードはお客様ごとに異なるため、AWS では、お客様が望むレベルまでワークロードをスケールさせる際の問題解決をサポートいたします。
このスライドでは、セカンダリ CIDR を持つ VPC CNI が示されています。多くのデータチームやデータサイエンスチームが、Spark ジョブを実行するために EKS クラスターの作成をプラットフォームチームに依頼することがよくあります。しかし、プラットフォームチームは必要な IP 数を把握できていないことが多く、小さめのサブネットを作成してしまい、IP 枯渇の問題につながります。そこで、スライドに示されているネットワーク構成を検討することを強くお勧めします。この構成では、VPC 内に 2 つのルーティング可能なサブネットを使用して、ロードバランサーやパブリックインターフェース、オンプレミスのデータセンターとの通信に活用できます。
実際の Spark ワークロードの実行には、ルーティング不可能な IP を持つセカンダリ CIDR レンジを利用できます。ご覧のように、2 つの AZ にまたがる 2 つのサブネットがあり、セカンダリ CIDR レンジの各プライベートサブネットには 65,000 の IP があります。これにより、Kubernetes 上の Spark Pod に 120,000 以上の IP を提供できます。
大規模な運用をしているお客様でよく見られるもう一つの問題は、CoreDNS からの unknown host exception です。この問題を回避するには、Cluster Proportional Autoscaler を使用して CoreDNS を水平方向にスケーリングすることができます。デフォルトでは、CoreDNS は EKS クラスター用に 2 つの Pod でデプロイされます。しかし、何千ものノードを実行する場合、その 2 つの Pod では他の Pod にサービスを提供するには不十分です。
Cluster Proportional Autoscaler を使用すれば、スピンアップしているワーカーノードのサイズに基づいて CoreDNS Pod を水平方向にスケーリングできます。もう一つのベストプラクティスは、Node Local DNS Caching を使用することです。これは、簡単な kubectl コマンドを使って各ノードに Node Local DNS Caching をデプロイできます。これにより、DNS 解決のために CoreDNS に戻る必要がなくなり、パフォーマンスが向上します。
次にストレージに移りましょう。Kubernetes 上で Spark を実行する際、ストレージは非常に重要です。以前は Hadoop や EMR などの Hadoop ベースのフレームワークでワークロードを実行していたかもしれませんが、EKS 上で Spark を実行する場合、ストレージは全く異なる様相を呈します。このスライドにあるように、高性能で低スループットが必要な場合、NVMe SSD ベースのインスタンスを Spark で使用することを強くお勧めします。ここでは 2 つのワーカーノードがあり、1 つは c5d.xlarge、もう 1 つは c5d.12xlarge です。c5d.xlarge には 100 GB の NVMe SSD が 1 つ接続されています。c5d.12xlarge には 2 つの NVMe SSD が接続されています。これらのボリュームを Spark ジョブのシャッフルデータとして使用する場合、Spark のローカルディレクトリを特定のストレージを指すように設定できます。
Kubernetesのバッチスケジューラーとその重要性
このケースではhostPathを使用できます。スライドにあるように、hostPathを定義するボリュームがあります。しかし、EC2インスタンス内に複数のSSDがある場合、データエンジニアはそのインスタンスに2つのディスクがあるのか1つのディスクがあるのかをどのように知ることができるでしょうか?この状況を避けるため、ワーカーノード2で見られるように、RAID 0の構成を使用することをお勧めします。これらのノードをスピンアップする際、RAID 0を構成することで、各インスタンスで利用可能なすべてのディスクに対して1つのマウントパスを得ることができます。
オプション2では、EBS CSIコントローラーを使用してAmazon EBSをシャッフルストレージとして利用することもできます。これにより、永続的ボリューム要求(PVC)が作成されます。つまり、10個のエグゼキューターを要求し、各エグゼキューターが40GBを必要とする場合、40GB容量の10個のEBSディスクが作成されます。スライドに示されているように、各エグゼキューターには専用のEBSボリュームがあります。このオプションでは、NVMe SSDを使用する前のオプションでDタイプの限られたインスタンスしか選択できなかったのに比べ、現在利用可能なすべてのEC2インスタンスから選択できます。より大規模なクラスターに成長させたい場合や、パフォーマンスが懸念事項でない場合は、PVCを持つEBS CSIコントローラーを使用してワークロードを実行するこのオプション2が適しています。
このオプションが提供する2つの重要な機能は、永続的ボリューム要求の再利用と動的リソース割り当ての活用です。ここで見られるように、ワーカーノード1には2つのエグゼキューターによって接続された2つのEBSボリュームがあります。ワーカーノード1がSpot中断によって中断され、ノードが終了しても、新しいノードが立ち上がる際にこれら2つのボリュームを再利用できます。この機能により、新しいノードが立ち上がると、新しいボリュームを作成するのではなく、既存のEBSボリュームに接続します。つまり、計算を最初からやり直すのではなく、中断された箇所から処理を再開できるのです。さらに、Sparkが提供する動的リソース割り当てを活用でき、タスクの負荷に基づいて動的にスケールすることができます。
次に、コンピューティングに移りましょう。これは顧客からよく聞かれる質問の1つです。KubernetesのEKS上でSparkを実行する場合、覚えておくべき重要な点は、Sparkドライバーが単一障害点であるということです。ドライバーが停止すると、ジョブ全体が削除されてしまいます。そのため、ジョブを再起動し、計算プロセスをやり直す必要があります。この状況を避けるため、Sparkドライバーはオンデマンドインスタンスでのみ実行することを強くお勧めします。コスト効率が悪い場合は、オンデマンド価格から最大72%オフになるReserved Instancesの使用を検討することもできます。Sparkエグゼキューターパートには、Spotインスタンスを使用できます。
KarpenterやAutoScalerを利用することで、スケールアップとダウンが可能です。エグゼキューターパートが削除されたり、ノードがSpotによって中断されたりしても、ドライバーは新しいノード上でそれらのエグゼキューターを再度スピンアップし、処理を継続できます。コンピューティングに関しては、SoamがすでにPinterestのユースケースを説明しました。彼らはIntelから始めましたが、最終的にGravitonに移行し、パフォーマンスの向上とコスト削減を実現しました。私たちは、テスト済みのGraviton 3を検討することを強くお勧めします。EKS上で実行されるSparkで非常に良好な結果を示しています。
さて、EKS上でSparkを実行する際に、out of memoryエラーに遭遇したことがある方はどれくらいいらっしゃいますか?多くの手が挙がっていますね。実は、多くのお客様から、特にHadoopからEKSにワークロードを移行する際にこの質問をよく受けます。HadoopからEKSに移行する際に考慮すべき重要な点の一つが、メモリオーバーヘッドファクターです。Apache Sparkのデフォルト設定では、メモリオーバーヘッドファクターは0.1、つまり使用メモリの10%です。これはScalaやJavaのジョブを実行する場合には問題ありません。しかし、KubernetesでPySparkジョブを実行する場合は、少なくともメモリの40%をメモリオーバーヘッドファクターとして設定する必要があります。
次に、オートスケーリングについて見ていきましょう。お客様の中には、両方のオプションを使い分けている方がいます。Pinterestのように静的クラスターを使用し、Reserved Instancesを利用してEKS上でSparkワークロードを実行する方法と、コスト最適化のためにCluster AutoscalerやKarpenterを活用する方法です。私は特にKarpenterをお勧めします。Karpenterはより強力で、オペレーターベースのソリューションであり、managed node groupsやCluster Autoscalerを必要としません。必要なのは、EKSクラスターをインストールして運用することだけです。インストールが完了したら、ここに示すテンプレート(以前のバージョンではprovisionersと呼んでいましたが、バージョン0.32からはnode poolと呼ばれています)を使用して、シンプルなYAMLマニフェストとして1つのnode poolを定義するだけです。
node pool内で、instancesパートとon-demandを定義できます。Sparkジョブを実行する際、ドライバーとエグゼキューターの両方にpod templatesという概念があります。pod templates内で、Karpenterが管理するラベルを使用して、ドライバーをon-demandインスタンスで実行し、エグゼキューターをspotインスタンスで実行するように定義できます。Karpenterがこれらのノードを自動的にスピンアップし、ジョブが終了すると自動的にスケールダウンします。Karpenterの速度はCluster Autoscalerと比べて注目に値します。Karpenterは1分以内にノードをスピンアップできますが、Cluster Autoscalerは2〜3分かかることがあります。
お客様からよく聞かれる質問があります。「Cluster Autoscalerでは多くのlaunch templatesを使用し、user dataで多くの設定を行っていますが、Karpenterでも同じことができますか?」はい、EC2 node classを使用すれば可能です。これは別のテンプレートです。EC2 node classを使用して、EKS optimized AMIでuser dataを定義し、必要に応じて設定することができます。これにより、バックグラウンドでlaunch templateが作成され、ノードがスピンアップされます。カスタムAMIも使用できます。ここで示しているように、RAID 0の設定についても言及しています。KarpenterでNVMe SSDインスタンスを活用したい場合、C5 XlargeからC5 24xlargeまでの範囲のインスタンスを選択できます。スライドに示されている設定では、EC2インスタンスに利用可能なNVMe SSDインスタンスがあるかどうかを識別します。
もしあれば、そのディスクをフォーマットしてEC2インスタンスにマウントし、データエンジニアがシャッフルデータストレージとして1つのパスを使用できるようにします。
それでは、Kubernetesのバッチスケジューラーについて見ていきましょう。 Kubernetesには、デフォルトのバッチスケジューラーがあり、これは単一のキューです。数百のジョブを実行すると、それらはこのFIFOキューに入り、一度に1つのジョブを取り出してワークロードを配置します。しかし、Hadoopワークロードから移行してきて、YARNの使用に慣れているお客様は、EKS上でSparkジョブを実行する際にも複数のキューを使用したいと考えていました。この要望に応えるため、バッチスケジューラーが進化してきました。その1つがApache YuniKornで、Pinterestが使用していると言及しています。もう1つはVolcanoです。
これらのバッチスケジューラーが提供する主要な機能の1つが、マルチテナンシーとリソース分離です。例えば、組織内に複数のチームがあり、各チームが使用できるCPUとメモリの量を制御したい場合、複数のキューを作成し、各キューに割り当てを設定できます。これにより、ノイジーネイバーの状況を回避し、複数のチームが割り当てられたリソースにアクセスできるようになります。これらのスケジューラーは、公平なスケジューリングやFIFOキューなど、さまざまなワークロードキューイングオプションも提供しています。そして最も重要なのが、ギャングスケジューリングです。
ギャングスケジューリングは、これらのバッチスケジューラーが提供する重要な機能です。先ほど述べたように、Sparkジョブを実行する際は2段階のプロセスがあります。まず、スケジューラーによってドライバーが作成され、次にドライバーがエグゼキューターをリクエストし、スケジューラーがそれらをスケジュールします。ギャングスケジューリングはこのプロセスを改善します。ジョブを送信すると、ドライバーとエグゼキューター用の一時的なポッドを作成し、ポッドごとにスケジュールするのではなく、アプリケーションとしてデプロイします。これは非常に優れた機能なので、ぜひ検討してみてください。さらに、これらのバッチスケジューラーは、アプリケーションの並べ替え、ジョブの優先順位付け、プリエンプション機能も提供しています。EKS上でSparkを実行する際は、これらのバッチスケジューラーの使用を強くお勧めします。
EKS上のSparkジョブのモニタリングとロギング
次に、メトリクスについて話しましょう。EKS上でSparkを実行する際、Kubernetes上のSparkジョブをどのようにモニタリングするのでしょうか?これは重要な問題です。KubernetesとともにCNCFが提供するオープンソースのエコシステムを使用すると、ベンダーソリューションを含む複数のツールを利用できます。ここで示すソリューションでは、Prometheusを使用してメトリクスを保存し、リモートライト方式でそれらのPrometheusメトリクスをAmazon Managed Service for Prometheusに書き込んでいます。そして、Grafanaを使用してこれらのメトリクスを可視化できます。
Apache Spark用のオープンソースのGrafanaダッシュボードがいくつか利用可能で、これらを使用してメトリクスを可視化できます。これらのダッシュボードは、ノードのリソース使用率だけでなく、Sparkジョブの JVMメトリクスも表示し、シャッフルデータのパフォーマンスや各タスクのパフォーマンスを詳細なレベルで確認できます。Apache Sparkには、Prometheusサーブレットが追加されました。これはApache Spark内のクラスで、これらのメトリクスをPrometheusに書き込むことができます。これはApache Sparkに組み込まれた機能で、ドライバーとエグゼキューターのメトリクスをPrometheusに公開します。Prometheusがそれらのメトリクスを抽出し、Grafanaを使用して可視化できます。さらに、CloudWatch InsightsやCloudWatchダッシュボードも使用できます。
さて、最も重要なのは、Spark History Serverについてお話しすることです。これはデータエンジニアにとって、データ処理ジョブのチューニングやコードの出来栄えを確認するための重要なツールです。場合によっては、特定のステージが他よりも時間がかかることがあります。これを特定する一つの方法は、Spark History Serverを使って、どのステージにより多くの時間がかかっているかを視覚化し、それに基づいてコードを最適化することです。
データエンジニアは、ジョブの実行中も視覚化したいと考えることがよくあります。Spark Operatorのようなツールには、ジョブ実行中にSpark live UIを視覚化するためのingress objectを作成できる組み込み機能があります。私たちのblueprintsで利用可能なオプションを使えば、イベントログをS3バケットに書き込むように設定し、そのバケットを指すSpark S3 serverをデプロイできます。この設定により、ingressとload balancerを使って、履歴メトリクスとライブメトリクスの両方を視覚化できます。
次に、Sparkのロギングについて話しましょう。 集約を処理するHadoopとは異なり、Kubernetesでのロギングには、ドライバーやエグゼキューターを含むすべてのSparkポッドからログを抽出するためのFluent BitのようなDaemon Setが必要です。例に示されているように、同じノード内に2つのドライバーとFluent Bitポッドが実行されています。Daemon Setとしての Fluent Bitは、すべてのポッドからログを抽出し、S3バケットやその他の場所に書き込むように設定できます。S3バケットへの書き込みはよりコスト最適化されており、Athenaのようなツールを使ってこれらのログを照会し、ジョブのパフォーマンスを監視できます。
Amazon EKSでワークロードを実行する際のスケーラビリティに関する重要なベストプラクティスは、Daemon Setが行うAPIサーバーコールの数を考慮することです。オープンソースツールをEKSに組み込む際は、オペレーターやDaemon SetがAPIコールの観点からどのようにパフォーマンスを発揮するかを評価することが重要です。Fluent Bitについては、追加のフィルターの使用をお勧めします。"Use_kubelet"をtrueに設定することで、Fluent Bitはノード内のKubeletからメタデータを取得でき、APIサーバーへのコール数を減らすことができます。この方法は、APIサーバーの負荷を軽減してジョブのスケーラビリティを向上させるため、Fluent Bitをデプロイする際に強く推奨されます。
ありがとうございました。では、Alexに引き継ぎます。
Data on EKSの活用方法と今後の展望
素晴らしい、Varaさん、ありがとうございます。Soamさんとpinterestの素晴らしいストーリーと、Varaさんの貴重な洞察を聞くことができました。さて、この技術を使い始める方法について話しましょう。初めての方はもちろん、すでに使用している方にも、Data on EKSのblueprintsを利用することを強くお勧めします。右側にこれらのリファレンスアーキテクチャのスクリーンショットが表示されています。また、右下にはサンプルコードがあります。これらのblueprintsは、環境でこれらのワークロードを実行するために必要なリソースを構築するのに役立ち、テスト用のサンプルコードも提供しています。
ほとんどのblueprintsはTerraformで構築されていますが、Terraformを使用していない場合や、すでに環境でこれらのワークロードを実行している場合でも、ベストプラクティスとしてこれらを参考にすることをお勧めします。Varaさんが顧客との協業から得た、今日発表したすべての学びがこれらのblueprintsに組み込まれています。そのため、直接環境にデプロイできなくても、これらを確認することで貴重な洞察を得ることができます。
さらに、GitHubで私たちとコミュニケーションを取ることをお勧めします。画面にはGitHub issuesへのリンクのQRコードが表示されています。質問がある場合や、現在ウェブサイトにないパターンを見たい場合、または何か共有したいことがある場合は、そちらで私たちとコミュニケーションを取ることを強くお勧めし、お願いします。以上で今日のプレゼンテーションを終わります。残り約10分ありますので、グループからの質問を受け付けたいと思います。
※ こちらの記事は Amazon Bedrock を様々なタスクで利用することで全て自動で作成しています。
※ どこかの機会で記事作成の試行錯誤についても記事化する予定ですが、直近技術的な部分でご興味がある場合はTwitterの方にDMください。
Discussion