🐥
Spring IntegrationでTCP通信(Server)
はじめに
TCP通信の実装をしたことがなかったので、今回はSpring Integrationを使ってTCP Serverを実装してみました。
要件
-
クライアント側から3バイト送られてくるので、演算結果をレスポンスとして返す
- 1バイト目は加算・減算を表す"+" or "-"
- 2バイト目と3バイト目は
1-9
の数値
-
レスポンス
- 1バイト目には正常終了・異常終了を表す"A" or "N"
- 2バイト目に演算結果を格納
- 異常終了時には0を入れる
処理フロー
実装
結構雑に実装してしまっているかもしれませんが、大体のイメージはこんな感じです。
Config
TcpConfig.kt
/**
* TCPサーバーの設定クラス
* リクエストを受け付けるポート: ${tcp.server.port}
*/
@EnableIntegration
@Configuration
class TcpServerConfig(
@Value("\${tcp.server.port}")
private val port: Int,
private val calcHandler: CalcHandler,
private val calcSerializer: CalcSerializer,
) {
@Bean
fun tcpServer(): IntegrationFlow {
return IntegrationFlow.from(generateInboundGateway())
.handle {message: Message<CalcRequest> -> calcHandler.handle(message) }
.get()
}
private fun generateInboundGateway(): TcpInboundGatewaySpec {
return Tcp.inboundGateway(
Tcp.nioServer(port)
.serializer(calcSerializer)
.deserializer(calcSerializer)
)
}
}
Serializer
TCP通信におけるバイト配列とアプリケーションオブジェクトの相互変換を行う。
- クライアントからのバイトデータをリクエストオブジェクトに変換(deserialize)
- 演算結果のレスポンスオブジェクトをバイトデータに変換(serialize)
CalcSerializer.kt
@Component
class CalcSerializer : Serializer<CalcResponse>, Deserializer<CalcRequest> {
override fun deserialize(inputStream: InputStream): CalcRequest {
return try {
val buffer = ByteArray(BUFFER_SIZE)
if (inputStream.read(buffer) != BUFFER_SIZE) {
error("Invalid input size")
}
val num1 = convertToInt(buffer[FIRST_NUMBER_INDEX])
val num2 = convertToInt(buffer[SECOND_NUMBER_INDEX])
val operation = when (convertToInt(buffer[OPERATION_INDEX]).toChar()) {
ADDITION_SYMBOL -> Operation.ADD
SUBTRACTION_SYMBOL -> Operation.SUBTRACT
else -> error("Invalid operation")
}
CalcRequest(num1, num2, operation)
} catch (e: Exception) {
CalcRequest(0, 0, Operation.OTHER)
}
}
override fun serialize(response: CalcResponse, outputStream: OutputStream) {
val responseBytes = when (response.status) {
Status.SUCCESS -> buildResponseBytes(SUCCESS_STATUS, response.result)
Status.ERROR -> buildResponseBytes(ERROR_STATUS, ERROR_RESULT)
}
outputStream.write(responseBytes)
}
private fun buildResponseBytes(status: Char, calcResult: Int): ByteArray =
byteArrayOf(status.code.toByte(), calcResult.toByte())
private fun convertToInt(byte: Byte): Int =
String.format("%02X", byte).toInt(16)
companion object {
private const val BUFFER_SIZE = 3
private const val FIRST_NUMBER_INDEX = 1
private const val SECOND_NUMBER_INDEX = 2
private const val OPERATION_INDEX = 0
private const val SUCCESS_STATUS = 'A'
private const val ERROR_STATUS = 'N'
private const val ERROR_RESULT = 0
private const val ADDITION_SYMBOL = '+'
private const val SUBTRACTION_SYMBOL = '-'
}
}
Handler
CalcHandler.kt
interface CalcHandler {
fun handle(message: Message<CalcRequest>): Message<CalcResponse>
}
@Handler
class CalcHandler(private val calcUseCase: CalcUseCase) : CalcHandler {
override fun handle(message: Message<CalcRequest>): Message<CalcResponse> {
val response = when (message.payload.operation) {
Operation.OTHER -> CalcResponse(0, Status.ERROR)
else -> calcUseCase.execute(message.payload)
}
return MessageBuilder.withPayload(response)
.copyHeaders(message.headers)
.build()
}
}
UseCase
CalcUseCase.kt
interface CalcUseCase {
fun execute(request: CalcRequest): CalcResponse
}
@UseCase
class CalcUseCaseImpl: CalcUseCase {
override fun execute(request: CalcRequest): CalcResponse {
val result = when (request.operation) {
Operation.ADD -> request.num1 + request.num2
Operation.SUBTRACT -> request.num1 - request.num2
Operation.OTHER -> error("Invalid operation")
}
return CalcResponse(result, Status.SUCCESS)
}
}
まとめ
Spring Integrationを使って、TCP Serverを実装を紹介しました。公式ドキュメントを眺めていると、TCP 接続イベントやインターセプターなどの機能も記載されていたので、今後これらも試してみたいと思います。
Discussion