📖

re:Invent 2024: Apache FlinkとPrometheusで大規模Observabilityを実現

2024/01/01に公開

はじめに

海外の様々な講演を日本語記事に書き起こすことで、隠れた良質な情報をもっと身近なものに。そんなコンセプトで進める本企画で今回取り上げるプレゼンテーションはこちら!

📖 AWS re:Invent 2024 - Handle millions of observability events with Apache Flink & Prometheus (OPN406)

この動画では、ObservabilityとApache Flink、Prometheusの連携について詳しく解説しています。特に、IoTデバイスや接続された車両など、大規模に分散したリソースの監視におけるPrometheusの活用方法に焦点を当てています。10万台の車両からのリアルタイムデータを扱うデモを通じて、Apache FlinkによるPrometheusへの前処理の重要性を示し、5億件のイベントデータを30万件程度まで削減できることを実証しています。また、新しく登場したFlink Prometheus Connectorの機能と実装方法についても詳しく解説され、Remote Write仕様の完全実装やバッチ処理による書き込みスループットの最適化など、具体的な技術的特徴が示されています。
https://www.youtube.com/watch?v=trhsC9tcGU4
※ 動画から自動生成した記事になります。誤字脱字や誤った内容が記載される可能性がありますので、正確な情報は動画本編をご覧ください。
※ 画像をクリックすると、動画中の該当シーンに遷移します。

re:Invent 2024関連の書き起こし記事については、こちらのSpreadsheet に情報をまとめています。合わせてご確認ください!

本編

Observabilityとプレゼンテーションの導入

Thumbnail 0

Thumbnail 20

はい、ライブ配信が始まったようですね。皆さん、こんにちは。音声は聞こえていますでしょうか?それでは、ご覧の通り、今回のトークはObservability、Apache Flink、そしてPrometheusについてお話しします。 始める前に、簡単なアンケートを取らせていただきたいと思います。手を挙げていただけますでしょうか。Apache FlinkやStream処理の経験がある方はいらっしゃいますか?かなりいらっしゃいますね。Prometheusについてご存知の方は?さらに多いですね。素晴らしいです。どちらもあまり馴染みのない方々には、今日は新しい知識を得ていただければと思います。そして、すでに詳しい方々には、PrometheusとFlinkをより効果的に連携させるためのアイデアをお伝えできればと思います。

まずは簡単に自己紹介をさせていただきます。私はLorenzo Nicoraと申します。AWSのSenior Streaming Solutions Architectとして働いております。ロンドンを拠点に10年ほど活動していますが、私の訛りでお分かりの通り、イタリア出身です。そして、皆さんこんにちは。私はFrancisco Morilloと申します。同じくSenior Streaming Solutions Architectを務めています。マドリッドを拠点とし、AWSで5年間働いていますが、ベネズエラ出身です。そして、こちらに私たちの小さなマスコットがいますが、その正体はすぐにお分かりいただけると思います。

Observabilityの重要性とPrometheusの概要

Thumbnail 110

Thumbnail 120

それでは、Observabilityについてお話を始めましょう。 Observabilityが重要なのは、システムの可視性を提供し、その可視性によってリアルタイムで問題のトラブルシューティングが可能になるからです。システムで発生するあらゆる問題をリアルタイムで対処できるのです。結果として、システムのパフォーマンスが向上し、お客様により良い体験を提供することができます。そして最終的には、アプリケーションやビジネスからより良い収益を得ることができます。つまり、Observabilityそのものは運用上の課題ですが、その影響はまさにビジネス上の課題なのです。

Thumbnail 150

Thumbnail 160

Thumbnail 170

一般的に、Observabilityと言えば、ITインフラ、アプリケーション、コンピューティングリソース全般の監視を思い浮かべます。 あるいはデータやミドルウェアかもしれません。 しかし、実際に観察する必要があるのは、最も広い意味でのビジネスそのものなのです。ここでKPIの話をしているわけではありません。ビジネスを観察するということは、コンピューティングやインフラを超えて、システム全体に関わるすべての技術リソースを観察する必要があるということです。皆さんはObservabilityの3本柱である、ログ、トレース、メトリクスについてご存知かと思います。

Thumbnail 200

Thumbnail 220

今回のトークでは、特にメトリクスに焦点を当てていきます。これは、ITリソースやコンピューティング以外のものを観察する際に、最も関連性の高い柱だからです。ITリソース以外のものを観察する場合、メトリクスが唯一利用可能なObservabilityデータということもあります。 メトリクスとは何でしょうか?メトリクスとは、CPU使用率や温度など、何らかの測定値を離散的な時間で採取したサンプルの集まりです。一方で、時系列データは、時系列順に並んだ離散的なデータポイントの連続として定義できます。つまり、メトリクスと時系列データは、異なる視点から見た同じものなのです。時系列データは、時間経過に伴う値の変化を表す折れ線グラフとして表現されることが多いです。

Thumbnail 270

時系列データは通常、Time Series Databaseと呼ばれる専用のデータベースに保存されます。実は、これらのTime Series Databaseは、私たちが想像する以上に、Observabilityの領域を超えて、さまざまなユースケースで活用されています。この図では、その可能性の一部をご紹介しています。例えば、取引や金融データ - これらのほとんどが時系列データで、取引、価格などが含まれます。製造業や産業用アプリケーション、機械のモニタリング、あるいはIoTデバイスのモニタリングなどがあります。

Thumbnail 320

これらのアプリケーションには、温度やその他のメトリクスなど、さまざまな測定値が含まれます。 現在、多くの人気のあるTime Series Databaseが利用可能です。その中には、Amazonのフルマネージド型Time Series DatabaseであるAmazon Timestream、非常に人気の高いInfluxDB、そしてPrometheusなどがあります。

Apache FlinkとPrometheusの連携:課題と解決策

Thumbnail 350

今回のトークでは、特にPrometheusに焦点を当てていきます。Prometheusをご存じない方のために説明すると、 これは多次元のTime Series Databaseで、リアルタイムの可視化と正確なアラート機能のための専用のクエリ言語を提供しています。可視化を構築するための多くのコネクタを通じて豊富な統合機能を提供し、水平スケーリングのための複数のストレージオプションをサポートしています。

Thumbnail 380

Prometheusは運用メトリクスのために設計されています。リアルタイムのダッシュボード作成や、 Observabilityおよびモニタリングのユースケースを考える際、リアルタイムで正確なアラート機能を提供します。スケールしても低コストで多くの機能を提供し、高速な書き込みのために設計されており、一貫性よりも可用性とデータの鮮度を優先しています。

Thumbnail 430

私たちはPrometheusをITインフラのモニタリング用途として考えがちですが、実はさまざまなユースケースに拡張できます。 これには、IoTデバイスのモニタリング、製造業や産業用アプリケーション、フリート管理、通信インフラなどが含まれます。これらのアプリケーションには、リアルタイムの可視化、正確なアラート機能、大量のデータ処理という共通の要件があります。

Thumbnail 460

Prometheusへのデータ書き込みについて、最も一般的な方法は、Scraperを使用したPullメカニズムです。アプリケーションはエンドポイントを通じてメトリクスを公開し、Prometheusは設定された間隔でそのエンドポイントからデータを取得してデータベースに保存します。これはJMX ExporterやOpenTelemetryといった統一規格を使用して実現できます。もう一つの方法として、RemoteWriteインターフェースを使用してデータベースに直接書き込むこともできます。この方法は、ソースから直接書き込みを行い、データ送信間隔をより細かく制御したい場合に特に有効ですが、データ送信の責任がソース側に移ることになります。

Thumbnail 550

Pullメカニズムのサンプルアーキテクチャを見てみましょう。アプリケーション(通常はContainerやKubernetesクラスター)は、特定のエンドポイントを通じてメトリクスやインフラストラクチャメトリクスを公開します。Prometheusはこれらのエンドポイントとポートからデータを取得するよう設定され、バックエンドにデータを保存します。その後、独自のクエリ言語を使用して、Grafanaなどのツールでリアルタイムな可視化を行うことができます。

Thumbnail 590

RemoteWriteインターフェースの実装については、IoTデバイスやその他のソースからデータを受け取り、RemoteWriteインターフェースを使用してPrometheusに直接データをプッシュするカスタムコンポーネントが必要になります。

Thumbnail 610

Prometheusにデータを保存した後は、PromQLを活用して可視化、クエリ、アラートルールを設定します。PromQLを使用することで、リアルタイムでのフィルタリングや集計が可能になり、sum、count、rateなどの独自の関数を利用することができます。

Thumbnail 640

PromQLには重要な考慮点がいくつかあります。データの結合やエンリッチメントができないため、データベースに保存している異なるタイムシリーズをマージすることはできません。また、カスタム集計の柔軟性も限られており、Prometheusが提供する定義済みの関数セットのみを使用することができます。大量のデータを扱い、多くのタイムシリーズを生成する場合、高い粒度と高いカーディナリティによって、Prometheusへのクエリ実行時にパフォーマンスの問題が発生する可能性があります。

Thumbnail 690

先ほど説明した観測可能性イベントの処理方法と使用例について考えると、これらの考慮事項は、特にIoTの運用メトリクスのモニタリングを議論する際に重要となります。私たちは、数百万のデバイスを扱う可能性のある高カーディナリティと、デバイスが1秒ごとあるいは0.5ミリ秒ごとにデータを送信する高頻度な状況について話しています。さらに重要なのは、これらのデバイスが、私たちの可視化やビジネスロジックの構築に必要なすべてのデータを常に送信しているわけではないということです。モデルIDやデバイスの出自といった文脈情報が欠けている可能性があります。

Thumbnail 740

RemoteWriteアーキテクチャに話を戻すと、IoTデバイスのメトリクスをPrometheusにプッシュしたい場合、このカスタムコンポーネントは実際にはカスタムプリプロセッサーであるべきです。これにより、RemoteWriteインターフェースを使用してPrometheusに書き込む前に、必要な結合やカスタム集計を実行することができます。ここで Apache Flinkが私たちの救世主となります。

Thumbnail 780

Apache Flinkについて説明させていただきます。Flinkのウェブサイトによると、「Apache Flinkは、有界および無界データストリームに対するステートフル計算のためのフレームワークおよび分散処理エンジンです」。まず、Flinkは独自のアプリケーションを作成するためのフレームワークですが、同時に複数のマシンにまたがってアプリケーションを実行できる分散処理エンジンでもあります。これにより水平スケーリングが可能となり、Flinkに驚異的なスケーラビリティを与えています。特にステートフル計算用に設計されており、これは1つの入力から1つの出力が生成されるステートレス計算とは異なります。ステートフル計算は、移動平均の計算のような単純なタスクから、より複雑な操作まで、複数の入力と履歴に依存します。Flinkはこれらのタスクを確実に処理し、アプリケーションがクラッシュして再起動した場合でも状態を確実に保持します。Flinkに馴染みがない方にとって、無界および有界データストリームの処理という概念は少し変わって聞こえるかもしれませんが、基本的にFlinkは無界データセットとストリームを扱うための一般化されたAPIプログラミングインターフェースを提供しています。

このAPIは、無界データセット、ストリーム、終わりのないデータを、有界データセット、テーブル、有限データと同じように扱います。有界データセットを無界の部分集合として考えますが、Flinkは主に無界ストリーム処理のために設計されており、これが今日私たちが検討しているユースケースです。つまり、私たちは無界データセット処理、すなわちストリーム処理のためのFlinkについて話しているのです。

Thumbnail 940

Flinkの最も重要な特徴の1つは、数多くの異なるシステムからの読み取りと書き込みが可能なことです。このリストは決して網羅的なものではありません。Flinkが読み取り可能なシステムには、Apache Kafka、Amazon Kinesis、Apache Pulsarなどのストリームストレージ、RabbitMQのようなメッセージキュー、データベース、さらにはストリーミングモードでのAmazon S3上のファイルなどがあります。同様に出力先としては、データベース、ストリーミングシステム、メッセージングシステム、ファイルシステム、そしてApache Cassandra、任意のJDBCデータベース、リレーショナルデータベース、Amazon DynamoDBなど、様々なタイプのデータベースに書き込むことができます。これらは単なる例です。Flinkには、特定のシステムに接続するためのドライバーとして機能するConnectorと呼ばれるコンポーネントがあります。Kafkaコネクター、DynamoDBコネクター、Cassandraコネクターなど、それぞれの特定の技術に対して1つのコネクターが用意されています。

Thumbnail 1020

この時点で、Francisco が言及した運用メトリクスの前処理におけるApache Flinkの活用方法と、その課題解決について理解できたと思います。Flinkを使えば、デバイスから送られてくる生のイベントに文脈情報を追加する enrichment(例えば、IDだけの情報にモデル情報を追加するなど)が簡単に実現できます。デバイスが多くのセンサーデータを送信しているけれど、その全てが必要ではない場合のカーディナリティの削減も可能です。また、単純な平均値や、より複雑な集計を通じて、粒度や頻度を削減することもできます。さらに、GPSトラッカーが位置を見失ってランダムな座標を送信し始めた時のノイズの多いGPSデータをフィルタリングするといったことも実現できます。最後に、複数の生メトリクスから任意のロジックで派生メトリクスを計算することも可能です。

Thumbnail 1120

Thumbnail 1130

Flinkに詳しい方なら「Flink Prometheus metric reporterがあるじゃないか」とおっしゃるかもしれません。これは確かにFlink標準ディストリビューションに含まれているコンポーネントです。しかし、処理したメトリクスをPrometheusに送信する用途には使えません。なぜなら、これは私たちが求めているコネクタではないからです。Prometheus reporterは、他のFlink reporterと同様に、Flinkの内部メトリクスをエクスポートするように設計されています。つまり、Flinkを観察し、アプリケーションを監視するためのものであり、データを扱うためのものではありません。これは、IoTデバイスなどの外部システムから観測可能性データを扱う私たちのユースケースとは大きく異なります。さらに重要なのは、reporterはスケーリングを考慮して設計されていないということです。数秒ごとに100個程度のメトリクスを公開するアプリケーションの監視を想定しているためです。私たちの要件には別のソリューションが必要です。

Thumbnail 1210

もちろん、自分で実装することもできます。先ほど触れたPrometheusのRemote Write interfaceはオープンで、よくドキュメント化されており、標準的なHTTP上で動作します。そのため、独自の書き込みコンポーネントを実装することは可能で、実際にそうしているお客様もいらっしゃいます。ただし、完全にカスタムなものを構築し、すべてを自分で処理する必要があります。エンドポイントは特定の構造を持つProtobufペイロードを期待するため、それを構築する必要があります。Remote Writeドライバーは存在せず、仕様だけなので、低レベルのHTTPクライアントの上にすべてを一から構築する必要があります。

できるだけ効率的に動作させるため、非同期HTTPクライアントを使用する必要があります。さらに重要なのは、Remote Write interfaceに1つずつサンプルを送信していたのではスケールしないため、書き込みをバッチ処理する必要があるということです。つまり、順序を乱さないように注意しながら、多くのサンプルを含む単一のリクエストを送信できるよう、賢くバッチ処理を行う必要があります。HTTPエンドポイントが報告する様々なエラーを処理し、仕様に従って再試行を行う必要もあります。Remote Write仕様は、どのエラーを再試行できて、どのエラーは絶対に再試行してはいけないのか、また、どのように再試行すべきかについて、非常に厳密に定められています。

Thumbnail 1330

朗報として、Apache Flinkエコシステムの一部として、新しいコンポーネント、新しいコネクタが使用できるようになりました。これはApache Flinkプロジェクトへの新しい追加機能であり、独立したコンポーネントではなく、Apache Flinkへのオープンソース貢献を可能にするものです。11月中旬に安定版がリリースされ、現在は完全に利用可能でドキュメント化されています。最新のApache Flink安定版である1.18と1.20をサポートしています。実際にはFlink 2.0でも動作しますが、Flink 2.0がまだプレビュー段階なので、本格的なテストは行っていません。実際、Flink 2.0で徹底的にテストされたコネクタはまだありません。これは他のすべてのコネクタでも同じ状況です。Flink 2.0がより安定してきた段階でテストを行う予定です。

Thumbnail 1390

Flink Prometheus Connectorは、Apache FlinkとPrometheusを組み合わせて運用メトリクスの前処理を行うことを可能にします。これにより新しいユースケースが開かれ、コンピューティングやITインフラの監視を超えて、Prometheusの活用範囲が広がります。特に、大規模に分散したリソースの監視に効果を発揮します。これはIoTデバイスや接続デバイスなどが対象となりますが、この後すぐデモでご覧いただけます。また、生イベントを生成するシステムや、メトリクスの基礎となるシステムなど、さまざまな用途に活用できます。

Thumbnail 1440

デモとそのアーキテクチャの説明に入る前に、このConnectorの主な特徴を見ていきましょう。まず、このConnectorはPrometheusのRemote Write仕様を完全に実装しています。Prometheusは製品ではなく標準規格であり、このConnectorは多くのPrometheusバックエンドが実装している最新の1.0仕様に対応しています。バッチ処理とFlinkの水平スケーリング機能を活用して書き込みスループットを最適化するように設計されており、複数の並列書き込みが可能です。その際、Prometheusはサンプルの送信順序に特に厳密なため、Connectorは順序の保持にも配慮して設計されています。また、エンドポイントがエラーを返した場合の対処方法など、エラー処理もある程度設定できます。

Thumbnail 1540

さらに、このConnectorは実はApache Flink Async Sinkをベースにしています。これは数年前にAWSがオープンソースとして提供したベースConnectorで、非同期クライアントを活用できる宛先に独自のSink Connectorを構築する必要がある場合に使用できます。 独自のConnectorを構築する際は、ぜひAsync Sinkを検討することをお勧めします。

では、このConnectorが特に可観測性データを扱う場合の簡単なパイプラインでどのように機能するか見てみましょう。左側には、生イベントを送信する観測対象のデバイスがあります。取り込みについては今回の範囲外ですが、どのような方法でも構いません。最終的にこれらの生イベントはストリームストレージに送信されます。Kafkaが良い候補ですが、他にも多くの選択肢があります。そして、KafkaソースConnectorなど、ストリームストレージからデータを読み取れるソースConnectorを備えたFlinkアプリケーションがあります。その後、変換処理が行われます。ここでは1つの変換しか示していませんが、複数の変換ステージを設けることもできます。生イベントを変換し、最終的に今回お話ししているSink Connectorを使用して、RemoteWriteインターフェースを呼び出してPrometheusバックエンドに直接書き込みます。Flink内で実行されるSink Connectorが、書き込みのプロトコル、リトライ、バッチ処理などを処理し、最終的にデータをPrometheusバックエンドに送信します。

Thumbnail 1630

Thumbnail 1650

それでは、これが実際のユースケースでどのように機能するか見てみましょう。 このデモでは、コネクテッドカーを扱います。約10万台の車両を対象に、フィールドでの車両の挙動をリアルタイムで理解するためのダッシュボードを構築することが主なユースケースとなります。このデモのユースケースでは、 3種類のイベントを扱います。車両のRPMを受信しますが、今回はハイブリッド車を扱うため、2つのエンジンがあります。内燃機関エンジンのRPMと電気モーターのRPMを受信します。また、車両の警告灯の点灯数も受信します。

Thumbnail 1720

私たちのユースケースは、走行中の車両台数 - つまりいずれかのモーターのRPMがゼロ以外の車両 - と、車両の警告灯の数を把握することです。10万台の車両を扱っているため、個々の車両が走行中かどうかだけでなく、地域別や車種別に何台が走行中で何台に警告灯が点灯しているかを把握し、ダッシュボードで問題の有無を確認できるようにしたいと考えています。 エンドツーエンドのアーキテクチャについて説明すると、実際の10万台の車両を用意することができなかったため、データジェネレーターを使用します。これ自体がApache Flinkアプリケーションで、これらのイベントをシミュレートしてストリーミングストレージのAmazon Managed Service for Apache Kafkaに送信します。その後、Managed Flinkアプリケーションとしてプリプロセッサーコンポーネントを使用し、データを受信して独自の変換を実行し、車両が走行中かどうかと警告灯の数を識別する派生メトリクスを作成し、Amazon Managed Service for Prometheusを使用してPrometheusバックエンドに書き込みます。ダッシュボーディングにはGrafanaを使用します。

Flinkアプリケーション内での集約によって粒度を下げることに加えて、10万台の車両からデータを受信してPrometheusに直接書き込む標準的なRaw Flinkアプリケーションも用意します。これにより、運用メトリクスのための前処理としてFlinkを使用することの付加価値を確認することができます。

Thumbnail 1850

プリプロセッサーアプリケーションの内部を詳しく見ていきましょう。実際の動作をお見せする前に、何が見えるのかを説明し、最後にコードを共有したいと思います。これがFlinkアプリケーションの論理構造です。 ご覧の通り、データは今回の場合Kafkaから入力され、右側でPrometheusに到達します。内部では、いくつかの処理ステップがあります。

Flinkの最もネイティブなインターフェースであるData Stream APIを使用してFlinkアプリケーションを作成する場合、コードで定義するのは文字通り各オペレーターとそれらの間のリンクであり、いくつかのコードスニペットを見ていきます。エンドツーエンドで論理的かつ宣言的にデータフローを定義し、最後にexecuteを呼び出します。これがFlinkプログラミングのモデルです。このデータフローモデルはFlinkに固有のものではなく、他のフレームワークでも使用されていますが、Flinkはおそらく最も一般的な実装の一つです。

Thumbnail 1910

ステップバイステップで見ていきましょう。いくつかのコードスニペットを見ていきます。時間の関係で各関数の詳細には立ち入りませんが、完全な動作コードはオンラインで見つけることができます。Kafkaソースから始めると、ここには特別なものは何もありません。ソースを定義し、Kafkaソースコネクターの実際の定義はこのcreate Kafka sourceメソッド内に隠れています。より興味深いのは、ウォーターマークを定義していないことです。その理由は、この特定のユースケースではイベントタイムセマンティクスを使用していないためです。私たちの集約と処理は、単にシステムクロックであるプロセッシングタイムに基づいています。イベントタイムでも動作しますが、この例ではシンプルに保っています。

Thumbnail 2050

気づかれたかもしれませんが、最後のところで、オペレーターやソースにUIDを定義しています。Flinkをご存知の方にとっては一般的な推奨事項ですが、すべてのオペレーターにUIDを設定することは非常に重要です。これにより、アプリケーションを進化させる際に多くの問題を回避することができます。では、実際のアプリケーションに進みましょう。次のステップはエンリッチメントです。 論理的には、単純なマッピングです - 1つのメッセージを受け取り、エンリッチして、1つのメッセージを出力します。私たちのアプリケーションでは、生のイベントをVehicle Eventsと呼び、出力をEnriched Vehicle Eventsと呼んでいます。

実際のユースケースでは、おそらく参照データセットへの検索が必要になるでしょう。生のイベントにはVIN(車両識別番号)などの車両IDのみが含まれており、モデル情報は含まれていません。そのため、このIDに対応するモデルを示すテーブルが必要になります。Flinkには、これを実現するための多くのパターンがあります。イベントを受信するたびに外部データベースを単純に検索する必要はありません。キャッシュを使用したり、データセットを事前にストリーミングしたり、Flinkのステートに保持したりなど、様々なパターンがあります。実際、AWSのブログでは、エンリッチメントの異なるパターンについて少なくとも3つの記事があります。

ユースケースに応じて、それらを参照することをお勧めします。今回のケースでは、シンプルに保っています。内部で単純なロジックを使用し、基本的なテーブルを検索して車両モデルを追加しています。Enriched Vehicle Eventには、メトリクス、車両ID、時刻だけでなく、そのイベントを発信した車両のモデルも含まれることになります。

Thumbnail 2140

ロジックには2つのブランチがあります。これは2つの異なるメトリクスを計算する必要があるためです。下のブランチは警告イベントの計算用で、ユニークな警告の数を計算します。シンプルに保っていますが、任意に複雑にすることができます。まず、警告に関係のないイベントをフィルタリングします。下の3行は、この集計を行っています。key byは車両モデルと地域でパーティショニングしています。これは、個々の車両ではなく、車両モデルと地域ごとにユニークな警告の数を計算したいからです。処理時間での固定期間(5秒に設定)のTumbling Windowを使用しています。このパラメータは、ユースケースに応じて設定できます。

Thumbnail 2230

一番下には実際の計算があります。KeyWarningProcessWindowFunctionクラスは、単にJavaで集計を行うだけで、特に興味深い点はありません。そして、ここでもUIDを設定しています。アプリケーションでUIDを忘れないようにしましょう。 上のブランチでも、非常に似たことを行っています。内燃機関のRPMと電気モーターのRPMという2つの異なるタイプのイベントを考慮する必要があります。これは、両方から派生メトリクスを計算するためです。ロジックはシンプルですが、より複雑にすることもでき、さらに多くのメトリクスを考慮することもできます。残りの部分は非常に似ており、モデルと地域でkey byを行います。これは、モデルと地域ごとにこの派生メトリクスを計算したいからです。これらが派生メトリクスのディメンションとなり、同様にTumbling Windowを使用して集計を行います。

Thumbnail 2270

アプリケーションの最後で、実際のオブジェクトが同じであるため、2つのフローをマージすることにしました。2つの処理ストリームに対して異なるSinkを用意する方法もありますが、私たちのユースケースではこちらの方がシンプルだと考えたためです。Unionを使用して、文字通り2つのストリームをマージします。その後、集計されたイベントオブジェクトからPrometheus Sinkに必要な入力オブジェクトへのMap変換を行います。これは単純な1つのPOJOから別のPOJOへのマッピングです。Key byによって、同じTime Seriesからのデータが同じ並列ライターによって書き込まれることを保証します。

コネクテッドカーのリアルタイムモニタリングデモ

Thumbnail 2390

Prometheus Time Seriesのラベル付きマトリックス名のKey Selectorはコネクタによってすでに提供されているので、この1行を追加するだけで済みます。これにより並列書き込みの順序が保持され、同じTime Seriesのサンプルが異なるライターに送られて不適切な順序での書き込みが発生することを防ぎます。最後に、createPrometheusSinkメソッドを使用してSinkオブジェクトを作成し、接続します。 このメソッドについて、もう少し詳しく見ていきましょう。Sinkを作成するメソッドは、たとえ馴染みがなくても非常に読みやすいものになっています。各メソッドの詳細はドキュメントで確認できます。

ランタイムパラメータからパラメータを取得しますが、必須なのはエンドポイントのURLだけです。これは接続先として必要不可欠だからです。Amazon Managed Prometheusを使用しているため、リージョンも指定する必要があります。また、リクエストの最大リトライ回数も調整することにしましたが、これはオプションです。その後、フルーエントビルダーインターフェースを使用してオブジェクトを構築し、エンドポイントURLを設定します。

Amazon Managed Prometheusを使用しているため、認証を追加する必要があります。これはManaged Prometheusで必須であり、すぐに使える形で提供されています。Prometheus RemoteWriteインターフェースは認証を規定していません。これはインターフェースの範囲外だからです。このコネクタでは、独自の認証を実装できる汎用的なインターフェースを提供しました。通常、リクエストにトークン、署名、またはOAuthトークンを追加することになります。初期バージョンでは、Amazon Managed Prometheusの書き込みリクエスト署名機能をすぐに使えるオプションの依存関係として提供しています。これを追加するだけで署名が行われ、Managed Flinkを実行している場合、アプリケーションのIAMロールから自動的にすべての認証情報が取得されます。認証情報を提供する必要はありません。アプリケーションのIAMロールから自動的に継承されるためです。その後、他のオプションパラメータを設定してビルドします。

Thumbnail 2530

Thumbnail 2570

では、AWSコンソールで実際のデモをライブで見ていきましょう。 Managed Service for Apache Flinkのコンソールには、3つのFlinkアプリケーションがあります。10万台の車両を生成するVehicle Flinkアプリケーション、Lorenzoが説明したロジックをすべて含むプリプロセッサー、そしてKafkaから直接データサンプルを受け取り、集計を行わずにPrometheusに直接書き込むRaw Event Writerです。データジェネレーターのFlinkダッシュボードを見てみると、 セッション開始直前に起動したところで、これは非常に標準的なアプリケーションです。重要なのは、すでに約5億のデータポイントを10万台の車両から送信していることです。各車両が1秒に1つのサンプルを送信しているためです。

Thumbnail 2590

Thumbnail 2600

Thumbnail 2630

Raw Event Writerに移ると、 Lorenzoが説明したマップ変換を実行しているだけです。これによって、メトリクスを取得してConnector用に特定のフォーマットに変換しています。受信した約5億件のイベントは、そのままPrometheusに書き込まれています。Pre-processor applicationを見てみると、 実際の図はLorenzoが説明したものとほぼ同じです。Flink内では、効率性を高めるために複数のOperatorが連鎖的につながっています。ここで注目したいのは、Sourceが受信したレコード数が5億件であるのに対し、Region別やModel ID別の集計、エンリッチメント、集計を行うことで、Connectorに送信されるメトリクスは約30万件まで減少していることです。これによってPrometheusのパフォーマンスが向上します。なぜなら、クエリ実行時にスキャンするサンプル数が少なくなり、また保存するデータ量も少なくなるためコスト効率も良くなるからです。

Thumbnail 2700

Amazon Managed Grafanaに移りましょう。ここには2つのData Sourceを設定しています。1つは前処理されたデータを受信するPrometheus です。

Thumbnail 2720

もう1つは、すでに受信済みで今も受信し続けている5億件のサンプルをそのまま含むRaw Prometheusです。 ここでは、生データをそのまま使用し、PromQLだけを使って派生メトリクスや集計を構築しようとした場合に何が起こるかをデモンストレーションしたいと思います。ここでは、生のメトリクス、Electric RPM、Internal Combustion RPM、そして警告の数を確認できます。

Thumbnail 2770

このクエリをフィルタリングなしで実行すると、おそらく永遠に終わらないでしょう。そこで、Region別の簡単なフィルタリングを行い、Spainを選択してみましょう。Spainにある車両やモデルだけを表示するようにフィルタリングしてこのクエリを実行すると、 20のシリーズだけをフィルタリングしても、これは全く役に立たないことがわかります。車両で何が起きているのか、ロジックを構築したり理解したりすることが実際にはできません。クエリの実行に時間がかかることがお分かりいただけたと思いますが、ここではRPMの生データだけが表示されています。

Thumbnail 2800

Thumbnail 2810

Thumbnail 2850

車両が走行中かどうかを識別したい場合、 カウントで集計を行うことができます。これを削除して、Region別にカウントを行い、 RPMがゼロと異なるかどうかをチェックするバイナリ関数を作成してみましょう。このクエリを実行すると、過去1時間分のPrometheusのすべてのサンプルを処理する必要があります。繰り返しになりますが、送信した約5億件のレコードを処理することになります。 はい、このように表示されました。これは車両のリアルタイムダッシュボード用には適していないことがわかりますが、メトリクスをPrometheusに取り込むだけで、これらの派生メトリクスや集計を構築することは可能です。

Thumbnail 2870

Thumbnail 2880

Thumbnail 2910

前処理済みのPrometheusあるいは最適化されたPrometheusに戻ってみると、 Flinkアプリケーションにロジックを組み込んだことで、すでに派生メトリクスとなっている2つのメトリクスだけが残っていることがわかります。走行中の車両を見てみると、以前見た10万行ものデータではなく、モデルと地域だけが表示されています。同じように地域でフィルタリングして同じ国を選択すると、 非常に素早くレスポンスが返ってきます。車両IDではなく、特定のモデルごとに走行中の車両数を確認することができ、Flinkアプリケーションですでに集計を行っているため、カウントのためのカスタム集計も必要ありません。

Thumbnail 2940

Thumbnail 2970

車両フリートのモニタリングがどのように見えるか確認してみましょう。 リアルタイムで走行中の車両数を確認することができます。この事例は車両に関するものですが、Flinkで利用可能なデータであれば、どのようなデータでも独自の派生メトリクスを構築・作成できます。任意の時点での警告数も確認できます。モデルと地域による集計を行っているため、 米国での走行中の車両数や、特定のモデルの走行中の車両数を確認することができます。

Thumbnail 2980

Thumbnail 2990

明らかにNeburalaというモデルが非常に人気があることがわかります。 走行中の車両が最も多いモデルが、警告数も最も多いというのは理にかなっています。同様に、警告数が最も多い国が米国というのも納得できます。これは、Flinkがデータをプッシュする機能を実現した良い例といえます。

このテストは10万台の車両で実施していますが、100万台以上でもテストを行っており、ユースケースに応じてPrometheusをタイムシリーズデータベースとして活用することができます。

まとめと今後の展望

Thumbnail 3030

Prometheusは、リアルタイムのダッシュボード作成とリアルタイムアラートに最適化されたタイムシリーズデータベースであることを確認しました。Kubernetesエコシステムで広く使用されており、一般的なITインフラの監視によく使われていますが、他の多くのユースケースにも適用できます。これらの新しいユースケースには課題もありますが、Apache Flinkを使用することで生のメトリクスを前処理することができ、Apache Flinkはこのようなリアルタイム前処理の作業に最適です。そして、今回紹介したFlink Prometheus Connectorにより、このようなユースケースでFlinkとPrometheusを組み合わせて使用する可能性が広がりました。

大規模なデータを観察する必要がある場合、つまりリアルタイムのダッシュボード作成やリアルタイムのアラート通知を行う必要があり、特にIoTデバイスや車両、その他あらゆる広く分散したリソースを大規模に観察する必要がある場合、PrometheusとApache Flinkをコネクターで連携させることで実現できます。このようなユースケースでは、Prometheusは非常にコスト効率の良いソリューションとなります。

Thumbnail 3130

こちらがリソースの紹介です。左側はFlink Prometheusコネクターのドキュメントへのリンクです。これはApache Flinkのドキュメントの一部で、Apache Flinkのドキュメントの最新バージョンで確認できます。中央には、先ほどお見せしたデモのコードがあります。これには、データジェネレーター、プリプロセッサー、そしてRawライターという3つのFlinkアプリケーションが含まれています。このPrometheusコネクターを使用すれば、大規模なデータをPrometheusに書き込むことができます。もちろん、効率は少し落ちますが、実行は可能です。右側には、私たちが使用した2つのAmazonサービスへのリンクがあります。具体的には、Flinkアプリケーションを実行するためのAmazon Managed Service for Apache Flinkと、PrometheusバックエンドとしてのAmazon Managed Service for Prometheusです。

最後に、モバイルアプリでセッションのアンケートにご回答いただけますと幸いです。このようなセッションや、この詳細レベルが皆様にとって有意義だったかを把握する方法となります。5点を付けていただくことで、re:Inventで皆様が期待されるような同様のコンテンツを今後も提供することができます。re:Inventの初日の早朝からご参加いただき、ありがとうございます。Apache FlinkとPrometheusの様々なユースケースについて理解を深めていただけたことを願っています。もしこれらの2つのテクノロジーに馴染みがなかった方も、興味深く感じていただけたのではないでしょうか。改めて、ご参加いただき、ありがとうございました。


※ こちらの記事は Amazon Bedrock を利用することで全て自動で作成しています。
※ 生成AI記事によるインターネット汚染の懸念を踏まえ、本記事ではセッション動画を情報量をほぼ変化させずに文字と画像に変換することで、できるだけオリジナルコンテンツそのものの価値を維持しつつ、多言語でのAccessibilityやGooglabilityを高められればと考えています。

Discussion