🥰

コワくない関数型: java.nio と Scala の fs2 で HTTP サーバーを書き比べる

2021/07/05に公開

関数型という言葉にはいろいろな意味がありますが、ここではデザインパターンとしての関数型の特徴を見ていきましょう.
テーマはよくあるノンブロッキングな HTTP Echo サーバー. 送信した内容をそっくりそのまま返すだけの単純な実装です.
まず java.nio を使った実装を書き、次に Scala の functional stream ライブラリである fs2 を使った実装を書きます. fs2 は Typelevel 社のエコシステムに組み込まれているので cats や cats-effect と相性がいいです. Typelevel 社のライブラリは(大半は英語ですが)ドキュメントが充実しているので一度目を通してみましょう.

https://fs2.io/#/

ちなみに fs2 は Functional Streams for Scala の略.

java.nio を使ったノンブロッキングHTTPサーバー

まずは java.nio バージョン.

server.scala
trait Server {
  def run(port:Int):Unit
}

java.nio でシングルスレッドでノンブロッキングな IO をするには SelectorSelectableChannelを実装した SocketChannel を使います. 内部的には kqueueepoll を使っている模様.

まず、指定したポートとソケットをバインドします.

serverOnNIO.scala
class ServerOnNIO(
  args: Seq[String],
  socketChannel: ServerSocketChannel,
  private val selector: Selector = Selector.open()
) extends Server {
  def run(port: Int = 10007): Unit = {
    socketChannel.configureBlocking(false)
    Try {
      socketChannel.socket.bind(InetSocketAddress(port))
    }.recover { case c: BindException =>
      println(s"failed to bind address: ${c.getMessage}")
      socketChannel.close;
      ()
    }
}

ノンブロッキングにしたいので channel.configureBlocking(false)を指定して selector に登録します.

selector から key を取り出して、その状態に応じて accept,read,write を行います.

ByteBuffer を操作して read, write を行いますが Bufferlimit,positonなどの状態を持っているので状態を適当に操作する必要があります. そして処理が終わった key は selector から削除します.

なお Iterator から要素を削除する場合は、 foreach ではなく iterator.hasNext,iterator.nextiterator.remove を使うことが推奨されています.

class ServerOnNIO(
    args: Seq[String],
    socketChannel: ServerSocketChannel,
    private val selector: Selector = Selector.open(),
    private val cs: Charset = Charset.forName("UTF-8")
) extends Server {
  def run(port: Int = 10007): Unit = {
    socketChannel.configureBlocking(false)
    Try {
      socketChannel.socket.bind(InetSocketAddress(port))
    }.recover { case c: BindException =>
      println(s"failed to bind address: ${c.getMessage}")
      socketChannel.close;
      ()
    }
    socketChannel.register(selector, SelectionKey.OP_ACCEPT)
    while (true) {
      if (selector.select > 0) {
        val keysIter = selector.selectedKeys.iterator
        while (keysIter.hasNext) {
          val key = keysIter.next()
          keysIter.remove()
          key.channel match {
            case acceptChannel: ServerSocketChannel if key.isAcceptable =>
              acceptChannel
                .accept()
                .configureBlocking(false)
                .register(selector, SelectionKey.OP_READ);
              ()
            case readChannel: SocketChannel if key.isReadable =>
              val buf = ByteBuffer.allocate(100);
              readChannel.read(buf) match {
                case n if n < 0 => ()
                case _ => {
                  buf.flip()
                  val s = cs.decode(buf)
                  buf.flip()
                  readChannel.configureBlocking(false)
                  readChannel.register(
                    selector,
                    SelectionKey.OP_WRITE,
                    s.toString
                  )
                }
              }
            case writeChannel: SocketChannel if key.isWritable =>
              writeChannel.write(
                ByteBuffer.wrap(key.attachment.asInstanceOf[String].getBytes)
              );
              writeChannel.configureBlocking(false)
              writeChannel.register(selector, SelectionKey.OP_READ)
              ()
            case _ => ()
          }

        }
      }
    }
  }
}

`sbt console で サーバーを走らせて別の Repl からアクセスしてみましょう.

import java.nio.channels.SocketChannel
import java.net.InetSocketAddress
import java.nio.charset.Charset

val charset = Charset.forName("UTF-8")
val buf = ByteBuffer.allocate(100)
val c = SocketChannel.open(new InetSocketAddress("localhost",10007))
c.write(charset.encode("hello, world!\n"))
c.read(buf)
charset.decode(buf)

きわめて命令的な書き方なのでコンピュータになにをしているのかはわかりやすいですが、その分やりたいこと(ここでは echo)が埋もれてしまっています.

お気づきの方もいるかもしれませんが、ソケットのリソースの解放をしていないので再度サーバーを起動したときにひょっとすると BIndException が起こるかもしれませんね(´・ω・`)

リソースの管理に加えて加えて Selctor, Buffer,Socket などあれこれ状態を考えないといけないのでつらいですね.

さて、それでは fs2 を使った実装を見てみましょう.

fs2 を使った ノンブロッキングHTTPサーバー

次に fs2 を使って 関数型ストリームの考え方で HTTP サーバーを書いてみましょう. 関数型ストリームでは(ほとんど)すべてが Stream として扱われます.
サーバーはフロントエンドと比べてそれほど細かい状態を持たないのでリクエストを受け取って(いくつかの副作用(effect)を起こしつつ)レスポンスを返すある種の関数,または Stream と考えやすいです.
イメージはbyte => T => do something pure/effectful => T => byte. ただし読み込み・書き込みは IO の副作用です.

パッケージを追加しましょう.

build.sbt
val fs2Version = "3.0.6"
val catseffectVersion = "3.1.1"
libraryDependencies ++= Seq(
    "co.fs2" %% "fs2-core" % fs2Version,
    "co.fs2" %% "fs2-io" % fs2Version,
    "org.typelevel" %% "cats-effect" % catseffectVersion,
    "com.comcast" %% "ip4s-core" % "3.0.3"
  )

まず必要なパッケージを読み込みます.

import fs2.io.net.Network
import cats.effect.Concurrent
import fs2.text
import com.comcast.ip4s.port
import fs2.Stream

Network は TCP や UDP などのネットワークレイヤーに関わる副作用を、Concurrent は非同期処理に関わる副作用を提供します.

fs2.text は byte や文字列の加工をするためのユーティリティを提供します.

com.comcast.ip4s は Port や Host などを表現したオブジェクトを使えるようにします.(これは must ではないですが書きやすさのため)

fs2.Stream は scala の Stream より リアクティブ・関数型な機能が充実した Stream です.

では早速サーバーを書いてみましょう.
byte => T => do something pure/effectful => T => byte を思い出しましょう.

単純な Echo サーバーなので byte=>String,String=>byteがあればいいです. fs2.text が提供する関数を使いましょう.


object FS2Server {
  def createServer[F[_]: Concurrent: Network](
      bindTo: Option[Port] = Port.fromInt(10007)
  ): F[Unit] =
    Network[F]
      .server(port = bindTo)
      .map { client =>
        client.reads
          .through(text.utf8Decode)
          .through(text.lines)
          .interleave(Stream.constant("\n"))
          .through(text.utf8Encode)
          .through(client.writes)
          .handleErrorWith { case e =>
            println(s"something went wrong: $e"); Stream.empty
          }
      }
      .parJoin(100)
      .compile
      .drain
}

Network は fs2 3.x で追加された機能です. 指定されたポートとソケットをバインドすると、新しい接続があるたびに client : Socket[F]が流れてきます. reads メソッドで client から byte を fs2.Stream で読みだして関数を適用したのち書き込みをしています. parJoin は Stream[Stream] の内側のStream を最大 100 まで並列に処理してひとつの Stream にします.
最後に compile.drainStreamからUnitを取り出します. drain は流れてきた値を破棄します.今回は Unit を返すのでこれが適当ですが他にも compile.toVector,compile.toListなど、処理の結果を Stream から取り出したいときに使います.

非同期IOが java.nio の Selector や SelectableChannel を用いて実装されていることを知っておくのは大事ですが、アプリケーションを書く際に(パフォーマンスをカリカリにチューニングする場合やデバッグをする場合を除けば)それらを意識しないといけないとしたらストレスフルです.
一方で関数型ストリームを活用すると上のコードのようにより宣言的、直感的でdryな書き方ができるようになります. 嬉しいですね😊 また型をみればどのような副作用を使用しているのかわかるので安心ですね😊
副作用と本来の処理が明確に分かれているおかげか、入出力・リソースの開放・非同期処理など定番の副作用はさまざまなライブラリが提供してくれています.

リアクティブストリームのライブラリは Rx-*** といった名称で各言語の実装があるので言語が変わっても同様の実装を再現しやすいというメリットもあります.

ということでデザインパターンとしての関数型の良さを少しは伝えられたでしょうか. Scala はこういった関数型志向のイケてるライブラリ(開発が活発なライブラリは Scala 3 にも対応済み)がたくさんあるので皆さん是非 Scala をかきましょう😊

補足

fs2 は java.nio をラップしているのでコードが短くなるのは当然と言えば当然で、nio と fs2 のコード量を直接比較するのはちょっとズルかもしれないです🙄

cats-effect 3.x でだいぶインターフェースが変わって古い記事がちょっと役に立たなくなってるので関数型つよいマンがいたら記事を書いてほしいぽよ~

Discussion