re:Invent 2024: AWSによるKafkaとLambdaを用いたリアルタイムデータ処理
はじめに
海外の様々な講演を日本語記事に書き起こすことで、隠れた良質な情報をもっと身近なものに。そんなコンセプトで進める本企画で今回取り上げるプレゼンテーションはこちら!
📖 AWS re:Invent 2024 - AWS Lambda and Apache Kafka for real-time data processing applications (SVS321)
この動画では、AWSにおけるリアルタイムデータ処理の実装方法について、Apache KafkaとAWS Lambdaの組み合わせを中心に解説しています。Amazon MSKやLambda Event Source Mappingの具体的な設定方法、認証やネットワーキングのベストプラクティス、スケーリングの仕組みなどの技術的な詳細に加え、先週リリースされたProvision modeなどの最新機能についても言及しています。特に、Kafkaのパーティション管理やオフセットメトリクスの監視、パフォーマンスチューニングなど、実運用で必要となる具体的なノウハウが豊富に含まれており、データストリーミングアプリケーションの設計から運用までを包括的にカバーしています。
※ 画像をクリックすると、動画中の該当シーンに遷移します。
re:Invent 2024関連の書き起こし記事については、こちらのSpreadsheet に情報をまとめています。合わせてご確認ください!
本編
データストリーミングとServerlessの融合:AWSのJulian Woodによる導入
データの力をビジネスに活用していますか?タイムリーなインサイトを得て、状況の変化に素早く対応できていますか?re:Inventの月曜の朝へようこそ。これが最初のセッションかもしれませんので、私と一緒にお時間を過ごしていただき、ありがとうございます。このセッションは「AWS LambdaとApache Kafkaによるリアルタイムデータ処理アプリケーション」についてです。データストリーミングとServerlessを組み合わせることで、スケーラブルかつ効率的にデータを管理・処理できます。これは私の考えでは、まさに両方の良いところを取り入れたものです。そして、あらゆる種類のビジネスから、あらゆる種類のデータに対して処理を行いたいものです。Serverlessとデータ処理には、とても興味深い共通点があると思います。
この会場には2種類の方がいらっしゃると思います。まぶしい照明で皆さんの顔はよく見えませんが、おそらくKafkaに詳しい方々がいらっしゃるでしょう。Kafkaについて少しでも、あるいは詳しくご存知の方は手を挙げていただけますか?素晴らしい。会場にはKafkaに詳しい方がいらっしゃいますね。また、このトークがKafkaとLambdaについてなので、Lambdaに詳しい方もいらっしゃいます。Lambdaについて少しでも、あるいは詳しくご存知の方は手を挙げていただけますか?良いバランスですね。つまり、LambdaとKafkaの両方に詳しい方もいれば、どちらか一方に詳しく、これらを組み合わせたいと考えている方もいらっしゃるということです。
このトークでは、KafkaやLambdaについて説明する際に、「はい、Julian、その部分はもう知っています」と思われる方もいらっしゃるかもしれません。しかし、隣に座っている方が、あなたとは異なる技術背景を持っており、私たちが統合しようとしている2つの技術について理解したいと思っているかもしれません。データストリーミングについて、いくつかの観点から説明していきます。まず、データストリーミングとは何か、AWSでのデータストリーミングのオプション、そしてデータストリーミングのアーキテクチャについて詳しく見ていきます - それがどのように見え、AWSの様々なサービスでどのように処理するのかについてです。
そして、かなりの時間を使って、KafkaストリーミングデータとLambdaを使ってどのようにデータストリーミングを行うかについて説明します。そして当然ながら最後に、パフォーマンスの管理に関するヒントやコツ、情報について説明します。なぜなら、ストリーミングデータを扱う場合、非常に素早くタイムリーなインサイトを得たいからです。ここにQRコードでリンクしているリソースページがあります。これは最後にも再度紹介します。このページには、プレゼンテーションのスライドすべてと、様々なトピックや詳細情報へのリンクが含まれています。この1時間で十分な情報が得られなかった場合は、家に帰ってからさらに読み進めることができます。
私はJulian Woodです。写真では少し髪が長めですね。私はここでDeveloper Advocateとして働いており、Serverlessチームと多くの仕事をしています。開発者やビルダーの方々が、特にストリーミングデータを使用したServerlessアプリケーションをどのように最適に構築できるか、そしてそれらがServerlessとどのように上手く組み合わさるかを理解するお手伝いをすることが大好きです。また、私たちのプロダクトチームが最高のServerlessプロダクトや機能を構築できるよう、開発者の皆さんとプロダクトチームの橋渡しをすることも大好きです。
データストリーミングの進化:リアルタイムインサイトの重要性
データストリーミングについて見ていきましょう。世界は左側に示されている従来型の分断されたデータから変化しています。ビジネスの様々な場所にデータが存在し、それはデータベースやOLTP、様々なデバイス、異なる事業部門など、あらゆる場所に散在しており、それぞれがデータに対して異なるユースケースを持ち、データは様々な形で分断されていました。しかし、データに基づいてアクションを起こし、優れたインサイトを得たい現代では、データアーキテクチャはより連携されており、タイムリーなインサイトを得るためにそのデータにアクセスする必要のある様々なサービスや機能が存在しています。
S3をデータレイクとして使用し、Amazon Athenaで様々な処理を行うかもしれません。また、SageMakerを使用した機械学習を行ったり、OpenSearchクラスターでそのデータにアクセスしたり、その情報をKinesisという別のデータストリーミング製品や、今日お話しするKafkaベースのAmazon MSKに送信したりするかもしれません。現代のデータに関する重要なポイントは、このような接続性を通じてインサイトを導き出せることです。また、時間的な側面もあります。従来の日単位や時間単位のバッチ処理アプローチから、月次や2週間ごと、週次の処理を行うようになりました。このバッチインテリジェンスは素晴らしく、大量のデータを処理できますが、特に迅速とは言えません。
私たちは、データに対して時間的に重要なアクション可能な、予防的で予測的な分析と情報を得られる世界に移行しています。両者を比較すると、新しい情報に反応するこのタイムリーな対応が重要になってきています。先ほど述べたように、バッチ処理では、時間単位のサービスログ、リアルタイムメトリクス、システムによって作成される週次や月次の請求書などがあります。これが、より反応的なシステムに移行することで、例えば支出アラートのようなものが可能になります。つまり、毎週や毎月の請求書を待つのではなく、請求額に上限を設定し、何か起こった場合に即座に対応できます。クリックストリームデータを分析する場合、毎日の終わりにレポートを受け取るのも有用ですが、ライブデータを得る方がよりアクション可能です。
ウェブサイトや製品を使用している人々のライブクリックストリームデータを取得することで、彼らの行動をより素早く分析し、対応することができます。わかりやすい例として、金融機関のような顧客は不正検知を行いたいと考えています。不正検知レポートは一日の終わりにだけ提供されても特に有用ではありません。リアルタイム分析により、クレジットカードの不正使用や不正が発生した場合に、疑わしい取引を即座に停止することができます。
これは、バッチ処理と比較したストリーム処理における時間の重要性についてであり、世界中の様々なユースケースに基づいてこれらのインサイトを得ることができます。ストリーミングを初めて使用する人々は、それがどこに適しているのか疑問に思うかもしれませんが、基本的にはあらゆる場面で活用できます。産業用の情報、例えば何千何百万というデバイスから大量の情報を収集するセンサーや、プレイヤーのステータスをリアルタイムで更新する必要のあるオンラインゲームなど、これらがストリーミングデータで実現できることです。
IoTは分かりやすい例です。Set-top box、家電製品、あるいは組み込みセンサーなどから収集されるデータがあります。収集されたデータは、多くの場合、データレイクに保存され、そこでさまざまなソースがそのデータに対して処理を行います。例えば、S3をデータレイクとして使用し、S3バケット内の情報をソースオブトゥルースとして、これらの異なるアプリケーションを連携させることができます。
もう1つの例としてログがあります。サーバーから送信されるアプリケーションログがあります。今回はServerlessについての話ですが、サーバーからもServerlessからもログは発生します。これらはデスクトップやモバイルアプリケーションからのログかもしれません。このデータを継続的に処理し、メトリクスを生成し、リアルタイムダッシュボードを作成し、データを集約して別のS3バケットやデータレイクに移動するアプリケーションを構築できます。
ストリーミングデータの特性とAWSにおけるApache Kafkaの位置づけ
ストリーミングデータはパイプラインでもあります。データが何であり、どこにあるかだけでなく、組織にデータがどのように入ってきて、最終的にどこへ行くのかというフローなのです。左側にはソース、つまりリアルタイムデータを生成するデバイスやアプリケーションがあります。多くの場合、高速でデータが発生し、特にリアルタイムで数万のIoTデバイスからそのデータを取り込むためのサービスが必要になります。
そのデータは一定期間受信できるように保存され、その期間中は何度でも再生できる必要があります。データの再生は、ストリーミングデータを重要なものにする鍵となる要素です。分析や統合を通じてユーザーステータスに接続するさまざまな方法があります。レコードは順序通りに読み取られ処理され、リアルタイム分析を可能にします。データの送信先として、データウェアハウス、別のS3バケット、OpenSearchクラスター、イベント駆動型アプリケーションなどが考えられます。
ストリーミングデータの特徴について考えると、多くの場合、大容量であることが挙げられます。アクセス数の多いウェブサイトのクリックストリームデータでは、毎日何兆ものイベントを収集する可能性があります。また、多くの場合、離散的ではなく連続的です。つまり、バッチ処理ではなく、センサーから毎分あるいは毎秒データが送られてくるのです。ストリーミングデータの重要な特徴は、多くの場合、順序付けされていることです。例えば、チャットアプリケーションでメッセージが受信者に順不同で届いたら、意味が通じなくなってしまいますよね。
ストリーミングデータを扱う場合、時間的な制約が重要な要素となります。先ほど金融分析の例を挙げましたが、不正検知はデータの損失を防ぐために可能な限り迅速に行う必要があります。キューベースのシステムと比較した場合、ストリーミングデータの特徴は、複数のコンシューマーが存在することです。キューベースのシステムは通常、メッセージを1つずつ処理し、処理後にそのメッセージをキューから削除しますが、ストリーミングの特徴として、そのキューにデータが保持され続けます。アプリケーションの異なる部分で、複数のコンシューマーが異なるタイミングでそのデータを読み取ることができます。
では、AWSにおけるストリーミングデータについて見ていきましょう。今日は特にApache Kafkaについて話します。Apache Kafkaは時として、アイデンティティの危機とも言えるような状況にあります。なぜなら、これは企業のService Busなのでしょうか?ビジネスで発生するすべてのイベントを配置する場所なのでしょうか?あるいは、単にデータを置く場所なのでしょうか?REST APIのような使い方をして、データを処理した後で非同期に処理する、という使い方をする人もいます。特定の時点のデータを照会できることから、データベースのように考える人もいます。
これがKafkaの持つパワーを人々が完全に理解できない理由の一つです。私が思うに、その理由は、Kafkaがこれらすべての異なる機能を提供しており、しかもそれぞれに意味があるからです。Kafkaは自身をストリーミングプラットフォームとして売り込んでいます。なぜなら、RESTによる非同期処理を使用してデータを集約し活用する様々な方法を提供しているからです。Service Busとしても機能し、データストアとしても機能します。完全なSQLのセマンティクスは持っていませんが、人々はこれをデータベースとして考えています。
Kafkaを実行する方法は多岐にわたります。オンプレミス環境、ノートPC、コロケーションデータセンター、クラウド上のEC2インスタンスなど、どこでもKafkaをインストールして、望む通りに実行・管理することができます。AmazonにはAmazon MSK(Managed Streaming for Kafka)があり、これは自分でKafkaを管理する必要のないマネージドサービスです。また、Kafkaを得意とするサードパーティも存在します。LinkedInから生まれたKafkaを定義した企業の一つであるConfluentは、クラウド内でKafkaを利用できる大規模なConfluent Cloudプラットフォームを運営しています。また、KafkaのConfluentバージョンをより詳細に制御したい場合は、オンプレミス用のConfluent Platformを実行することもできます。
これらの大企業がKafkaを使用している一方で、RedpandaやWarpstreamなど、Kafka市場に新規参入する企業が素晴らしい革新を起こしています。これらの企業はKafkaを次のレベルへと進化させています。Kafka互換のAPIを実行しながら、コンピューティング面、ネットワーキング面、またはスケーラビリティ面を劇的に簡素化しています。最近Confluentに買収されたWarpstreamは、より効率的で低コストなクラスターを実行する非常に興味深い方法を開発しました。
Amazon MSKとKafkaの基本概念:ProducerからConsumerまで
Amazon MSKは、Apache Kafkaをより安全で、高可用性があり、組織にとってアクセスしやすいものにします。開発者はKafkaに必要なインフラストラクチャではなく、アプリケーション開発に集中できます。KafkaクラスターやKafka Connectクラスターのプロビジョニング、設定、チューニングについて心配する必要はありません。Apache Kafkaのオープンソースを完全にサポートしているため、既存のアプリケーションやツールはアプリケーションコードを変更することなく、そのまま動作します。
AWSのセキュリティは「ジョブゼロ」、つまり最も重要な優先事項です。非常に安全で、VPC内にアプリケーションを簡単にデプロイでき、AWSの認証・認可と統合されています。完全マネージド型であるため、大規模な運用が可能です。他のプロバイダーと比べて10分の1から13分の1程度のコストで、この完全マネージド型Kafkaを提供しようとしています。そのため、MSKを使用することには大きなメリットがあります。オンプレミスで独自のKafkaを運用する場合、ネットワークからHVACまで、すべてを自分で行う必要があります。規模を効率的に運用できたとしても、運用のオーバーヘッドでコストがかかってしまいます。
Amazon MSK Serverlessは、Kafkaのサービスランタイムの一種で、アプリケーションを実行するためのクラスターの設定、最適化、セキュリティ確保、運用を行う必要がありません。データ処理ジョブのリソースのオーバープロビジョニングやアンダープロビジョニングを心配する必要はありません。これはサービスの一部として処理されます。ストリーミングするデータ量とそのデータを保存する期間に応じて料金を支払います。MSKはそのデータを最適に処理する方法を見つけ出します。これは、クラスターの管理・運用を避け、単にオープンソースフレームワークを使用してアプリケーションを実行したいと考えているお客様にとって非常に有益です。
Kafkaの世界では、情報を生成するProducer(IoTデバイスやアプリケーションの一部)と、その情報を消費する必要のあるConsumer(1つまたは複数)があります。Kafkaは、その間でBrokerとして機能します。Kafka Brokerは、実際にはKafkaがインストールされたEC2インスタンスです。MSKサービスを使用している場合でも、RedPandaやWarpStreamなどを使用している場合でも、基本的にどこかにKafkaがインストールされています。バイナリや他の形式かもしれませんが。
これがBrokerと呼ばれるもので、Kafkaを実行・管理するソフトウェアです。MSKサービスでは、これが管理されています。
Topicとは、類似したレコードを保存するためのメッセージチャネルです。組織によっては、あらゆるものに対して独自のTopicを持っていたり、アプリケーションごとに異なるTopicを持っていたり、部署ごとに異なるTopicを持っていたりするかもしれません。これは情報を論理的に分離する方法の一つです。今回は、単一のTopic Aについて説明していきます。Partitionはデータを分割する方法で、Kafkaが処理能力とスループットを管理できるようにする仕組みです。異なるPartitionは、高可用性を確保するために複数のBrokerにレプリケーションされます。
Kafka Partition内では、Producerが特定のPartitionにメッセージを送信し、各Partitionは順序付けられたレコードの集合として指定できます。レコードは常にPartition内で順序が保たれ、新しいレコードはPartitionの先頭に追加されます。メッセージは2つの方法で自然に期限切れになります:ストレージによる方法(Kafkaクラスターに特定の容量のデータを保存するよう設定)と、時間による方法(1日や1年などのメッセージ保持期間を指定)です。
メッセージは、Partition Keyと呼ばれる様々な要素を通じて特定のPartitionに配置されます。Kafkaはメッセージを受け取ると、Partition KeyにMD5ハッシュ関数を適用し、それによって最終的にどのPartitionに配置されるかを決定します。Brokerは常にハッシュを使用して同じPartitionに配置します。同じPartition Keyを持つものは必ず同じPartitionに配置されるため、Partition単位で順序が保たれます。複数のPartition Keyが1つのPartitionに配置される可能性があり、それらは異なるProducerから送信される可能性があります。
LambdaとKafkaの統合:Event Source Mappingの仕組み
Partitionの割り当てには、アプリケーションのニーズに応じて異なる戦略を使用できます。ランダムハッシュを使用して、レコードをランダムに異なるPartitionに送信し、利用可能なすべてのPartitionに効果的にロードバランシングすることができます。また、同じタイミングで到着するレコードを単一のPartitionにグループ化するために、時間ベースのハッシュを使用することもあります。さらに、顧客IDをKeyとして使用するなど、アプリケーション固有のPartitioning方法でより細かい制御を行いたい場合もあります。これにより、特定の顧客のすべてのレコードが同じPartitionにルーティングされ、順序通りに到着することが保証されます。これは下流の集計ロジックで特に有用です。
ただし、顧客IDなどのアプリケーション固有のPartitioningを使用する場合、スループット容量への影響を考慮することが重要です。例えば、ある顧客が大量のレコードを生成し、他の顧客があまりレコードを生成しない場合、その大量のレコードは常に同じPartitionに配置され、他のPartitionはあまり仕事をしないことになります。これは、スループットに関連してPartitionの動作を考える上での異なる視点の一つです。
Kafkaには、ストリーム内の位置を示すオフセット番号という概念もあります。これは、Consumerがストリームのどこを読んでいるかを追跡するために使用されます。Kafkaのストリーミングアプリケーションでは、1つまたは複数のConsumerが存在する可能性があり、複数のConsumerを持つことは非常に一般的です。例えば、Consumer 1はストリーム内のレコード番号4、つまりオフセット番号4の位置にいます。また、Consumer 2はストリーム内のオフセット番号7の位置にいます。Kafka自体が各Consumerがどのレコードにいるかを記憶しているため、Consumerが次のレコードセットをリクエストした際、Consumer 1がオフセット番号4にいることを認識し、それに応じて次のレコードセットを提供します。
AWSでストリーミングデータを処理するためのオプションをいくつか見ていきましょう。利用可能なアプリケーションがいくつかあります。クラウド上のイベントバスルーターであるAmazon EventBridgeを使用できます。Amazon EventBridgeファミリーには、ポイントツーポイントの統合のためのEventBridge Pipesも含まれています。Lambda Event Source Mappingを使用することもでき、本日のセッションの大部分でこのサービスの観点から説明していきます。Kafkaネイティブな方法としては、Kafka Connectを使用できます。これはKafkaクラスターにインストールして管理するもので、レコードを他の消費アプリケーションにプッシュすることができます。
30分間の計算など、状態を考慮した計算を行いたい、より高度な分析ワークロードの場合は、Managed Apache FlinkやAWS Glueで処理する方が適しているでしょう。これはデータを処理する異なる方法ですが、FlinkやAWS Glueを実行するにはより分析的なスキルセットが必要です。分析ワークロードには、サービスコンポーネントが最適な方法ではないかもしれませんが、サービス方式の方が多くのアプリケーション開発者にとってアクセスしやすい方法です。
Amazon EventBridgeは、AWSの完全マネージド型イベントルーターサービスで、イベントをダウンストリームのConsumerに接続することができます。AWSのイベント、カスタムイベント、そしてEventBridgeと直接統合される多くのSaaSパートナーのイベントを扱うことができます。メッセージを別のサービスに送信する前に、フィルタリングや変換など、さまざまな処理を行うことができます。レコードを取得して何らかの処理を行い、他の場所に送信するという点で、EventBridgeとKafkaは並行的な関係にあります。
違いとしては、Kafkaは複数のConsumerを持つイベントストリームであるため、イベントをプルする方式に重点を置いています。Kafkaでは、Producerがメッセージを特定の方法で作成・フォーマットする必要があり、Consumerもそれを受信できる必要があるため、スマートなエンドポイントが必要です。Kafkaはオープンソースで、ローカルのラップトップでも、どこでも実行でき、Amazon MSK、Confluent、Redpandaなどの完全マネージド版も利用できます。一方、EventBridge側は、ProducerがAmazon EventBridgeにメッセージをプッシュするプッシュメカニズムです。EventBridgeは異なるモデルを採用しており、イベントのストリーミングや順序付けではなく、個別のイベント、AWSやパートナーとの統合、JSONメッセージをやり取りするローコードアプローチに重点を置いています。
また、本日のセッションで取り上げる理由の1つとして、Amazon EventBridgeのためのKafka Connectorが存在します。これを使用すると、Kafka上でKafka Connectorを実行し、Amazon EventBridgeにメッセージを送信することができます。このSink Connectorは、個別のイベントを管理する場合に特に役立ちます。EventBridgeは個々のイベントのプッシュ、管理、フィルタリング、ルーティングを行うためのものであり、Kafka Sink Connectorを使用して直接EventBridgeに接続することができます。これはオープンソースで、すぐに利用可能です。Kafkaを使用している方々にとって、これは簡単に管理できる方法です。また、もう1つの有用なユースケースがあります。それは、Kafkaからカフカへの処理を行う場合です。複数のアプリケーション、部門、機能にまたがる全てのデータを、特にオンプレミスで運用している場合、単一のKafkaクラスター内に保持することがあります。このデータは多くの場合、分割する必要があり、特定の部門やアプリケーションから個別のイベントを取得できるようにする必要があります。
時には、大規模なKafkaクラスターから個別のイベントを取得し、それらの個別のイベントをAmazon EventBridgeを介してAWSに接続するための別のKafkaクラスターを用意することもあります。EventBridgeはサーバーレスなので、使用した分だけ支払えばよく、Lambda、インターネット上の任意のAPI、SageMakerを使用した機械学習など、優れた統合機能を提供します。これは、Kafkaの世界とネイティブなAWSの世界を結びつける優れた方法です。
Lambda Event Source Mappingの利点と動作原理
EventBridgeには、EventBridge Pipesと呼ばれる別のバリエーションがあり、これはポイントツーポイントのアプリケーションを作成するためのより完全に管理された方法です。2つの異なるポイントを接続するためのインフラストラクチャとメカニズムを管理します。Amazon MSKやKafkaのイベントソースからデータを取得し、フィルタリング、バッチ処理、さまざまな操作を実行して、異なるAWSサービスに送信することができます。例えば、Amazon EventBridge Pipesを使用したKafkaのユースケースを見てみましょう。異なるタイプのレコードを分離するフィルターを使用してKafkaからデータを取得し、一部のメッセージはML推論のためにAmazon SageMakerに送信し、別のメッセージはLambda関数を通じてインターネット上の任意のAPIに送信します。
AWS Lambdaに関連するKafka特有の機能について見ていきましょう。Lambdaでデータを消費する主な方法は2つあります。Lambdaは任意のコードを記述できる非常に柔軟なオプションであり、Amazon EventBridgeやEventBridge Pipesのようなポイントツーポイントの統合だけではありません。1つ目の方法は、Kafka Sink Connectorを使用してAWSとLambda関数にデータをプッシュするKafkaの方法です。Kafka Sink Connectorは、Kafkaパーティションのポーリングとメッセージを配信するためのLambda関数の呼び出しを処理します。もう1つの方法は、Lambda Event Source Mapping(ESM)と呼ばれるLambdaのネイティブな統合機能を使用することです。LambdaサービスがこのESMを代わりに実行し、Kafkaクラスターからデータを取得してLambdaにメッセージを送信します。
Sink Connectorは、同期または非同期でLambdaを呼び出すことができます。同期モードでは、トピックとパーティション内のレコードは順次処理されますが、異なるパーティションのレコードは並列で処理できます。Lambdaから成功レスポンスがあれば、その情報を追跡するために別のトピックに送信することができます。非同期モードでは、ファイアアンドフォーゲット方式で、必要に応じてレコードが処理されます。Lambdaはメッセージの受信を確認し、ベストエフォートのシーケンシャルベースで非同期に処理が続行されます。
Lambdaは非同期で呼び出されるため、At-least-onceのセマンティクスを持っています。つまり、エラー処理やリトライのロジックにより、処理が複数回実行される可能性があります。そのため、関数はべき等性を持つ必要があります。べき等性とは、同じアクションを一度実行しても同じ結果が得られることを意味します。例えば、コーヒーを購入する際にクレジットカードで2回請求されても、コーヒーショップで問題が発生してリトライが行われた場合でも、1回分しか請求されないようにする必要があります。Sink Connectorは最大10接続までスケールし、Confluentに申請することでさらに増やすことができます。MSK Connectのキャパシティは、Worker数に基づいて管理されます。
エラー処理に関しては、エラーはエラートピックに書き込まれた後、処理が続行されます。同期モードでは、Kafkaが自身でリトライを処理する必要があります。非同期モードでは、Lambdaが内部キューにメッセージを配置し、最初の試行に加えて2回のリトライを実行することでこれを管理します。また、失敗したメッセージをOn-failure destinationと呼ばれる場所にプッシュすることもできます。これについては後ほど説明します。これにより、メッセージが失われることはなく、Sink Connectorを使用したKafka処理に多くのオプションが提供されます。
今日のメインテーマは、Lambda Event Source MappingとMSKの連携の仕組みについてです。Lambda Event Source Mappingは、ソースからアイテムを読み取るLambda Pollerリソースです。MSKはソースの1つに過ぎず、AWSの別のストリーム処理サービスであるKinesisを使用することもできます。DynamoDB Streams、SQS、Amazon MQなども利用可能です。Lambda ESMは多くの異なるサービスからのポーリングが可能で、これらのポーリングによってサービスからメッセージを取得し、必要に応じてフィルタリングやバッチ処理を行い、最終的にLambda関数に送信して処理を行います。
前述の通り、これらのPollerは私たちが代わりに実行します。コードを管理したり、パーティションのオフセットを管理するコードを書いたりする必要はありません。Lambdaサービスがそのオフセットの管理を担当し、ユーザーはビジネスロジックを書くだけで済みます。ほとんどのシナリオでは、これは実際に無料です。先週リリースされたばかりのProvision Modeと呼ばれるLambda Pollerの新機能については、後ほど説明します。基本的に、LambdaのPollerリソースはほとんどのユースケースで無料です。無料のものは非常に便利で、検討する価値があります。
ストリームデータ処理にLambdaを使用する利点は、Lambdaがスケーリングも管理してくれることで、カスタムビジネスロジック、つまりLambda関数が何を行い、どのようにデータを処理・分析するかに集中できることです。基盤となるインフラストラクチャを気にする代わりに、Lambdaは多数のランタイムをネイティブにサポートしています。Python、.NET、Javaなどが使用可能ですが、カスタムランタイムも実行でき、PowerShellやCOBOLを実行しているお客様もいます。Lambdaは実際には言語に依存せず、どの言語でも実行できます。これはMSKの世界では非常に便利で、複数の異なるコンシューマーや複数の異なるチームを持つことができます。
MSKはJavaベースのシステムですが、Java開発者だけでなく、Pythonを得意とする機械学習・分析チーム、あるいは.NETを得意とする別のチームなど、様々なチームが関わることがあります。Lambdaとイベントソースマッピングの優れている点は、Lambdaがポーラーを実行するため、開発者は好みのプログラミング言語を使用できることです。MSKからのデータ処理に使用する言語を気にする必要はなく、もはやJavaの世界だけに限定されません。
Event Source Mappingの詳細設定:フィルタリングからバッチ処理まで
イベントソースマッピングでは、レコードの処理開始ポイントを設定できます。新しいレコードを処理するか、古いレコードを処理するかを選択できます。レコードはLambda関数に配信され、バッチあたり最大10,000メッセージまで設定可能です。Lambda関数が処理を完了すると、その結果がKafkaに返されます。これはEventBridge Pipesと似ていると思われるかもしれませんが、実はEventBridge Pipesは内部でLambdaイベントソースマッピングリソースを使用しています。シンプルなデータ変換のポイントツーポイント統合にはEventBridge Pipesを使用し、より複雑なワークフローや、複数のイベントソースからデータを集約する必要がある場合にはLambdaを使用することができます。
イベントソースマッピングサービスの内部をもう少し詳しく見てみましょう。ソースマッピングには左側にポーラーがあり、1秒ごとにKafkaブローカー、トピック、パーティションに新しいメッセージがあるかどうかをチェックします。まず、イベントソースマッピングの設定の一部として、Kafkaストリーム内のどこから開始するかを設定できます。選択肢としては、ストリーム内の最も古いレコードから読み始める「trim horizon」、特定の時点から開始する「タイムスタンプ指定」、またはストリーム内の最新メッセージから処理を開始する「latest」があります。
「latest」を選択すると、イベントソースマッピングはバックグラウンドでポーラーとインフラストラクチャをスピンアップする必要があります。これについて開発者が気にする必要はありませんが、数秒かかります。そのため、OKをクリックしても最新のレコードがすぐには取得できない場合がありますが、ポーラーリソースが起動して実行されると、最新のレコードを取得できるようになります。Consumer IDについても興味深い点があります。MSKの場合、イベントソースマッピングの作成時にConsumer IDを設定できます。Consumer IDはMSKの世界でコンシューマーを一意に識別するもので、これはMSK、自己管理型Kafka、Confluent、その他のイベントソースでも機能します。Consumer IDを指定しない場合、LambdaイベントソースマッピングはUUIDを使用して作成します。なお、イベントソースマッピングを削除しても、他の場所で使用する可能性があるため、コンシューマーグループIDは削除されません。
カスタムConsumer IDを作成する理由は何でしょうか?Kafkaが新しいコンシューマーがクラスターに接続されているのを検出すると、カスタムConsumer IDを使用しているかどうかを認識します。その後、最初のコンシューマーとして開始するか、別のConsumer IDに参加します。新規に開始するのではなく、前回のKafka Consumer IDの続きから、あるいは最後のレコードがすでにストリームから出ている場合はtrim horizonから継続できます。これは災害復旧ワークロードでよく使用されます。例えば、Apache Mirror Maker 2を使用してKafkaクラスターを別の場所に複製する場合、カスタムコンシューマーグループIDを使用してミラーリングされたKafkaクラスターからメッセージを処理できます。問題が発生した場合、Kafkaクラスターをフェイルオーバーし、Lambdaは以前の続きから処理を継続できます。
MSKクラスターからPollerがレコードを取得した後、必要に応じてフィルタリングを行うことができます。これは、不要なメッセージの処理を避けたい場合や、特定のアプリケーション、ビジネスユニット、その他の基準に基づいたメッセージのみを必要とする複数のConsumerがいる場合に便利です。フィルタリングの処理はFunction実行前に行われるため、必要なメッセージのみを処理することでコストを削減できます。フィルターはJSONパターンで設定され、多様なパターンマッチングオプションが用意されています。例えば、タイヤ圧が32未満のメッセージのみを処理し、別のConsumerが32以上のメッセージを処理するといった設定が可能です。
注目すべき機能として、EventBridge Pipesを使用する場合、Event Source Mappingを使用し、EventBridge自体も同じルールライブラリとシンタックスを使用するため、サービス間で一貫性を保つことができます。つまり、EventBridge PipesでEvent-Drivenアプリケーションを構築する場合も、EventBridge Event BusとLambda Event Source Mappingでレコードを処理する場合も、同じシンタックスでフィルタリングロジックを使用できます。このフィルタリング機能により、関連するメッセージのみを処理するために、異なるフィルターを持つ複数のEvent Source Mappingを設定することができます。
フィルタリングの後、Event Source Mappingはメッセージをバッチにグループ化してFunctionに送信します。バッチは3つの異なる方法で定義されます。まず、即時処理から最大5分までの待機時間を設定できるBatch Windowがあります。次に、1レコード(デフォルト)から最大10,000レコードまでのBatch Sizeを設定できます。そして、API Gatewayやその他のサービスの背後で実行される場合も含め、すべてのLambda Event Sourceに適用される6MBのペイロードサイズ制限があります。Event Source Mappingは、この6MB制限に近づくことを基準にBatch Sizeを形成します。
実際の例で3つのシナリオを見てみましょう。Batch Windowの最大値を40秒、Batch Sizeを10、Lambdaの最大ペイロードサイズを6MBと設定した場合、異なる結果が得られます。最初のシナリオでは、40秒が経過した時点で、その期間中の5つのレコードがLambda Functionに送信されます。2番目のシナリオでは、Batch Windowが終了する前に、利用可能な10個のレコードがBatch Size制限に達して送信されます。Batch Windowは低ボリュームの処理に特に有用ですが、高ボリュームの場合は、大きなBatch Sizeによる潜在的な遅延を考慮する必要があります。3番目のシナリオでは、Batch WindowやBatch Size制限に達する前に6MBのペイロード制限に達し、メッセージがLambdaに送信されます。
バッチが形成されると、Event Source MappingはそれをLambdaに同期的に送信し、応答がMSKに返されます。これがKafkaのバッチの形式であり、Lambda Functionがパースして処理することができます。
レコード情報、パーティション情報、タイムスタンプ、シーケンス番号、読み取り元のオフセット番号、処理対象のデータが含まれています。これはJSON形式なので、Lambda関数内で必要に応じてそれぞれのデータを確認することができます。
認証オプションについて見ていきましょう。SAL SCRAMは、ユーザー名とパワスワードを保存するKafkaでよく使用されるアプリケーションオプションです。これらのシークレット情報を保存するのに適しているのがSecrets Managerです。MSKを使用する場合、事前に決められた命名規則があり、シークレット名は「Amazon_MSK_」で始める必要があります。Event Source Mappingは自動的にSecrets Managerに接続して情報を取得し、認証を行います。Amazon MSK Serverlessは実はIAM認証のみをサポートしているのですが、これは非常に便利です。なぜなら、AWSのネイティブな統合機能を使用して、他の多くのアプリケーションと同様にIAMを利用できるからです。
LambdaとKafkaの連携におけるネットワーキングとスケーリング
MSKチームは約1年前に、Python、Node、Go向けのオープンソースライブラリをリリースしました。これにより、IAM認証をより簡単に使用できるようになりました。これは特に、Kafkaに対してデータを送信する必要があるプロデューサーに関連性が高く、Javaだけでなく、さまざまな言語を使用できます。Event Source Mappingを使用する場合、ポーリングと認証はLambdaが処理するため、関数内でデータを消費するだけで済むことを覚えておいてください。
Kafkaのもう一つの重要な側面はネットワーキングです。特に自分でKafkaを運用している場合、複数のアベイラビリティーゾーンにまたがると、多くのネットワーク設定が必要になります。自己管理型のKafkaを使用する場合、Lambdaからの消費時のネットワーク設定は、Lambda関数と同様です。というのも、一般的にインターネット上で利用可能だからです。デフォルトでは、Event Source MappingはAWSへのアクセス権を持っていますが、VPC内でのアクセスは設定されていません。Kafkaクラスターにアクセスするための特定のサブネットとセキュリティグループを設定できます。Kafka クラスターのESMは、Lambda関数を含むVPCと同じまたは異なるアカウントからアクセス可能なクラスターにアクセスでき、クロスアカウント接続が可能です。
Lambda関数がKafkaインスタンスをプルするためには、各リージョンのパブリックサブネットでNATゲートウェイが稼働していることを確認する必要があります。テストや開発ワークロードの場合は、1つのアベイラビリティーゾーンに単一のNATゲートウェイを設置することもできますが、本番ワークロードの場合は、各アベイラビリティーゾーンにNATゲートウェイを1つずつ配置して高可用性を確保することが望ましいでしょう。MSKを使用してMSKクラスターと通信する場合、MSK ESMは、Lambda関数がVPC内のリソースに接続する方法と同様に、クラスターが使用する各サブネットにElastic Network Interfaceを作成します。
MSK ESMはLambda関数のVPC設定を使用しないことに注意が必要です。代わりに、ESMは対象のMSKクラスターで設定されているサブネットとセキュリティグループの設定を自動的に使用します。Event Source Mappingはクラスターとやりとりしてセキュリティ情報とセキュリティグループを取得します。MSKクラスターのセキュリティルールは、自身との間でインバウンドとアウトバウンドのアクセスを許可する必要があり、ブローカーが使用する認証ポートの1つにセキュリティグループを追加する必要があります。
MSKのVPCにはアウトバウンドのネットワーク接続も必要ですが、これはオンデマンドモードでのみ必要です。先週リリースされた新しいプロビジョンモードでは、このようなセットアップは不要になり、シンプルになりました。オンデマンドのLambda Event Source Mappingのスケーリングでは、LambdaとSTS(Secure Ticket Service)に接続する必要があります。これはLambdaがLambda関数を呼び出すためのロールを引き受けるためです。SASL SCRAMやSecrets Managerを使用する他の認証オプションを使用している場合は、そちらへのアクセスも必要です。AWSサービスへのネットワーク接続は、VPCエンドポイントやNATゲートウェイを使用して行うことができます。VPCエンドポイントの場合、プライベートサブネットからLambda、STS、必要に応じてSecrets Managerのエンドポイントを作成する必要があります。
VPCエンドポイントなどを使用している場合、これらは通常、Kafkaクラスターに接続するために必要な標準的なセットアップです。Lambdaでも同様のセットアップが必要になります。NATゲートウェイを使用する場合、開発テスト用のパブリックサブネットにNATゲートウェイを配置する必要があります。開発テストでは1つで済みますが、本番環境のワークロードでは、複数のアベイラビリティーゾーンで利用できるように適切に設定する必要があります。
次に、あらゆる種類のKafkaクラスターに接続する際のLambdaのスケーリングについて説明しましょう。Kafkaのイベントソースを最初に作成すると、Lambdaはトピック内のすべてのメッセージを処理するために1つのコンシューマーポーラーを割り当てます。つまり、すべてのトピックにわたるすべてのメッセージに対して単一のポーラーが割り当てられます。この初期スケーリングは適度に速く行われます。1年前、LambdaはEvent Source Mappingの初期起動プロセスを改善し、可能な限り早く起動し、複数の関数を呼び出してスケーリングできるようになりました。1つのポーリングリソースで、パーティション内の順序を維持しながら、複数のLambda関数を使用して処理を行うことができます。
Lambdaはワークロードに基づいてコンシューマーの数を自動的に増減できます。これは自動的に行われるため、管理する必要はありません。Lambdaは1分ごとにトピック内のすべてのパーティションのコンシューマーオフセットを評価します。ラグが大きすぎる場合、つまりLambdaがパーティションからのレコード処理に追いつけない場合、Lambdaは自動的にトピックに追加のポーラーを追加します。これによってより多くのLambda関数にスケールアウトされますが、このプロセスには最大3分かかる可能性があります。Lambdaが追加のポーラーが必要だと判断してから、それらのポーラーリソースを作成してセットアップするまでに3分のデルタがあります。
Lambdaは、パーティション数以下までConsumerポーラーをスケールアップすることができます。つまり、Lambda関数の数がパーティション数を超えることはありません。これは、パーティションごとのメッセージ順序を保持する必要があるためです。最大Consumer数は常にトピックのパーティションあたり1 Consumerとなっており、この順序付けをシンプルに保つことができます。これはまた、スループットの管理にも役立ちます。Lambdaサービスは1秒に1回パーティションをポーリングしてレコードのセットを取得し、そのレコードのバッチで関数を同期的に呼び出します。処理が成功すると、次のレコードのバッチに進んでポーリングを続けます。
エラー処理とパフォーマンス測定:LambdaとKafkaの最適化
処理が失敗した場合、リトライとエラーの動作はEvent Source Mappingの設定に依存します。デフォルト設定では、Lambdaは同じレコードのバッチで関数を再度呼び出し、バッチが成功するか、ストレージまたは時間の制限でレコードがストリームから消えるまで、これを継続します。つまり、処理に何か問題がある場合、順序を維持するためにLambdaは問題が解決されるまで再処理を続けます。無限ループではありませんが、何かが解決されるまで再処理し続けます。これは恐ろしく聞こえるかもしれません。ストリーム処理が全て停止してしまうと思うかもしれませんが、厳密に順序を維持したい場合は、実際にこの動作が望ましいこともあります。
ストリームのブロックを防ぐには、エラー処理を適切に行う必要があります。バッチでエラーが発生し、バッチ全体を無期限に再処理したくない場合は、Failure Destinationと呼ばれる機能を設定できます。これにより、レコードのバッチを2つの異なるオプション(SQSとSNS)に送信でき、どのレコードまたはどのバッチのレコードが失敗したかのメタデータを送信します。その後、別のConsumerやアプリケーションで、Kafkaトピックからそれらのメッセージを取得し、Lambda関数のコードが正しく処理していないのか、あるいは何らかのエラーが発生しているのかを判断できます。
この1、2年で、注目すべき新しいLambdaとKafkaの機能が多数追加されました。先ほど述べたように、より速い初期スケールアップと、実際にはかなり遅いスケールダウンが実現されています。基本的な考え方として、Lambdaはメッセージをできるだけ早く処理するよう努めます。Provision Modeは、Lambdaがリソースを事前にプロビジョニングできるようにするもので、これについては後ほど説明します。On-Failure Destinationは2013年にリリースされ、KafkaからMSK、SQS、SNS、そして現在ではS3を含む様々な送信先に失敗したレコードを送信できます。
設定オプションには開始位置があり、タイムスタンプが新しく追加されました。EventBridgeと同じフィルタリング構文が新機能の1つとして追加され、Provision Modeのネットワーキングも簡素化されました。
パフォーマンスについて見ていきましょう。KafkaとLambdaは、スケールでの処理が重要です。このグラフは、約1年前に新しい、より高速なKafka pollerの起動を導入した際の状況を示しています。青い線はKafkaクラスター上に表示されるメッセージを表し、オレンジ色の線ではLambdaが同時実行をすばやくスケールアップしている様子が分かります。Lambdaはメッセージを処理する必要があることを認識してスケールアップし、そのパーティションを処理するための同時実行を維持します。最後に急激な低下が見られますが、これはLambdaが可能な限り効率的にメッセージの処理を継続することを意味します。
先週、新しいProvision modeを導入しました。これにより、予想される突発的なトラフィックに対応するためのリソースを事前にプロビジョニングすることができます。これは、厳格なパフォーマンス要件を持つアプリケーションで特に有用で、Lambda pollerが追加のポーラーを追加する際の1分や3分の待ち時間をなくすことができます。設定可能な項目は2つあり、1から200の間でイベントポーラーの最小数を設定するか、下流のリソースを保護したり特定の設定を維持したりするためにイベントポーラーの最大数を設定することができます。
通常の無料のEvent Source Mappingとは異なり、このProvision modeには料金が発生します。厳格なパフォーマンス要件と迅速なセットアップが必要な場合は、スループットに焦点を当てた課金メトリクスであるEvent Poller Unitを使用するProvision modeを利用できます。さらなる利点として、VPCエンドポイントのネットワークセットアップのコストを私たちが負担するため、AWS PrivateLinkやNAT gatewayを設定する必要がなく、よりシンプルなネットワーキングが実現できます。
パフォーマンスの測定について説明しましょう。Lambdaでの消費には2つの異なる側面があります:個々のLambda関数と、プロデューサーがレコードを作成してからコンシューマーに到達するまでのエンドツーエンドのプロセスです。まず、通常のアプリケーション実行のベースラインを理解することが重要です。ストリームベースの側面では、1秒あたりの平均レコード数と、1秒あたりの平均バイト数でのサイズを把握する必要があります。Lambda側では、単一レコードの処理時間と平均バッチサイズの処理時間を理解する必要があります。
このベースラインの理解により、メトリクスの変化を特定することができます。レコード数の増加、レコードサイズの変更、または異なる動作をする新しいプロデューサーの存在を判断できます。Lambda側では、関数エラー、同時実行制限によるスロットリング、実行時間の変化、呼び出し回数、リトライを監視します。これらの変化を特定するには、事前にベースラインを理解しておくことが重要です。
メトリクスとスループット管理:LambdaとKafkaの効率的な運用
メトリクスは私たちの味方です。KafkaとMSKについては、CloudWatchと自動的に連携しています。自己ホスト型のKafkaメトリクスをCloudWatchにプッシュすることで、Kafkaがどこで動いていても、コンシューマーのラグやオフセットを一箇所で監視できます。オープンソースの世界では、オープンな監視のためにPrometheusもサポートされており、多くのお客様が素晴らしい成果を上げています。
Lambda関数のコンシューマーを監視するための有用なメトリクスがいくつかあります。エラーやスロットルに対してアラームを設定し、これらがゼロを超えた場合に詳しく調査することができます。処理時間のスパイクは、コンシューマーが遅くなっていることを示している可能性があり、その場合は適切な対処が可能です。
もう一つの重要なメトリクスは、オフセットメトリクスです。これは、Lambda関数がバッチレコードの処理を完了したときに出力されます。オフセットメトリクスの理解で重要なのは、これが時間の尺度ではなく、ストリーム内のレコード数を示す尺度だということです。時間的な遅れを示すと考える人もいますが、そうではありません。これは単にストリーム内のメッセージ数を示すもので、その数が増加している場合は処理が遅れていることがわかります。これは個々のメッセージ処理によって異なり、バッチの最後のレコードがいつ完了したかを示すことで、レコードが追加されてから関数が処理するまでの遅延を推定できます。
特にオフセットラグが増加している場合、スケールでのスループットをどのように管理すればよいでしょうか?繰り返しになりますが、オフセットラグは時間の尺度ではなく、レコード数の尺度です。Lambda側では、いくつかのオプションがあります。フィルタリングを使用していますか?Lambda関数で大量のレコードを受け取って破棄しているのであれば、なぜイベントソースマッピングリソースでフィルタリングを使用しないのでしょうか?それの方が安価で高速です。もう一つの方法は、関数のメモリ設定を増やすことです。Lambda関数のメモリを増やすと、10GBのRAMと6つの仮想CPUまで、比例的にCPUが追加されます。より多くのCPUとRAMにより、Lambda関数はレコードの処理を迅速に完了し、全体的なスループットを向上させることができます。
関数のコードを見直してみる価値もあるかもしれません。関数コードに最適化の余地はありませんか?使用していないライブラリをインポートしていませんか?Lambda関数のコードを高速化できる部分はありませんか?初期化時間やコールドスタートが長いLambda関数がある場合は、バッチサイズを増やすことができます。起動に時間がかかるものの、その後の処理が非常に速い場合、より大きなバッチサイズの方が効率的です。また、Lambda側の同時実行数とスロットルを管理することも重要です。Lambda関数がスロットルによって処理されていない、あるいは何らかの制限を超えていないでしょうか?その場合は、サービスクォータに行ってLambdaの制限を引き上げる必要があるかもしれません。
ストリーム処理に関して、パーティションキーをどのように使用していますか?例えば、Customer IDをパーティションキーとして使用し、90%のレコードを単一のパーティションに集中させ、他のパーティションがほとんど使用されていない状況はありませんか?これは明らかにアプリケーションのアーキテクチャ変更が必要なサインです。パーティションキーを使用してレコードをより均等に分散させる方法を検討してください。パーティション数を増やすと、自動的にLambda消費者の数も増加します。パーティションの最大数がLambda関数の最大数となるため、単にパーティションを追加するだけで、より多くの処理を可能にする簡単な解決策になるかもしれません。プロデューサー側でレコードをバッファリングすることを提案する人もいます。一般的ではありませんが、これも効率を上げる一つの方法です。
AWS Lambda用のPower Toolsは、AWSの優秀なエンジニアやコミュニティのメンバーによってオープンソースで開発されたツールです。Python、TypeScript、Java、.NETに対応したユーティリティがあり、ストリーミングレコードの処理や、APIの背後での処理、SQSからのメッセージ消費などを扱うことができます。AWS Power Toolsをご存じない方は、Lambda関数に追加できる非常に便利なライブラリで、管理が必要なコード量を削減できます。
今日は多くのトピックをカバーしました - データストリーミングの詳細、AWSでの様々なオプション、Kafka側での個々のレコードとパーティションの確認、EventBridge、Kafka Connect、そしてLambdaのイベントソースマッピングとパフォーマンスの管理について深く掘り下げました。リソースのスライドがあり、スライドは既にアップロードされており、他の情報へのリンクもあります。月曜日の朝一番に、KafkaとLambdaに関連するセッションがあります。約60人規模の小さな部屋でのホワイトボードを使用したトークで、質問についてよりインタラクティブなやり取りができます。また、自分のラップトップを持参して自分で構築できるビルダーセッションもあります。Lambdaについてさらに深く学びたい方は、今日の午後にServerless開発者向けのベストプラクティスについてのブレイクアウトセッションを行います。自分のペースでAWSの学習を続け、知識を深めることができ、Serverlessバッジを取得することもできます。本日はご参加ありがとうございました。私の連絡先はこちらです。質問がある方は周りにいますので、残りのre:Inventをお楽しみください。
※ こちらの記事は Amazon Bedrock を利用することで全て自動で作成しています。
※ 生成AI記事によるインターネット汚染の懸念を踏まえ、本記事ではセッション動画を情報量をほぼ変化させずに文字と画像に変換することで、できるだけオリジナルコンテンツそのものの価値を維持しつつ、多言語でのAccessibilityやGooglabilityを高められればと考えています。
Discussion