Open24

Elastic search / MSK / LogStash関連メモ

ktkt

Elasticsearch、OpenSearch、Kibanaの概要

Elasticsearch

オープンソースの検索エンジン
JSON形式でのデータの格納、検索、分析機能を提供し、リアルタイムの全文検索と分析が可能

OpenSearch

ElasticsearchとKibanaのオープンソースフォーク
AWSが開発したESのマネージドサービスで、
Elasticsearch 7.10.2およびKibana 7.10.2をベースにしている

Kibana

Elasticsearchのデータを視覚化し、分析するためのオープンソースのインターフェース

ダッシュボードの作成、データの視覚化、ログデータの探索などの機能がある

ktkt

シャードについて

インデックスのデータを分割し、分散させる機能

主な役割

  • データの分割
    • インデックス内のデータを複数の部分に分割する
      • 大量のデータをより小さく管理しやすい部分に分ける
    • 分割されたシャードはESクラスタ内の複数ノードに分割して格納される
      • データの読み書き操作が並列で実施できる
      • 処理能力が向上する

種類

  • プライマリシャード
    • インデックスの実際のデータを保持する
    • 作成時に設定するが後から変更はできない
  • レプリカシャード
    • プライマリシャードのコピー
    • データの可用性を高めて読み込み操作の負荷を分散する
    • index作成後も変更可能

メリット

  • スケーラビリティ
    • データを複数のマシンに分散させることができる
  • パフォーマンス向上
  • 高可用性

考慮

  • サイズ (データ総量 GB)
    • 大きすぎる:処理が遅くなる
    • 小さすぎる
  • シャード数 (インデックスに割り当てられるシャードの総数)
  • リソース配分
    • クラスタ内のリソースを消費する
ktkt

ElasticsearchのインデックスとRDSのテーブルの比較

■インデックス:テーブル

Elasticsearchのインデックス: ドキュメントの集合体
インデックスは一般に類似の特性を持つデータをグルーピングするために使用される

RDSのテーブル: レコード(行)の集合体
テーブルは関連するデータを整理し構造化された形式で格納する
各テーブルは特定のデータ型の列(フィールド)を持っている

■ドキュメントとレコード

Elasticsearchのドキュメント: インデックス内の個々のデータ項目
ドキュメントはJSON形式で格納され、非常に柔軟なデータ構造を持つ
各ドキュメントは異なるフィールドやデータ型を持つことができる

RDSのレコード(行): テーブル内の個々のデータ項目
レコードはテーブルの列に定義されたデータ型に従ってデータを格納する
テーブルのすべてのレコードは同じ構造を持つ

■フィールドとカラム

Elasticsearchのフィールド:ドキュメント内の個々のデータポイント
フィールドはキーと値のペアで表現されて異なるデータ型を持つことができる

RDSのカラム(列): テーブル内のデータの特定の種類を定義する
各カラムは特定のデータ型を持ち、テーブル内のすべてのレコードは
そのカラムのデータ型に従ってデータを格納する

ktkt

mapping

ドキュメントがどのように保存されてインデックスが作成されるか定義する

  • Meta-fields

    • ドキュメントに関連づけられたメタデータをどのように扱うか設定する
  • Fields or properties

    • ドキュメントに関するフィールド、プロパティリスト

フィールドについて

text

全文検索のためにESが文字列を解析する

keyword

文字列全体を1つの値として扱う

メタフィールド

_で始まる規則がある

引用記事:
https://baubaubau.hatenablog.com/entry/2020/07/02/203000#:~:text=ignore_above という項目は keyword,検索はこうなります。

ktkt

kibanaのAPIコマンド

PUT _index_template/テンプレート名

インデックステンプレートの作成

PUT _index_template/テンプレート名
{
  "index_patterns": ["パターン1*", "パターン2*"],
  "template": {
    "settings": {
      "number_of_shards": 1
    },
    "mappings": {
      "properties": {
        "field1": { "type": "text" },
        "field2": { "type": "keyword" }
      }
    }
  },
  "priority": 1,
  "version": 1
}

テンプレート名: 作成するテンプレートの名前
index_patterns: このテンプレートが適用されるインデックスパターン
settings: インデックスの設定
mappings: フィールドのマッピング定義

PUT index名/_doc

特定のインデックスに新しいドキュメントを追加するために使用
/_doc: ElasticsearchのAPIパスの一部で、ドキュメントを操作するためのエンドポイントを指す

ドキュメントID: 追加するドキュメントの一意の識別子
このIDを指定することで、特定のドキュメントを明示的に参照または更新することができる
IDを指定しない場合(単に PUT /index名/_doc とする場合)、Elasticsearchは自動的に
一意のIDを生成する

PUT /index名/_doc/ドキュメントID
{
  "field1": "value1",
  "field2": "value2"
}

指定されたインデックスからドキュメントを検索するために使用する

GET /index名/_search
{
  "query": {
    "match": {
      "field": "検索語句"
    }
  }
}

GET _cat/aliases?h=alias

_cat/aliases: _cat APIはElasticsearchのクラスターの状態を読みやすい形式で表示するためのもの
aliases すべてのインデックスエイリアスに関する情報を提供
?h=alias: クエリパラメータです。h は「header」の略alias と指定することで、エイリアス名のみが結果に表示される

ktkt

MSK概要

Apache Kafkaの管理型サービス

Apache Kafka

リアルタイムデータフィードを扱うためのオープンソースのストリーミングプラットフォーム
MSK:AWSのマネージドサービス

特徴

  • フルマネージド
    • クラスタのセットアップ
  • スケーラビリティ
  • セキュリティ
  • 高可用性・耐久性
  • 互換性

ユースケース

  • リアルタイムデータ処理
  • イベント駆動アーキテクチャ
  • ログ集約
  • ストリーム処理
  • メッセージブローカー

Kafka は pub-sub システム

  • メッセージを書き込む人(プロデューサー)はトピックの最後に追記
  • メッセージを読み込む人(コンシューマー)はトピックをどこまで読んだかを記憶しておき(オフセット)、読み込んでいないデータがきたら読み込む
  • 複数のコンシューマーが並列で Kafka に接続してパラレルで処理することが可能
  • Kafka にはデータが永続化されるため、後から参加したコンシューマーが他のコンシューマーがすでに処理済みのデータを再度処理することができる

https://dev.classmethod.jp/articles/report-on-ant398-introducing-amazon-managed-streaming-for-kafka/

基本概念

トピックとパーティション

トピック:
イベントを整理するための基本ユニット
データの保存先または公開先であるユーザー定義のカテゴリまたはフィードの名前

イベントログ
トピックはパーティション化される

別々の Kafkaブローカー上に存在できる複数のログファイルに分割される

クライアントアプリケーション

  • 多数のブローカーに対して同時に公開/サブスクライブできる
  • 複数のブローカーにパーティションを複製することでデータの高可用性を実現できる

公開/サブスクライブ/メッセージングパターン:
送信側と受信側の間でメッセージを送信する方法

publisher(送信者)→topic→subscriber→受信者

各トピックは複数のサブスクライバーを持つことができ、
すべてのサブスクライバーはトピックに公開されるすべてのメッセージを受け取る

イベントストリーミング:
イベントごとにシステム状態を記録する

kafka分散システム:
複数のデータセンター、クライアントにまたがる1つ以上のサーバ(kafkaブローカー)のクラスターで構成。kafkaブローカーと通信してイベントストリームの読み取り、書き込み、処理を行う

プロデューサー・コンシューマ:

プロデューサ:
データをトピックに配置するアプリケーション

コンシューマ:
トピックからデータを読み取るアプリケーション

Kafka クラスターと Kafka ブローカー:

Kafka ブローカー:
プロデューサーとコンシューマー間のトランザクションを調整

ブローカー:
イベントの書き込みおよび読み取りに関するクライアント要求をすべて処理

パーティションは別のKafka ブローカー上に配置可能

Kafka をデータ損失から保護するための重要な方法
複数ブローカーにおけるデータのコピー数を指定する
トピックレプリケーション係数を設定することで実行できる

レプリケーション係数 が 3 の場合、
別のブローカーのパーティションごとにトピックのコピーが 3 つ保持される

Kafka Connect

データ統合フレームワーク
メッセージキューやリレーショナルデータベースなどの外部ソースから Kafka クラスターにデータを取得するために使用する
クラスターからデータを (別の場所で保存または同期するために) 取得する際にも使用できる

コネクターを利用する

  • ソースコネクタ:データストアからデータを取得する
  • シンクコネクタ:kafkaトピックからデータストアにデータを提供する

Kafka Streams:
Kafka トピック内でデータを処理および変換できるストリーミングアプリケーションをビルドする Java API

リアルタイムでトピックからデータを読み取り、そのデータを処理 (フィルタリング、グループ化、集計など) し、処理後のデータを別のトピックやレコードシステムに書き込む

引用資料:
https://kenta-kosugi.medium.com/apache-kafka-が生まれた理由-2a6f022b2935
https://www.redhat.com/ja/blog/apache-kafka-10-essential-terms-and-concepts-explained#:~:text=Kafka に�%[…]�です。

ktkt

Logstash概要

異なるデータソースから情報を収集し、それを変換してElasticsearchに送信するデータ処理パイプラインツール。ログファイル、データベース、クラウドサービスなどからデータを抽出し、Elasticsearchで統合できるようにする。データの形式を変換する。

MSKを前段に置いて構成する理由:

  • データのフィルタリング、変換、拡張、リッチな形式へのフォーマット変更
  • 様々なソースからのデータを一箇所で収集して処理する→データの統合プロセスを単純化
  • データの前処理
  • データパイプラインの耐障害性を高める
  • データパイプラインのスケーラビリティを向上
  • リアルタイムのデータ処理に置いて有用

リアルタイムのデータ処理に置いて活用

  • システム、アプリケーション、センサーなどからのデータはリアルタイムでKafkaトピックにプッシュされる
  • Kafkaトピックにストリームされたデータは、リアルタイム処理エンジン(例:Apache Flink、Apache Storm、Kafka Streams)によって処理される
  • 処理されたデータは、ダッシュボード、データストア、アラートシステムなどのエンドポイントに送信される

メリット:

  • データの収集と処理がリアルタイムで行われ、遅延が最小限に抑えられる
  • 大量のデータストリームを効率的に処理し、スケールアップが容易
  • 耐障害性とデータの耐久性が高く、システムのダウンタイムに強い

MSKを入れない場合:

  • データはAPI、バッチ処理などを介して収集される
  • DB、lambdaなどのAPで処理される
  • 処理後のデータはデータストアやエンドユーザーに配信される

<デメリット>
リアルタイム処理の遅延
高スループットのデータストリームに対応するのが困難になる恐れ
スケーラビリティや耐障害性に制限が生じる

ktkt

Open Search ヒープについて

ヒープはJava Virtual Machine (JVM)がJavaオブジェクトのメモリ管理を行うために使用するメモリ領域

ヒープサイズ:JVMオプションで設定

jvm.options ファイルを通じて Xms(初期ヒープサイズ)と Xmx(最大ヒープサイズ)を設定

サイズの決定: ヒープサイズは物理メモリの半分以下に設定するのが一般的

ガベージコレクション:

重要性: JVMのガベージコレクタ(GC)は、使用されなくなったオブジェクトをヒープから削除してメモリを解放

GCのパフォーマンスはヒープサイズに大きく依存

チューニング:

頻繁なGCや長いGC停止時間は、システムのレイテンシーに影響を与える
GCログを監視して適切にヒープサイズを調整することが重要

メモリプレッシャー:

監視: オーバーヘッドを抑え、システムの安定性を確保
対策: ヒーププレッシャーが高い場合、データのシャーディング、ノードの追加、クエリ最適化などを検討する

ノードごとの設定:
一貫性: クラスター内の全てのノードで一貫したヒープサイズ設定を行うことが推奨

ヒープ以外のメモリ:

オフヒープメモリ: OpenSearchはフィルターキャッシュなどのためにヒープ外メモリも使用
ヒープサイズを設定する際は、全体のメモリ使用量を考慮することが重要

最適化:

ヒープの設定は、ワークロードやデータ量に応じて適切に行うことが重要
大きすぎるヒープはGCのオーバーヘッドを増加
小さすぎるヒープはメモリ不足のエラーを引き起こす

ヒープ管理はOpenSearchのパフォーマンスと安定性に直結するため、
継続的な監視と適切な調整が必要

具体的な設定は各クラスターの状況に応じて異なるため、実際の環境でのテストと評価が不可欠

ヒープの関連性

ヒープの量はインデックス数、インデックスごとのフィールド数、およびシャード数に直接比例する

マスターノードのヒープ使用率をモニタリングし、そのサイズが適切であることを確認

大きなセグメントは小さなセグメントと比較して、データ量ごとのオーバーヘッドが少ない
ノードごとにできるだけ多く保存できるようにするためには、ヒープ使用率を管理し、できる限りオーバーヘッドを減らす

https://www.elastic.co/jp/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster

ktkt

シャード数の算出方法

大量のデータをシャードと呼ばれる細かいユニットに分割し、それらのシャードを複数のインスタンスに分散して保持する

"インデックスのサイズからシャード数を算出するにあたって、それぞれのシャードのサイズの目安を30GBにする"

シャードは ストレージ(storage) の単位であり、また 処理(computation) の単位
シャードを独立した形でクラスタ内のインスタンスにデプロイし、インデックスの処理をそれぞれで並列に行う

プライマリシャードとレプリカシャード:

プライマリシャードは全ての書き込みリクエストを受け付ける
新しく追加されたドキュメントをレプリカにパス

デフォルトでは書き込みがレプリカに確認(acknowledge)されるのを待ってから呼び出し元に書き込み成功のレスポンスを行う
プライマリとレプリカシャードはデータの保存に冗長性をもたらし、データのロスを起こりにくくする

ktkt

プライマリとレプリカのシャードを同じインスタンスに配置しない

シングルインデックスとローリングインデックス

シングルインデックス
全てのコンテンツを保持する”source of truth”な外部のリポジトリを使い、データは一つのインデックスに保持される

ローリングインデックス
データを継続的に受け取り、データはタイムスタンプによって(通常は1日24時間)異なるインデックスに保持される

シャーディングの計算のスタート地点

インデックスに必要なストレージサイズ

それぞれのシャードをストレージの単位として扱うと、いくつのシャードが必要になるかのベースラインを見出すことが出来る
トータルのストレージ容量を30GBで割って最初に必要なシャード数を算出

インデックスのサイズが30GB以下であるのであれば、一つのシャードのみを使うべき

https://aws.amazon.com/jp/blogs/news/get-started-with-amazon-elasticsearch-service-how-many-shards-do-i-need/#:~:text=Elasticsearchではインデックス作成の,なければなりません。

ktkt

Amazon OpenSearch Service クラスターのステータス

クラスターのステータスが黄色の場合:

すべてのインデックスのプライマリシャードがクラスター内のノードに割り当てられる
ただし、1 つ以上のレプリカシャードはどのノードにも割り当てられない

黄色のクラスターのステータスが自動的に解決しない場合

インデックス設定を更新するか、未割り当てのシャードを手動で再ルーティングすることでステータスを解決できる

引用資料:
https://repost.aws/ja/knowledge-center/opensearch-red-yellow-status

ktkt

JVMメモリプレッシャーの意味

JVMヒープメモリにかかる負荷

OpenSearchのクラスターにおける「データノード: JVMメモリプレッシャー」という指標

ノード上のJava Virtual Machine (JVM) のメモリ使用状況

ヒープ使用率: JVMヒープメモリの使用率

ヒープはJavaアプリケーションが動的にオブジェクトを確保するためのメモリ領域

メモリ制限の近接度: メモリプレッシャーが高い

利用可能なヒープメモリに対する現在の使用量が増加しており、制限値に近づいている

メモリプレッシャーが高い場合の影響:

ガベージコレクション:

メモリプレッシャーが高い場合、ガベージコレクタはより頻繁に実行される可能性があり

OutOfMemoryError:

メモリプレッシャーへの対応

ヒープサイズの調整:

ヒープサイズが不十分な場合は、JVMのヒープサイズを増やすことを検討する

ガベージコレクションの最適化:

ガベージコレクションの設定をチューニングすることで、メモリプレッシャーを減らす

ノードの追加:

クラスターにノードを追加して、全体の負荷を分散させる

データの分散とシャーディング:

データの分散やシャーディングを最適化することで、各ノードのメモリ負荷を軽減

公式ドキュメント:
https://repost.aws/ja/knowledge-center/opensearch-high-jvm-memory-pressure

ktkt

OpenSearchの料金について (メモ)

以下表に記載がある
https://aws.amazon.com/jp/opensearch-service/pricing/

参照箇所:

オンデマンドインスタンスの料金・コンピューティング最適化 – 現行世代

Amazon EBS ボリュームの料金 (EBS ボリュームを選択した場合に適用)

ktkt

OpenSearchのアラーム整理

FreeStorageSpace

クラスター内のデータノードの空き領域。Sum はクラスターの合計空き容量を示しますが、正確な値を得るには期間を 1 分にする必要があります。Minimum と Maximum は、それぞれ空き領域が最も少ないノードと最も多いノードを示します。このメトリクスは、個別のノードでも利用できます。OpenSearch Service は、このメトリクスが 0 に達したときに ClusterBlockException をスローします。復旧するには、インデックスを削除する、より大きなインスタンスを追加する、既存のインスタンスに EBS ベースのストレージを追加する、のいずれかを実行する必要があります

ClusterIndexWritesBlocked

クラスターで、着信する書き込みリクエストを受け入れるか、ブロックするかを指定します。値 0 では、クラスターでリクエストを受け入れます。値 1 ではリクエストをブロックします。代表的なものとしては、FreeStorageSpace が少なすぎる、JVMMemoryPressure が高すぎるなどがあります。この問題を軽減するには、ディスク容量の追加やクラスターのスケーリングを検討します。

クラスターメトリクス:
https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/managedomains-cloudwatchmetrics.html

CloudWatch アラーム
https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/cloudwatch-alarms.html

コンソールの警告
https://docs.aws.amazon.com/opensearch-service/latest/developerguide/monitoring-events.html

ktkt

ES概要整理

全文検索エンジン

複数の文書から特定の文字列を検索する

grep型:

複数のテキストファイルの内容を順次走査して検索文字を探す

インデックス型:

あらかじめ検索対象となる文書群を走査して転置インデックスデータを準備 -> アクセス
高速検索が可能

転置インデックス:

文字列が検索対象となるドキュメント群のどの位置に存在するかを示す牽引構造

※RDBの場合はテーブルを作成してからINSERTする

Elasticsearchは事前にインデックスが作成されていなければ、
ドキュメント作成時に自動的にインデックスと項目定義を作成してからドキュメントを作成

検索について:

Elasticsearchでは字句解析が必要な場面でAnalyzerと呼ばれる機能を利用する

Dynamic Mapping:

インデックス定義をあらかじめ設定していなくてもドキュメントの作成時にElasticsearchが自動的にインデックスや項目定義を作成してくれる機能

Explicit Mapping:

事前にマッピングを定義してインデックスを作成

型の種類

Object:

JSONのオブジェクトのように階層構造を持たせることができる

Nested:

Objectを配列のように扱う型

引用記事:
https://qiita.com/Schott_man/items/4707973f0d6fe1d0a61f
https://qiita.com/Schott_man/items/c4da6551daff41e6ab2d

ktkt

OpenSearchのスペック・ベストプラクティス整理

シャード数方針

シャードがデータノード数の偶数倍になるようにする

データノード = 4
シャード = 4,8,12

データノード = 6
シャード = 3,6,9,12

シャードをデータノード全体に均等に分散されるよう設定するべき
ここでいうシャードとは、プライマリシャードを意味する

レプリカシャードはプライマリシャードに対してのコピー数が設定できる

レプリカシャード数を設定した分、プライマリシャード数と掛け合わせたシャードが作られる

プライマリ:4 レプリカ:1 = 合計4つ
プライマリ:4 レプリカ:4 = 16

プライマリシャードとレプリカシャードは同じノードに設置できない為、
データノード数の数が合わないと配置できなくなる

シャード数よりもシャードサイズの方が重要
5 GiB のデータがある場合、1 個のシャードを使用するべき

サイズ

各シャードが 10~30 GiB (検索ワークロードの場合)
または 30~50 GiB (ログワークロードの場合) になるようにシャード数を設定する
(50GiBが最大)

メモリと推奨シャード数の関連性

ノードが保持できるシャードの総数

ノードの Java 仮想マシン (JVM) ヒープメモリに比例

JVMヒープメモリ = マシンスペックで割り当てられる物理メモリの約半分ほど
※kibanaのコマンドから確認できる

以下原則

  • ヒープメモリの GiB あたりのシャード数が 25 個以下になるようにする
  • 1ノードあたり1,000個以下

32 GiB のヒープメモリを持つノードのシャード数 = 800個以下
32 * 25 = 800

c6g.xlarge.search
メモリ (GiB):16GiBの場合

8GiBほどメモリが割り当てられる

8 * 25 = 200 200個以下のシャード数にするのが原則となる

vCPUとの関連性

1シャードあたり 1.5 vCPU

c6g.xlarge.search
vCPU:4の場合

1.5 * 2 = 3

シャード数 3個までが推奨値

c6g.2xlarge.search
vCPU:8の場合

1.5 * 4 = 6

シャード数 6個までが推奨値

シャードがインデックス作成または検索リクエストに関わっている場合、
vCPU を使用してリクエストを処理する

運用上のベストプラクティス (シャード戦略)

シャードはOpenSearch Serviceドメイン内のデータノード全体でワークロードを分散する

<流れ>

OpenSearch Service にデータを送信

インデックスに送信

インデックスを作成する際に作成するプライマリシャードの数をOpenSearchに指示

クラスター内のデータノード全体で各インデックスのシャードをマッピング

インデックスのプライマリシャードとレプリカシャードが別々のデータノードに確実に存在するようになる

最初のレプリカはインデックスにデータのコピーが確実に 2 個あるようにする
常に少なくとも 1 個のレプリカを使用する
追加のレプリカは、さらなる冗長性と読み取りキャパシティーを提供

インデックス作成のリクエストをインデックスに属するシャードを含むすべてのデータノードに送信する

インデックスの作成リクエストの流れ

プライマリシャードを含むデータノードに送信

レプリカシャードを含むデータノードに送信

検索リクエスト
コーディネーターノードによって、
インデックスに属するすべてのシャードのプライマリシャードまたはレプリカシャードにルーティング

例:

5 個のプライマリシャードと 1 個のレプリカがあるインデックスの場合

各インデックス作成リクエストは 10 個のシャードにタッチ

検索リクエストは n 個のシャードに送信 = n はプライマリシャードの数

5 個のプライマリシャードと 1 個のレプリカがあるインデックスの場合、
各検索クエリは、そのインデックスの 5 個のシャード (プライマリまたはレプリカ) にタッチ

引用資料:
https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/bp.html

ktkt

UltraWarmについて

接続されたストレージではなく、Amazon S3 と高度なキャッシュソリューションを使用する

ウォームインデックスはホットストレージに戻されない限り読み取り専用

  • 専用のマスターノードが必要
  • s3を使用するためオーバーヘッドが発生しない
  • プライマリシャードのみを考慮する
  • 20 GiB シャードには、20 GiB のウォームストレージが必要
  • 一度に 1 つのインデックスを UltraWarm に移行

設定

有効にするかつ、ウォームノード数を選択する

コマンドから移行が可能

POST _ultrawarm/migration/my-index/_warm
GET _ultrawarm/migration/_status?v

移行の自動化

Amazon OpenSearch Service でのインデックスステート管理 を使用して自動化が可能

移行の調整

UltraWarm ストレージへのインデックスの移行には、強制マージが必要
強制マージオペレーションは削除対象としてマークされた
ドキュメントをパージし、ディスク領域を節約する

デフォルトでは、UltraWarm はインデックスを 1 つのセグメントにマージする

index.ultrawarm.migration.force_merge.max_num_segments 設定を使用
値を大きくすると移行プロセスが高速になるが、
移行終了後のウォームインデックスのクエリレイテンシーが長くなる

PUT my-index/_settings
{
  "index": {
    "ultrawarm": {
      "migration": {
        "force_merge": {
          "max_num_segments": 1
        }
      }
    }
  }
}

公式:
https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/ultrawarm.html

ktkt

Apache Kafkaの概要とアーキテクチャ 整理

Apache Kafka(以降、Kafka)= スケーラビリティに優れた分散メッセージキュー
(処理性能を重視したメッセージキュー)
複数台のマシンでクラスタを構成して分散処理を行うことで高いスループットを発揮
クラスタ内でデータを複製するため、一部のマシンに障害が発生してもデータを失うことなく処理を継続できる

メッセージキュー

システム間のデータの受け渡しを仲介し、データを一時的に保持(キューイング)するミドルウェア

メリット:

  • メッセージキューを介してシステム間の通信を行うことで、システム間の接続経路を簡略化し、システムを疎結合に保つ
  • システム間の通信を非同期化することで、データ流量の急激な増加によるシステムの負荷上昇を抑制

ユースケース:

  • データハブの構築

    • 多数のデータ間で大量のデータを受け渡すハブの役割を果たす
  • リアルタイムストリーミング

    • データの急増に対応するキューイング

用語整理

<登場人物>

クラスター:

ブローカー、トピック、パーティション、Zookeeperなどを構成する基盤システム

ブローカー:

メッセージシステムの中核となるサーバー
Producerから送信されたメッセージ(レコード)を受け取り、トピックのパーティションに格納
コンシューマーへのメッセージ配信を行う

  • トピック管理とパーティションを組んでおり、読み書きの負荷を分散する
  • パーティションのレプリカを他のブローカーに作成する
  • コンシューマーが最後に読んだメッセージの位置を記録
    • 途中から再開が可能になる

トピック:

メッセージのカテゴリ、フィード
kafkaがデータを整理する基本単位

一度トピックに書き込まれたデータは不変

パーティション:

トピックの中でデータを分割・管理する

  • データの分割
  • 並列処理
    • 複数のプロデューサーが同時に異なるパーティションにデータを書き込める
    • コンシューマが同時に異なるパーティションからデータを読み出せる
  • オフセットの管理
  • レプリケーションによる耐障害性の向上

プロデューサー:

メッセージを特定のパーティションに送信する
メッセージを送信する際にパーティションを指定、
あるいはkafkaにパーティションの選択を委ねることができる

複数のメッセージをバッチとしてグループ化し、まとめて送信も可能

コンシューマー:

トピックからメッセージを取得
トピックの特定パーティションからメッセージを読み出す

トピック内のメッセージが複数のコンシューマーに分散される
並行処理が可能になる

読み出したメッセージの位置を管理して、どのメッセージを読んだか追跡する

障害発生後の再開時に途中から読み出すことができる

Zookeeper:

分散システムにおける構成管理、名づけ、同期、グループサービス等を提供するサービス
ブローカー、トピック、パーテーションなどのメタデータを管理

システム構成

複数のBrokerでクラスタを構成

クラスタ上にTopicと呼ばれる分散キューを構成

Kafkaのメッセージ = キーバリュー形式でありRecordと呼ばれる

Topic = 複数のBrokerに分散配置されたPartitionで構成

Partition単位でRecordの書き込み/読み込みを行う

Topicに対する並列書き込み/読み出しを実現

Broker同士はクラスタコーディネータであるZooKeeperを使用して連携
Brokerの1つがリーダーとなりBrokerクラスタを管理

Producerと呼ばれる書き込み用ライブラリを通じて、BrokerのTopicにRecordを書き込み

Consumerと呼ばれる読み出し用ライブラリを通じてRecordを取り出し

Topicに書き込んだデータはBrokerのディスクに保存され、読み出されてもすぐには削除されない

同じTopicのデータを複数のアプリケーションから読み出すことができる

一定期間が経過したRecordや、Partitionの最大容量を超えた分のRecordは自動的に削除される
Partition 内のRecordはBroker間で複製されるため、一部のBrokerノードに障害が発生してもRecordが失われにくい

データ管理

Topic → 複数のパーティションで構成
Partition → ブローカー間で複製された複数のレプリカで構成

Partition = リードレプリカ(必ず1つ存在) + Followerレプリカ (なくてもOK)
リードレプリカに書き込まれたレコードはFollowerレプリカに複製される

各Leader ReplicaはBrokerに均等に割り当てられ、
Partitionへの書き込み/読み出しはLeader Replicaにのみ行われる

※Brokerやネットワークの問題により、LeaderからFollowerへの複製が遅延または停止
LeaderとFollowerの内容が同期しなくなることがある

デフォルトの設定では、複製が追い付いていない状態が
10秒間続いたFollowerは同期していないとみなされる

Brokerは常に「同期しているFollowerのリスト」を追跡
同期しているReplicaをIn Sync Replica(ISR)と呼ぶ

PartitionのISR数が最小数を下回る

Partition(のLeader Replica)はProducerから書き込み出来ない

ISR数が最小数まで回復

Partitionは再びProducerから書き込み出来る

データ保存

ブローカーはトピックに書き込まれたレコードをファイルに保存して永続化する

レプリカごとにデータディレクトリを作成

レコードは各ディレクトリ内のセグメントファイルに書き込まれる
セグメントの集合をログと呼ぶ

引用資料:
https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3

全体概要が分かりやすい資料:
https://zenn.dev/amezousan/scraps/7df6c1d21d8600

ktkt

kafka用のコマンド

# clusterの情報取得
aws kafka describe-cluster --cluster-arn arn:aws:kafka:ap-northeast-1:638015075418:cluster/maint-common-msk/7f463b86-2a26-48e9-ba76-f8e5be509eaa-4

# リージョンのアカウントにあるすべての MSK 設定を一覧表示
aws kafka list-configurations

# topicの情報取得
/***/kafka_ダウンロードバージョン/bin/kafka-topics.sh --bootstrap-server ブートストラップサーバー VPCエンドポイント --describe --topic トピック名

公式:

https://kafka.apache.org/26/documentation.html#topicconfigs
https://docs.aws.amazon.com/cli/latest/reference/kafka/describe-cluster.html