🐥

Kinesis Data Streamちょこっとだけ触ってみた(3分)

2024/04/25に公開

背景

最近出た新しいAWSのアソシエイト資格(Data Engineer何ちゃら)の勉強がてら触ってみました。

目的

KDS(Kinesis Data Stream)からPut recordでレコードを送信して、ConsumerからGet recordsをしてレコードの内容を取得する

Kinesis Data Streams

Kinesis Data Streamで適当なストリーム名Test-streamsを設定してオンデマンドで作成

レコード送信してみる

AWSコンソール上部のCloudShellを起動して下記コマンドを3回実施
aws kinesis put-record --stream-name Test-streams --partition-key user1 --data "Hello World" --cli-binary-format raw-in-base64-out

[cloudshell-user@ip-10-134-4-171 ~]$ aws kinesis put-record --stream-name Test-streams --partition-key user1 --data "Hello World" --cli-binary-format raw-in-base64-out
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49651427706747538891980796863704942287944696861433004034"
}
[cloudshell-user@ip-10-134-4-171 ~]$ aws kinesis put-record --stream-name Test-streams --partition-key user1 --data "Hello World" --cli-binary-format raw-in-base64-out
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49651427706747538891980796863724285101058532027739930626"
}
[cloudshell-user@ip-10-134-4-171 ~]$ aws kinesis put-record --stream-name Test-streams --partition-key user1 --data "Hello World" --cli-binary-format raw-in-base64-out
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49651427706747538891980796863726702952697761492247773186"
}
コマンドの説明

このコマンドは、Amazon Kinesis Streams にデータレコードを送信しています。具体的には、aws kinesis put-record コマンドを使用して、「Test-streams」という名前のKinesisストリームに対して、データレコードを追加しています。このコマンドの各パラメータは以下の通りです:

--stream-name Test-streams:データを送信するストリームの名前です。
--partition-key user1:このレコードのパーティションキーを指定します。このキーに基づいて、レコードが特定のシャードに割り当てられます。
--data "Hello World":ストリームに送信するデータです。ここでは「Hello World」という文字列がデータとして送信されています。
--cli-binary-format raw-in-base64-out:このオプションにより、入力されたバイナリデータがBase64エンコーディングされて出力されるようになります。

ここからわかること

  • "Hello World" というデータが Kinesis ストリームに確実に格納されている
  • データレコードは違うShardIdに送られている
    SequenceNumber: このレコードのシーケンス番号で、特定のシャード内で一意にレコードを識別するために使用されます。これはデータが追加された順番を追跡するのに役立ちます。

Consumerからレコードを取得してみる

[cloudshell-user@ip-10-134-4-171 ~]$ aws kinesis describe-stream --stream-name Test-streams
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "85070591730234615865843651857942052863"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49651427706747538891980796863338637764601430342795001858"
                }
            },
            {
                "ShardId": "shardId-000000000001",
:...skipping...
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "85070591730234615865843651857942052863"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49651427706747538891980796863338637764601430342795001858"
                }
            },
            {
                "ShardId": "shardId-000000000001",
:...skipping...
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Test-streams
コマンドの説明

このコマンド aws kinesis get-shard-iterator は、Amazon Kinesis Streams からデータを読み取るためのシャードイテレータを取得するものです。具体的には、次のパラメータを使っています:

--shard-id shardId-000000000000: イテレータを取得したいシャードのIDを指定します。
--shard-iterator-type TRIM_HORIZON: シャードイテレータのタイプを指定します。TRIM_HORIZON はシャードに格納されているデータの中で最も古いものから読み取りを開始することを意味します。
--stream-name Test-streams: データを読み取るストリームの名前です。
コマンドの実行結果として返される ShardIterator は、指定されたシャードからデータを読み始めるためのポインタのようなものです。このイテレータを使って get-records コマンドを実行することにより、ストリームのデータを読み取ることができます。

このイテレータは、特定のシャードからデータを効率的に読み出すために必要であり、データの一貫性と順序を保ちながら効率的に処理を行うことを可能にします。

shard iteratorを指定してget-recordsしてみる

[cloudshell-user@ip-10-134-4-171 ~]$ aws kinesis get-records --shard-iterator AAAAAAAAAAFViEOglTV8jbgF7Y0oAWH2FCA4zu3KWXvIeNvHaEfMR8sPf5Uy9fDcZFuPpqOBVE+swT9UAYwS1YMrA9/enrAcHU1hgUOaN2WCRsCtmFlss6rsTUSTBvCLU5T1M3sGrZGvQB/RVuUtGN62zcnNFQdN0AitDh9lHuYH8blT65yJXS+nkb0/xqXYjIiO2Wgc+ej0DDr9Zoto+tw2M126Wf7QA18Enq+1jRon3whuaPCXlg==
{
    "Records": [
        {
            "SequenceNumber": "49651427706747538891980796863704942287944696861433004034",
            "ApproximateArrivalTimestamp": "2024-04-24T19:10:16.207000+00:00",
            "Data": "SGVsbG8gV29ybGQ=",
            "PartitionKey": "user1"
        },
        {
            "SequenceNumber": "49651427706747538891980796863724285101058532027739930626",
            "ApproximateArrivalTimestamp": "2024-04-24T19:10:32.680000+00:00",
            "Data": "SGVsbG8gV29ybGQ=",
            "PartitionKey": "user1"
        },
        {
            "SequenceNumber": "49651427706747538891980796863726702952697761492247773186",
            "ApproximateArrivalTimestamp": "2024-04-24T19:10:35.579000+00:00",
            "Data": "SGVsbG8gV29ybGQ=",
            "PartitionKey": "user1"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHYtl4lBM1d+RHyWFOuqM4fqB4K6AORxKDO7p/IP4U5TOvsVaT359oKioQBAh+x7angDemQxMtyhhS6FKPZErbsr8GlimANC/T993a70OzBlefkxYlQT3pBjZhhzYQ65Fg9+elKIFlyWReqa6xkyqPhFdVxXYwYNntMn6NfT3m0WCczXLmVn9xGTk+k922K74Ayw4oZ2cFHMF77Xw0y6CMY8y7Qg5EfPBXQY3kR5421aA==",
    "MillisBehindLatest": 0
}

下記サイトでDataのValueをdecodeしてみる
https://www.en-pc.jp/tech/base64.php


Dataがレコード送信した内容と一致することがわかった

ようごのせつめい

Partition key

Amazon Kinesis Streams においてデータを分散させるために使用されるキー。
このキーは、データレコードをストリームの異なるシャードにルーティングする際の基準になります。
具体的には、パーティションキーを元にハッシュ値が生成され、このハッシュ値に基づいて各レコードが特定のシャードに割り当てられます。

パーティションキーは、データの均一な分散を実現し、データ処理のスケーラビリティと効率を向上させるために重要です。
キーをうまく設計することで、データの過負荷が特定のシャードに集中することなく、ストリーム全体のリソースを効果的に活用することができます。
例えば、ユーザーIDやタイムスタンプなど、データの特徴を反映した値をパーティションキーとして選ぶことが一般的です。

Shard

Kinesis Data Streams では、ストリームは一つまたは複数のシャードから構成されており、各シャードは独立したデータレコードの集合を保持しています。
パーティションキーに基づいて計算されたハッシュ値によって、各データレコードは特定のシャードに割り当てられます。

Discussion