KotlinとSpring BootでKCL(Kinesis Client Library)を使ったマルチストリーム処理を実装する
はじめに
本記事では、Kinesis Client Library (KCL) を使って複数のKinesis Data Streamsを同時に処理する方法を解説します。
想定するユースケース
DynamoDBテーブルの変更をCDC(Change Data Capture)でKinesisに流し、それを処理するシナリオを例にします。
完全なソースコードはGithubで公開しています。
1. KCLのマルチストリーム処理
本記事では、Kinesis Data StreamsやKCLの詳細説明は別の機会にして、実装に焦点を当てます。
簡単に言うと:
- Kinesis Data Streams: リアルタイムデータを処理するAWSサービス
- KCL: Kinesisからデータを効率的に読み取るライブラリ
- マルチストリーム処理: 1つのアプリケーションで複数のKinesisストリームを同時に処理する機能
2. なぜマルチストリーム処理が必要か
従来の課題
これまでは1つのKCLアプリケーションで1つのストリームしか処理できませんでした。
問題点:
- 複数アプリの管理が複雑
- インフラコストが増大
- データの相関処理が困難
マルチストリーム処理のメリット
KCL 2.3以降では、単一アプリで複数ストリームを処理できます。
メリット:
- 運用がシンプルに
- コスト削減
- クロスストリーム処理が容易
クロスストリーム処理とは
異なるストリームからのデータを組み合わせて処理することを指しています。
例えば、今回の例では:
-
animals-stream
から動物の情報 -
foods-stream
から食べ物の情報
これらを単一のアプリケーションで受け取ることで、「どの動物が何を食べたか」のような相関処理が可能になります。
// 例:RecordProcessor内での実装イメージ
override fun processRecords(input: ProcessRecordsInput) {
input.records().forEach { record ->
val data = parseRecord(record)
when (data.tableName) {
"animals" -> {
// 動物情報を一時的に保存
animalCache[data.name] = data
}
"foods" -> {
// 食べ物情報と動物情報を組み合わせて処理
val relatedAnimal = animalCache[data.animalId]
if (relatedAnimal != null) {
// クロスストリーム処理の例
processAnimalFoodRelation(relatedAnimal, data)
}
}
}
}
}
従来は別々のアプリケーションで処理していたため、このような相関処理には:
- 外部ストレージ(Redis等)での状態共有
- 後続の統合処理
などが必要でしたが、マルチストリーム処理により単一アプリケーション内で完結することができます。
3. 実装編
環境構成
今回実装するシステムの全体像:
システム構成の説明
- DynamoDB Tables: データの保存先(animals, foods)
- CDC (Change Data Capture): DynamoDBの変更をKinesisにストリーミング
- Kinesis Streams: DynamoDBからの変更イベントを受け取る
- KCL Application: 複数のKinesisストリームからデータを処理
依存関係
build.gradle.kts
で以下のライブラリを追加します:
dependencies {
// KCL本体
implementation("software.amazon.kinesis:amazon-kinesis-client:3.0.1")
// AWS SDKs (KCLが内部で使用)
implementation("software.amazon.awssdk:kinesis:2.30.26")
implementation("software.amazon.awssdk:dynamodb:2.30.26")
implementation("software.amazon.awssdk:cloudwatch:2.30.26")
}
各依存関係の役割:
- amazon-kinesis-client: KCLのメインライブラリ
- awssdk:kinesis: Kinesisストリームとの通信
- awssdk:dynamodb: チェックポイントとリース情報の保存
- awssdk:cloudwatch: メトリクスの送信(オプション)
実装のポイント
1. RecordProcessor - データ処理ロジック
RecordProcessorは、Kinesisストリームから受信したデータを実際に処理するコンポーネントです。
ShardRecordProcessor
インターフェースを実装し、主に以下の5つのメソッドを実装する必要があります:
-
initialize
: シャード処理の初期化 -
processRecords
: レコードのバッチ処理(最も重要) -
leaseLost
: シャードのリースを失った場合の処理 -
shardEnded
: シャードの処理が完了した場合の処理 -
shutdownRequested
: KCLのシャットダウン要求時の処理
@Component
class RecordProcessor : ShardRecordProcessor {
override fun processRecords(processRecordsInput: ProcessRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId)
try {
log.info("Processing {} record(s)", processRecordsInput.records().size)
processRecordsInput.records().forEach { record ->
// レコードのデータをByteArrayに変換
val byteArray = ByteArray(record.data().remaining())
record.data().get(byteArray)
// DynamoDB CDCのJSON形式のデータが含まれている
log.info("Processing record: {}", String(byteArray, Charsets.UTF_8))
// 実際のアプリケーションでは、ここでJSONをパースして
// ビジネスロジックを実装します
}
// チェックポイント:処理済みデータの位置を記録
// これにより、アプリケーション再起動時に重複処理を防ぐ
processRecordsInput.checkpointer().checkpoint()
} catch (e: KinesisClientLibRetryableException) {
log.error("Caught throwable while processing records. Aborting.", e)
} finally {
MDC.remove(SHARD_ID_MDC_KEY)
}
}
// その他のライフサイクルメソッドは省略
}
ポイント:
- MDCを使用してシャードIDをログに含める
- レコードはバッチで処理される(複数のレコードが一度に来る)
- チェックポイントを記録することで処理済み位置を保存
2. MultiStreamTracker - 複数ストリームの設定
MultiStreamTrackerは、処理対象となる複数のKinesisストリームを定義するコンポーネントです。
streamConfigList()
メソッドで処理したいストリームのリストを返します。
@Component
class MultiStreamTrackerImpl(
private val kinesisConfig: KinesisConfig
) : MultiStreamTracker {
override fun streamConfigList(): List<StreamConfig> {
return listOf(
// animalsストリームの設定
StreamConfig(
// ARN形式でストリームを指定
StreamIdentifier.multiStreamInstance(
Arn.fromString(kinesisConfig.animalsStreamName),
// エポックミリ秒(任意の値でOK)
1234567890L
),
// どこから読み始めるか
// TRIM_HORIZON: 最も古いレコードから
// LATEST: 最新のレコードから
// AT_TIMESTAMP: 特定の時刻から
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
// foodsストリームの設定
StreamConfig(
StreamIdentifier.multiStreamInstance(
Arn.fromString(kinesisConfig.foodsStreamName),
1234567890L
),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
)
)
}
// リース削除戦略(デフォルトは削除しない)
override fun formerStreamsLeasesDeletionStrategy(): FormerStreamsLeasesDeletionStrategy? =
FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy()
}
ポイント:
- 各ストリームはARN形式で指定する必要がある
- 初期読み取り位置を設定可能(今回は最初から読む設定)
- ストリームを追加/削除する場合は、このリストを更新するだけでOK
3. Consumer - 全体の制御
Consumerクラスは、KCLアプリケーション全体の調整を行うコンポーネントです。
主に以下の役割を担います:
- AWSクライアントの設定
- スケジューラの初期化と起動
- アプリケーションのライフサイクル管理
@Component
class Consumer(
private val kinesisClientFactory: KinesisClientFactory,
private val dynamoDbClientFactory: DynamoDbClientFactory,
private val cloudWatchClientFactory: CloudWatchClientFactory,
private val kinesisConfig: KinesisConfig,
private val recordProcessorFactory: RecordProcessorFactory
) {
private val executorService: ExecutorService = Executors.newSingleThreadExecutor()
private lateinit var scheduler: Scheduler
init {
start()
}
private fun start() {
// AWSクライアントの作成
val kinesisAsyncClient = kinesisClientFactory.create()
val dynamoDbAsyncClient = dynamoDbClientFactory.create()
val cloudWatchAsyncClient = cloudWatchClientFactory.create()
val workerIdentifier = UUID.randomUUID().toString()
// マルチストリーム設定
val streamTracker = MultiStreamTrackerImpl(kinesisConfig)
// KCLの設定ビルダー
val configsBuilder = ConfigsBuilder(
streamTracker,
APPLICATION_NAME, // アプリケーション名(チェックポイントテーブル名にも使用)
kinesisAsyncClient,
dynamoDbAsyncClient,
cloudWatchAsyncClient,
workerIdentifier,
recordProcessorFactory
)
// リース管理設定
// リース: どのワーカーがどのシャードを処理するかの情報
val leaseManagementConfig = LeaseManagementConfig(
LEASE_TABLE_NAME, // DynamoDBのテーブル名
APPLICATION_NAME,
dynamoDbAsyncClient,
kinesisAsyncClient,
workerIdentifier
).workerUtilizationAwareAssignmentConfig(
LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig()
.disableWorkerMetrics(true)
.workerMetricsTableConfig(
LeaseManagementConfig.WorkerMetricsTableConfig(APPLICATION_NAME)
)
)
// スケジューラの作成
scheduler = Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
leaseManagementConfig,
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
)
// スケジューラを別スレッドで起動
executorService.submit(scheduler)
}
@PreDestroy
fun shutdown() {
if (this::scheduler.isInitialized) {
scheduler.shutdown()
}
executorService.shutdown()
}
companion object {
const val APPLICATION_NAME = "sample-kcl-app"
// チェックポイント情報を保存するDynamoDBテーブル名
const val LEASE_TABLE_NAME = "lease_table"
}
}
ポイント:
- スケジューラが実際のストリーム処理を管理
- リーステーブル(DynamoDB)でワーカーとシャードの割り当てを管理
- 複数インスタンス起動時は、自動的に負荷分散される
4. 動作確認
テストシナリオ
以下の流れで動作を確認します:
- DynamoDB(localstack)にデータを追加
- CDCによりKinesisにデータが流れる
- KCLアプリケーションで処理
アプリケーションの起動
# Docker Composeで環境を起動
$ docker compose up -d
テストデータの投入
# animalsテーブルにデータ追加
$ awslocal dynamodb put-item \
--table-name animals \
--item '{"name": {"S": "dog"}}'
# foodsテーブルにデータ追加
$ awslocal dynamodb put-item \
--table-name foods \
--item '{"name": {"S": "apple"}}'
※awslocal
はLocalStack用のAWS CLIラッパーです。
実行結果
アプリケーションログ:
INFO 1 --- [dProcessor-0000] com.sample.kcl.RecordProcessor : Processing 1 record(s)
INFO 1 --- [dProcessor-0000] com.sample.kcl.RecordProcessor : Processing record: {"tableName": "animals", "recordFormat": "application/json", "userIdentity": null, "eventID": "41645dda", "dynamodb": {"ApproximateCreationDateTime": 1746843754, "SizeBytes": 20, "Keys": {"name": {"S": "dog"}}, "NewImage": {"name": {"S": "dog"}}}, "awsRegion": "ap-northeast-1", "eventSource": "aws:dynamodb", "eventName": "INSERT"}
INFO 1 --- [dProcessor-0007] com.sample.kcl.RecordProcessor : Processing 1 record(s)
INFO 1 --- [dProcessor-0007] com.sample.kcl.RecordProcessor : Processing record: {"tableName": "foods", "recordFormat": "application/json", "userIdentity": null, "eventID": "267a8fdd", "dynamodb": {"ApproximateCreationDateTime": 1746843762, "SizeBytes": 22, "Keys": {"name": {"S": "apple"}}, "NewImage": {"name": {"S": "apple"}}}, "awsRegion": "ap-northeast-1", "eventSource": "aws:dynamodb", "eventName": "INSERT"}
両方のストリームからデータが処理されていることが確認できましたので、成功です!
おわりに
KCLのマルチストリーム機能により、複数のKinesisストリームを効率的に処理できるようになりました。単一アプリケーションで管理できるため、運用コストを削減しながら、複雑なストリーム処理を実現できます。
また、本記事で実装した完全なソースコードはGithubで公開していますので、ご参照ください。
Discussion