Kinesis Data Stream の動作を確認してみる
Kinesis Data Stream
- AWS が提供するストリーミングマネージドサービス基盤。
- AWS CLI、SDK、KCLといったライブラリで操作可能。
- この記事ではAPIで実行される基本的な仕組みを理解することが目的。
- AWS CLI で操作してみた結果を確認してみる
ストリームの作成・削除
シャード数を指定するプロビジョニングモードで作成する
aws kinesis create-stream --stream-name test-stream1 --shard-count 1
aws kinesis describe-stream-summary --stream-name test-stream1
{
"StreamDescriptionSummary": {
"StreamName": "test-stream1",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxx:stream/test-stream1",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"StreamCreationTimestamp": "2022-07-07T15:41:26+09:00",
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"OpenShardCount": 1,
"ConsumerCount": 0
}
}
aws kinesis delete-stream --stream-name test-stream1
Producer 側によるデータの書き込み
PutRecord、もしくは、PutRecords API でストレームに対してデータの書き込みを行う。
PutRecord
- 単一データレコードの書き込み用
- シャードあたり、1000レコード/sec、(パーティションキー含み)1MiB/sec がサポートされる
- 実行すると対象データが登録されたシャードID、シーケンス番号がレスポンスとして返却される。
aws kinesis put-record --stream-name test-stream1 --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49631010431464371151031729505167814065343619411222724610"
}
PutRecords
- 複数データレコードの一括書き込み用
- シャードあたり、1000レコード/sec、(パーティションキー含み)1MiB/sec がサポートされる点はPutRecordと同じ。
- パーティションキーを含んだpayloadのサイズとして、各レコードあたり最大1MiB、全レコード含めてる場合は5MiBが上限値。
- 1 API で最大 500レコードまで登録できる。
- PutRecords API ではリクエストに含めたデータレコードの順番はストリーム側で保証されないので注意。
- 失敗レコード数、各レコード毎にシャードID、及び、シーケンス番号がレスポンスとして返る
aws kinesis put-records --stream-name test-stream1 \
--records \
Data=blob1,PartitionKey=partitionkey1 \
Data=blob2,PartitionKey=partitionkey2 \
Data=blob3,PartitionKey=partitionkey3 \
--cli-binary-format raw-in-base64-out
{
"FailedRecordCount": 0,
"Records": [
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49631010431464371151031729505791619788264821254246105090"
},
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49631010431464371151031729505792828714084435883420811266"
},
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49631010431464371151031729505794037639904050512595517442"
}
]
}
パーティションキー
- パーティションキーは、ストリーム内のデータをシャード単位でグループ化するために使用される。
- パーティションキーは Unicode 文字列で、各キーの最大長は 256 文字に制限
- 指定されたパーティションキーをKinesisはMD5 ハッシュ関数にかけて、パーティションキーを 128 ビットの整数値にマッピング、対応するデータレコードを特定のシャードに格納する
### ハッシュ関数にかけて16進数の128bitのハッシュキーに変換
>>> import hashlib
>>> md5hash=hashlib.md5(b'123').hexdigest()
>>> print(md5hash)
202cb962ac59075b964b07152d234b70
### 対象の16進数を10進数に変換
>>> print(int(md5hash,16))
42767516990368493138776584305024125808
### 各シャードに対応するハッシュキーの範囲はこちらでわかる。先ほど10進数に変換した数値が範囲に含まれるシャードに対象レコードが配置される認識
aws kinesis describe-stream --stream-name test-stream1 | jq -r '.StreamDescription.Shards[]'
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49631165774887304883587633739165346216849000595592314882"
}
}
### 1シャードの場合、StartingHashKey=0、EndingHashKey=340282366920938463463374607431768211455だが128bitの16進数の開始値と終了値が以下の通りのため
>>> print(int("00000000000000000000000000000000",16))
0
>>> print(int("ffffffffffffffffffffffffffffffff",16))
340282366920938463463374607431768211455
Consumer 側によるデータの読み込み
- 初めに、ストリームに格納されたデータを読み取るには、GetShardIterator API で対象となるシャードのシャードイテレーターを取得する必要がある。この際指定必須なものとしては、StreamName,ShardId,ShardIteratorTypeの3つ。
- 次に、GetRecords API で取得したシャードイテレータを指定し、データを取り出すという流れ。
GetShardIterator - ShardIteratorType
GetShardIterator API で指定する shard-iterator-type により、GetRecords API で取得できるレコードの基準が変わる。
-
AT_SEQUENCE_NUMBER
- 指定したシーケンス番号から読み取り実施。
-
AFTER_SEQUENCE_NUMBER
- 指定したシーケンス番号より後のレコードから読み取り実施
-
AT_TIMESTAMP
- 指定したタイムスタンプから読み取り実施。
- ピンポイントのタイムスタンプのレコードがない場合は、その次点のレコードから取得。
- シャードで最も古いレコードよりも古いタイムスタンプが指定されたら動作はTRIM_HORIZONと同様。
-
TRIM_HORIZON
- シャードに残存している最も古いレコードから読み取り。
-
LATEST
- シャードに残存している最新のレコードの後から読み取りを実施。
- そのため、取得できるのは、あくまでGetShardIterator時点より後で登録されていくデータレコードとなる。
GetRecords
- 1 つのシャードから最大 10 MB のデータを取得でき、呼び出しごとに最大 10,000 レコードを取得可能。
- GetRecords への各呼び出しは、1 つの読み込みトランザクションとしてカウント
- 各シャードは 1 秒あたり最大 5 件のトランザクションをサポート
- 各トランザクションは最大 10,000 レコードを提供でき、トランザクションあたり 10 MB のクォータ上限
- 各シャードは、最大 2 MB/秒の合計データ読み取りレートをサポートしている。仮にシャードからの最大読み取りサイズ10MBを読み取った場合は、次の 5 秒以内に行われたそれ以降の呼び出しでは、例外 ProvisionedThroughputExceededException がスローされる
- Limit 句で取得するレコード数を指定(default 10000)
# シャードイテレータ取得(LATEST)
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type LATEST --stream-name test-stream1 --query 'ShardIterator')
# get-records するがこの時点では何もデータレコードは取得できない(この例ではLATESTのため)
aws kinesis get-records --shard-iterator $SHARD_ITERATOR
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAHK2PAEZveGI16HGrn6suWCR3nX9z/42azA9V7J/E2iTI+lekKxfhmYN2Jdc55M9duWQZ/HQQmEeXOmIafyypSlCxK+m10OihPvLW4RwD1ga28H1N0SLxmT+AuHf1VG84BoX8BTP9tCMrA47TPNlNLCnaIX/FSXxjk9P2kMUiIxHG7jug2HIRQ2nXbQUMRy/bRuWZ8T3YE/w3SwZ929WdKhSOzZsyd1NLLm5h53AWUOww==",
"MillisBehindLatest": 0
}
# 適当にデータをput-record
aws kinesis put-record --stream-name test-stream1 --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49631010431464371151031729519717236304406277532833284098"
}
# もう一度データレコードを get-records すると直前に登録したデータレコードが取得できる
aws kinesis get-records --shard-iterator $SHARD_ITERATOR
{
"Records": [
{
"Data": "dGVzdGRhdGE=",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1656773892.592,
"SequenceNumber": "49631010431464371151031729519717236304406277532833284098"
}
],
"NextShardIterator": "AAAAAAAAAAH0RcIvB1T9/CNRWjYi99KD8Rz6PmoUGNbmCVj2U/OPeQo1fZ22C/GrVu/w0XMo4x8b2jIv7fQL35O77XN3fOKdnTuBHXasGcaHYSAnND25Nlh0SN3/sEJj2HtAX6B1DL4FgylvMRwSy8f8fGVgVIB6wLeJvuN+lJzL9iEu5cGpYvps3PjJvfpxB96PqSPOFzH2deUB7M/g3qQAraA3sNUt77Ral8r9zLFZe1BKf8Z+YQ==",
"MillisBehindLatest": 0
}
# base64 デコードすることで生データが閲覧できる。
echo "dGVzdGRhdGE=" | base64 -d
testdata
Kinesis Client Library(KCL)
Kinesis Data Streams 専用の Consumer ライブラリ。Kinesis Data Stream を運用する上で重要な以下のようなタスクを自動的に実行してくれる。KCLコンシュマーアプリケーション間の情報共有目的として、各シャード毎にどのレコードまで処理したのか、また、各シャードはどのワーカーで処理しているかなどをDynamoDBで管理する。KCL は Java ライブラリだが、multi-language 対応されており、他の言語(Python等)から呼び出せるようになっている。
- 複数のコンシューマーアプリケーションインスタンス間での負荷分散
- コンシューマーアプリケーションインスタンスの障害に対する応答
- 処理済みのレコードのチェックポイント作成
- リシャーディングへの対応
- デフォルトで拡張ファンアウトを使うのでスループット的にも有利。
KCL for Python の場合
Python で実行できるのは非常に嬉しい
# Set up for Linux
sudo yum -y install python-pip
sudo pip install virtualenv
virtualenv kclpy-sample-env
source kclpy-sample-env/bin/activate
pip install amazon_kclpy
git clone https://github.com/awslabs/amazon-kinesis-client-python.git
cd amazon-kinesis-client-python
# samples/sample.properties を修正する
# The name of an Amazon Kinesis stream to process.
streamName = test-stream1
~~~
# The KCL defaults to us-east-1
regionName = ap-northeast-1
KCL アプリケーション実行
### amazon_kclpy_helper.py コマンドで実行するjavaコマンドを出力できるのでそのjavaコマンドを実行するという形
amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties
### 面倒な場合は以下でそのまま出力されるjavaコマンドを実行できる
`amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties`
sample_kclpy_app.py がデフォルトで実行されるアプリケーションロジック。中のprocess_recordメソッドの[Insert your processing logic here]の箇所を修正することで独自のロジックを適用できる。
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
"""
Called for each record that is passed to process_records.
:param str data: The blob of data that was contained in the record.
:param str partition_key: The key associated with this record.
:param int sequence_number: The sequence number associated with this record.
:param int sub_sequence_number: the sub sequence number associated with this record.
"""
####################################
# Insert your processing logic here
####################################
self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
.format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
実行される sample_kclpy_app.py を明示的に指定する場合、samples/sample.propertiesのexecutableNameを修正する
executableName = /root/kinesis/amazon-kinesis-client-python/samples/sample_kclpy_app.py
Kinesis Data Stream のモニタリング
AWSサービスのモニタリングといえば、CloudWatchメトリクス。メトリクスの意味を理解して使いこなすことは非常に重要。KDSでは以下の2種類のメトリクスが使えるが、いずれも1分間隔のメトリクスで拡張の方は有料。
- 基本 - ストリームレベル
- PutRecord.* は PutRecord API に関するメトリクス
- PutRecords.* は PutRecords API に関するメトリクス
- GetRecords.* は GetRecords API に関するメトリクス
- GetRecords.IteratorAgeMilliseconds は現在の時刻と、各 GetRecords 呼び出しの最後のレコードがストリームに書き込まれた時刻の差を意味する。要するに、このメトリクス値を確認することでプロデューサーからの書き込みに対してコンシュマーによる読み込みがどの程度遅延しているかを判別することができる。書き込まれたと同時に読み込まれるような場合は0。基本的にはその期間のGetRecordsオペレーションの中での最大遅延を示す Maximum か、平均遅延を示す Average で確認するのが無難かなと考える。
- IncomingBytes/IncomingRecords はPutRecord、PutRecords API による指定期間中のbyte数、レコード数。
- WriteProvisionedThroughputExceeded は書き込み操作が容量不足でスロットリングにより拒否されたデータレコード数を示す。
- ReadProvisionedThroughputExceeded は読み込み操作が容量不足でスロットリングにより拒否されたデータレコード数を示す。
- SampleCount x 1 = 各 API x 1。
- 拡張 - シャードレベル
Consumer として Lambda を利用
- Lambda から取得するデータレコード数(BatchSize)、ストリーム側でデータレコードをバッファリングする時間を指定するバッチ処理ウィンドウ(MaximumBatchingWindowInSeconds)、1シャードあたりにLambdaを同時にいくつConsumerとして起動するか(ParallelizationFactor)等指定可能
- 拡張ファンアウトを使うConsumerとして構成も可能。
Quota
Discussion
初投稿ですがよろしくお願いします!