🐥

Spring IntegrationでTCP通信(Server)

2025/01/28に公開

はじめに

TCP通信の実装をしたことがなかったので、今回はSpring Integrationを使ってTCP Serverを実装してみました。

要件

  • クライアント側から3バイト送られてくるので、演算結果をレスポンスとして返す

    • 1バイト目は加算・減算を表す"+" or "-"
    • 2バイト目と3バイト目は1-9の数値
  • レスポンス

    • 1バイト目には正常終了・異常終了を表す"A" or "N"
    • 2バイト目に演算結果を格納
      • 異常終了時には0を入れる

処理フロー

実装

結構雑に実装してしまっているかもしれませんが、大体のイメージはこんな感じです。

Config

TcpConfig.kt
/**
* TCPサーバーの設定クラス
* リクエストを受け付けるポート: ${tcp.server.port}
*/
@EnableIntegration
@Configuration
class TcpServerConfig(
  @Value("\${tcp.server.port}")
  private val port: Int,
  private val calcHandler: CalcHandler,
  private val calcSerializer: CalcSerializer,
) {

  @Bean
  fun tcpServer(): IntegrationFlow {
    return IntegrationFlow.from(generateInboundGateway())
      .handle {message: Message<CalcRequest> -> calcHandler.handle(message) }
      .get()
  }

  private fun generateInboundGateway(): TcpInboundGatewaySpec {
    return Tcp.inboundGateway(
      Tcp.nioServer(port)
        .serializer(calcSerializer)
        .deserializer(calcSerializer)
    )
  }
}

Serializer

TCP通信におけるバイト配列とアプリケーションオブジェクトの相互変換を行う。
- クライアントからのバイトデータをリクエストオブジェクトに変換(deserialize)
- 演算結果のレスポンスオブジェクトをバイトデータに変換(serialize)

CalcSerializer.kt
@Component
class CalcSerializer : Serializer<CalcResponse>, Deserializer<CalcRequest> {
  override fun deserialize(inputStream: InputStream): CalcRequest {
    return try {
      val buffer = ByteArray(BUFFER_SIZE)
      if (inputStream.read(buffer) != BUFFER_SIZE) {
        error("Invalid input size")
      }
      val num1 = convertToInt(buffer[FIRST_NUMBER_INDEX])
      val num2 = convertToInt(buffer[SECOND_NUMBER_INDEX])
      val operation = when (convertToInt(buffer[OPERATION_INDEX]).toChar()) {
        ADDITION_SYMBOL -> Operation.ADD
        SUBTRACTION_SYMBOL -> Operation.SUBTRACT
        else -> error("Invalid operation")
      }

      CalcRequest(num1, num2, operation)
    } catch (e: Exception) {
      CalcRequest(0, 0, Operation.OTHER)
    }
  }

  override fun serialize(response: CalcResponse, outputStream: OutputStream) {
    val responseBytes = when (response.status) {
      Status.SUCCESS -> buildResponseBytes(SUCCESS_STATUS, response.result)
      Status.ERROR -> buildResponseBytes(ERROR_STATUS, ERROR_RESULT)
    }
    outputStream.write(responseBytes)
  }

  private fun buildResponseBytes(status: Char, calcResult: Int): ByteArray =
    byteArrayOf(status.code.toByte(), calcResult.toByte())

  private fun convertToInt(byte: Byte): Int =
    String.format("%02X", byte).toInt(16)

  companion object {
    private const val BUFFER_SIZE = 3
    private const val FIRST_NUMBER_INDEX = 1
    private const val SECOND_NUMBER_INDEX = 2
    private const val OPERATION_INDEX = 0
    private const val SUCCESS_STATUS = 'A'
    private const val ERROR_STATUS = 'N'
    private const val ERROR_RESULT = 0
    private const val ADDITION_SYMBOL = '+'
    private const val SUBTRACTION_SYMBOL = '-'
  }
}

Handler

CalcHandler.kt
interface CalcHandler {
  fun handle(message: Message<CalcRequest>): Message<CalcResponse>
}

@Handler
class CalcHandler(private val calcUseCase: CalcUseCase) : CalcHandler {
    override fun handle(message: Message<CalcRequest>): Message<CalcResponse> {
        val response = when (message.payload.operation) {
            Operation.OTHER -> CalcResponse(0, Status.ERROR)
            else -> calcUseCase.execute(message.payload)
        }

        return MessageBuilder.withPayload(response)
            .copyHeaders(message.headers)
            .build()
    }
}

UseCase

CalcUseCase.kt
interface CalcUseCase {
  fun execute(request: CalcRequest): CalcResponse
}

@UseCase
class CalcUseCaseImpl: CalcUseCase {
  override fun execute(request: CalcRequest): CalcResponse {
    val result = when (request.operation) {
      Operation.ADD -> request.num1 + request.num2
      Operation.SUBTRACT -> request.num1 - request.num2
      Operation.OTHER -> error("Invalid operation")
    }
    return CalcResponse(result, Status.SUCCESS)
  }
}

まとめ

Spring Integrationを使って、TCP Serverを実装を紹介しました。公式ドキュメントを眺めていると、TCP 接続イベントインターセプターなどの機能も記載されていたので、今後これらも試してみたいと思います。

Discussion