🌊

KotlinとSpring BootでKCL(Kinesis Client Library)を使ったマルチストリーム処理を実装する

に公開

はじめに

本記事では、Kinesis Client Library (KCL) を使って複数のKinesis Data Streamsを同時に処理する方法を解説します。

想定するユースケース

DynamoDBテーブルの変更をCDC(Change Data Capture)でKinesisに流し、それを処理するシナリオを例にします。

完全なソースコードはGithubで公開しています。

1. KCLのマルチストリーム処理

本記事では、Kinesis Data StreamsやKCLの詳細説明は別の機会にして、実装に焦点を当てます。

簡単に言うと:

  • Kinesis Data Streams: リアルタイムデータを処理するAWSサービス
  • KCL: Kinesisからデータを効率的に読み取るライブラリ
  • マルチストリーム処理: 1つのアプリケーションで複数のKinesisストリームを同時に処理する機能

2. なぜマルチストリーム処理が必要か

従来の課題

これまでは1つのKCLアプリケーションで1つのストリームしか処理できませんでした。

問題点:

  • 複数アプリの管理が複雑
  • インフラコストが増大
  • データの相関処理が困難

マルチストリーム処理のメリット

KCL 2.3以降では、単一アプリで複数ストリームを処理できます。

メリット:

  • 運用がシンプルに
  • コスト削減
  • クロスストリーム処理が容易

クロスストリーム処理とは

異なるストリームからのデータを組み合わせて処理することを指しています。

例えば、今回の例では:

  • animals-streamから動物の情報
  • foods-streamから食べ物の情報

これらを単一のアプリケーションで受け取ることで、「どの動物が何を食べたか」のような相関処理が可能になります。

// 例:RecordProcessor内での実装イメージ
override fun processRecords(input: ProcessRecordsInput) {
    input.records().forEach { record ->
        val data = parseRecord(record)
        
        when (data.tableName) {
            "animals" -> {
                // 動物情報を一時的に保存
                animalCache[data.name] = data
            }
            "foods" -> {
                // 食べ物情報と動物情報を組み合わせて処理
                val relatedAnimal = animalCache[data.animalId]
                if (relatedAnimal != null) {
                    // クロスストリーム処理の例
                    processAnimalFoodRelation(relatedAnimal, data)
                }
            }
        }
    }
}

従来は別々のアプリケーションで処理していたため、このような相関処理には:

  • 外部ストレージ(Redis等)での状態共有
  • 後続の統合処理

などが必要でしたが、マルチストリーム処理により単一アプリケーション内で完結することができます。

3. 実装編

環境構成

今回実装するシステムの全体像:

システム構成の説明

  1. DynamoDB Tables: データの保存先(animals, foods)
  2. CDC (Change Data Capture): DynamoDBの変更をKinesisにストリーミング
  3. Kinesis Streams: DynamoDBからの変更イベントを受け取る
  4. KCL Application: 複数のKinesisストリームからデータを処理

依存関係

build.gradle.ktsで以下のライブラリを追加します:

dependencies {
    // KCL本体
    implementation("software.amazon.kinesis:amazon-kinesis-client:3.0.1")
    
    // AWS SDKs (KCLが内部で使用)
    implementation("software.amazon.awssdk:kinesis:2.30.26")
    implementation("software.amazon.awssdk:dynamodb:2.30.26")
    implementation("software.amazon.awssdk:cloudwatch:2.30.26")
}

各依存関係の役割:

  • amazon-kinesis-client: KCLのメインライブラリ
  • awssdk:kinesis: Kinesisストリームとの通信
  • awssdk:dynamodb: チェックポイントとリース情報の保存
  • awssdk:cloudwatch: メトリクスの送信(オプション)

実装のポイント

1. RecordProcessor - データ処理ロジック

RecordProcessorは、Kinesisストリームから受信したデータを実際に処理するコンポーネントです。
ShardRecordProcessorインターフェースを実装し、主に以下の5つのメソッドを実装する必要があります:

  • initialize: シャード処理の初期化
  • processRecords: レコードのバッチ処理(最も重要)
  • leaseLost: シャードのリースを失った場合の処理
  • shardEnded: シャードの処理が完了した場合の処理
  • shutdownRequested: KCLのシャットダウン要求時の処理
@Component
class RecordProcessor : ShardRecordProcessor {
    
    override fun processRecords(processRecordsInput: ProcessRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId)
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size)
            processRecordsInput.records().forEach { record ->
                // レコードのデータをByteArrayに変換
                val byteArray = ByteArray(record.data().remaining())
                record.data().get(byteArray)
                
                // DynamoDB CDCのJSON形式のデータが含まれている
                log.info("Processing record: {}", String(byteArray, Charsets.UTF_8))
                
                // 実際のアプリケーションでは、ここでJSONをパースして
                // ビジネスロジックを実装します
            }
            
            // チェックポイント:処理済みデータの位置を記録
            // これにより、アプリケーション再起動時に重複処理を防ぐ
            processRecordsInput.checkpointer().checkpoint()
        } catch (e: KinesisClientLibRetryableException) {
            log.error("Caught throwable while processing records. Aborting.", e)
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY)
        }
    }
    
    // その他のライフサイクルメソッドは省略
}

ポイント:

  • MDCを使用してシャードIDをログに含める
  • レコードはバッチで処理される(複数のレコードが一度に来る)
  • チェックポイントを記録することで処理済み位置を保存

2. MultiStreamTracker - 複数ストリームの設定

MultiStreamTrackerは、処理対象となる複数のKinesisストリームを定義するコンポーネントです。
streamConfigList()メソッドで処理したいストリームのリストを返します。

@Component
class MultiStreamTrackerImpl(
    private val kinesisConfig: KinesisConfig
) : MultiStreamTracker {
    
    override fun streamConfigList(): List<StreamConfig> {
        return listOf(
            // animalsストリームの設定
            StreamConfig(
                // ARN形式でストリームを指定
                StreamIdentifier.multiStreamInstance(
                    Arn.fromString(kinesisConfig.animalsStreamName),
                    // エポックミリ秒(任意の値でOK)
                    1234567890L
                ),
                // どこから読み始めるか
                // TRIM_HORIZON: 最も古いレコードから
                // LATEST: 最新のレコードから
                // AT_TIMESTAMP: 特定の時刻から
                InitialPositionInStreamExtended.newInitialPosition(
                    InitialPositionInStream.TRIM_HORIZON
                )
            ),
            // foodsストリームの設定
            StreamConfig(
                StreamIdentifier.multiStreamInstance(
                    Arn.fromString(kinesisConfig.foodsStreamName),
                    1234567890L
                ),
                InitialPositionInStreamExtended.newInitialPosition(
                    InitialPositionInStream.TRIM_HORIZON
                )
            )
        )
    }
    
    // リース削除戦略(デフォルトは削除しない)
    override fun formerStreamsLeasesDeletionStrategy(): FormerStreamsLeasesDeletionStrategy? =
        FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy()
}

ポイント:

  • 各ストリームはARN形式で指定する必要がある
  • 初期読み取り位置を設定可能(今回は最初から読む設定)
  • ストリームを追加/削除する場合は、このリストを更新するだけでOK

3. Consumer - 全体の制御

Consumerクラスは、KCLアプリケーション全体の調整を行うコンポーネントです。
主に以下の役割を担います:

  • AWSクライアントの設定
  • スケジューラの初期化と起動
  • アプリケーションのライフサイクル管理
@Component
class Consumer(
    private val kinesisClientFactory: KinesisClientFactory,
    private val dynamoDbClientFactory: DynamoDbClientFactory,
    private val cloudWatchClientFactory: CloudWatchClientFactory,
    private val kinesisConfig: KinesisConfig,
    private val recordProcessorFactory: RecordProcessorFactory
) {
    private val executorService: ExecutorService = Executors.newSingleThreadExecutor()
    private lateinit var scheduler: Scheduler

    init {
        start()
    }

    private fun start() {
        // AWSクライアントの作成
        val kinesisAsyncClient = kinesisClientFactory.create()
        val dynamoDbAsyncClient = dynamoDbClientFactory.create()
        val cloudWatchAsyncClient = cloudWatchClientFactory.create()
        
        val workerIdentifier = UUID.randomUUID().toString()
        
        // マルチストリーム設定
        val streamTracker = MultiStreamTrackerImpl(kinesisConfig)
        
        // KCLの設定ビルダー
        val configsBuilder = ConfigsBuilder(
            streamTracker,
            APPLICATION_NAME,  // アプリケーション名(チェックポイントテーブル名にも使用)
            kinesisAsyncClient,
            dynamoDbAsyncClient,
            cloudWatchAsyncClient,
            workerIdentifier,
            recordProcessorFactory
        )
        
        // リース管理設定
        // リース: どのワーカーがどのシャードを処理するかの情報
        val leaseManagementConfig = LeaseManagementConfig(
            LEASE_TABLE_NAME,  // DynamoDBのテーブル名
            APPLICATION_NAME,
            dynamoDbAsyncClient,
            kinesisAsyncClient,
            workerIdentifier
        ).workerUtilizationAwareAssignmentConfig(
            LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig()
                .disableWorkerMetrics(true)
                .workerMetricsTableConfig(
                    LeaseManagementConfig.WorkerMetricsTableConfig(APPLICATION_NAME)
                )
        )
        
        // スケジューラの作成
        scheduler = Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            leaseManagementConfig,
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        )
        
        // スケジューラを別スレッドで起動
        executorService.submit(scheduler)
    }
    
    @PreDestroy
    fun shutdown() {
        if (this::scheduler.isInitialized) {
            scheduler.shutdown()
        }
        executorService.shutdown()
    }
    
    companion object {
        const val APPLICATION_NAME = "sample-kcl-app"
        // チェックポイント情報を保存するDynamoDBテーブル名
        const val LEASE_TABLE_NAME = "lease_table"
    }
}

ポイント:

  • スケジューラが実際のストリーム処理を管理
  • リーステーブル(DynamoDB)でワーカーとシャードの割り当てを管理
  • 複数インスタンス起動時は、自動的に負荷分散される

4. 動作確認

テストシナリオ

以下の流れで動作を確認します:

  1. DynamoDB(localstack)にデータを追加
  2. CDCによりKinesisにデータが流れる
  3. KCLアプリケーションで処理

アプリケーションの起動

# Docker Composeで環境を起動
$ docker compose up -d

テストデータの投入

# animalsテーブルにデータ追加
$ awslocal dynamodb put-item \
    --table-name animals \
    --item '{"name": {"S": "dog"}}'

# foodsテーブルにデータ追加
$ awslocal dynamodb put-item \
    --table-name foods \
    --item '{"name": {"S": "apple"}}'

awslocalはLocalStack用のAWS CLIラッパーです。

実行結果

アプリケーションログ:

INFO 1 --- [dProcessor-0000] com.sample.kcl.RecordProcessor : Processing 1 record(s)
INFO 1 --- [dProcessor-0000] com.sample.kcl.RecordProcessor : Processing record: {"tableName": "animals", "recordFormat": "application/json", "userIdentity": null, "eventID": "41645dda", "dynamodb": {"ApproximateCreationDateTime": 1746843754, "SizeBytes": 20, "Keys": {"name": {"S": "dog"}}, "NewImage": {"name": {"S": "dog"}}}, "awsRegion": "ap-northeast-1", "eventSource": "aws:dynamodb", "eventName": "INSERT"}
INFO 1 --- [dProcessor-0007] com.sample.kcl.RecordProcessor : Processing 1 record(s)
INFO 1 --- [dProcessor-0007] com.sample.kcl.RecordProcessor : Processing record: {"tableName": "foods", "recordFormat": "application/json", "userIdentity": null, "eventID": "267a8fdd", "dynamodb": {"ApproximateCreationDateTime": 1746843762, "SizeBytes": 22, "Keys": {"name": {"S": "apple"}}, "NewImage": {"name": {"S": "apple"}}}, "awsRegion": "ap-northeast-1", "eventSource": "aws:dynamodb", "eventName": "INSERT"}

両方のストリームからデータが処理されていることが確認できましたので、成功です!

おわりに

KCLのマルチストリーム機能により、複数のKinesisストリームを効率的に処理できるようになりました。単一アプリケーションで管理できるため、運用コストを削減しながら、複雑なストリーム処理を実現できます。
また、本記事で実装した完全なソースコードはGithubで公開していますので、ご参照ください。

参考資料

Discussion