Closed11

cats.effect の Resource を理解して使いこなしたい

yukiyuki

Resourceという型は、リソースの取得と解放を副作用なく効率的に行うことができる。

https://typelevel.org/cats-effect/datatypes/resource.html

Resource という型をこんな感じに作った後、

val resource: Resource[IO, File] = Resource.make(IO(new File(s"$pathname")))

use することで、そのリソースを使用できる。

resource.use(file => ...) // File 型が取れている
yukiyuki

ドキュメントに従うならこうなるかな?

  • make でリソースを組み立て。
  • うしろのブロックでリソースをリリース。
def inputStream(f: File): Resource[IO, FileInputStream] =
  Resource.make {
    IO(new FileInputStream(f))                         // build
  } { inStream =>
    IO(inStream.close()).handleErrorWith(_ => IO.unit) // release
  }

Resource#make の実装は下記のようになっている。中身としては単に map しているだけではある。map の中身が Tuple になっている。これにより、取得済みのリソースと release 関数とを同時に Tuple として返している。

  def make[F[_], A](acquire: F[A])(release: A => F[Unit])(implicit F: Functor[F]): Resource[F, A] =
    apply[F, A](acquire.map(a => a -> release(a)))

apply の中身が気になってくるので見てみる。こう。

  def apply[F[_], A](resource: F[(A, F[Unit])])(implicit F: Functor[F]): Resource[F, A] =
    Allocate[F, A] {
      resource.map { case (a, release) =>
        (a, (_: ExitCase[Throwable]) => release)
      }
    }

Allocate というのは代数データになっている。リソースの割り振りとファイナライザを同時にもっているデータ構造。

  /**
   * `Resource` data constructor that wraps an effect allocating a resource,
   * along with its finalizers.
   */
  final case class Allocate[F[_], A](
    resource: F[(A, ExitCase[Throwable] => F[Unit])])
    extends Resource[F, A]
yukiyuki

また、Resource はモナドになっている。pureflatMap、ならびに tailRecM が実装済になっている。

flatMap 時には Bind と呼ばれる代数データに格納されるのは他のモナドと共通だと思う。

  /**
   * `Resource` data constructor that encodes the `flatMap` operation.
   */
  final case class Bind[F[_], S, A](
    source: Resource[F, S],
    fs: S => Resource[F, A])
    extends Resource[F, A]

また、Suspend というデータもあるので、キャンセル処理とかもできるのかもしれない。

  /**
   * `Resource` data constructor that suspends the evaluation of another
   * resource value.
   */
  final case class Suspend[F[_], A](
    resource: F[Resource[F, A]])
    extends Resource[F, A]
yukiyuki

ちょっとドキュメントのコードを写経していろいろ覚えようかなと思う。
とくに後半がちょっと込み入っていて読むだけだと把握が難しい。

yukiyuki

copy 関数で transfer を呼び出す。transfer 内で実際の処理が始まる。copy 関数は Resource#use の解決が責務になっている。

import java.io.{File, FileInputStream, FileOutputStream}

import cats.effect.{IO, Resource}

object Copy {

  def inputStream(f: File): Resource[IO, FileInputStream] =
    Resource.make {
      IO(new FileInputStream(f))
    } { inputStream =>
      IO(inputStream.close()).handleErrorWith(_ => IO.unit)
    }

  def outputStream(f: File): Resource[IO, FileOutputStream] =
    Resource.make {
      IO(new FileOutputStream(f))
    } { outputStream =>
      IO(outputStream.close()).handleErrorWith(_ => IO.unit)
    }

  def inputOutputStream(
      in: File,
      out: File): Resource[IO, (FileInputStream, FileOutputStream)] =
    for {
      inStream <- inputStream(in)
      outStream <- outputStream(out)
    } yield (inStream, outStream)

  def transfer(origin: FileInputStream,
               destination: FileOutputStream): IO[Long] = ???

  def copy(origin: File, destination: File): IO[Long] =
    inputOutputStream(origin, destination).use {
      case (in, out) => transfer(in, out)
    }
}
yukiyuki

copy の中身を実装したものは下記のようになる。transfer 関数と transmit 関数が追加される。

transmit 関数の中身は、

  1. ファイル内容を読み込む。それを buffer に書き込む。
  2. 読み込みバイト数(amount)を使用して、-1 でなければ(read 関数はデータが読み込まれていないと -1 が返る)buffer からファイルに書き込みをかける。
  3. もう一度 transmit 関数を回す。再帰。
  4. もし amount-1 になれば、そこで acc を返す。これは後続で count として使用される。
import java.io.{File, FileInputStream, FileOutputStream}

import cats.syntax.all._
import cats.effect.{IO, Resource}

object Copy {

  def inputStream(f: File): Resource[IO, FileInputStream] =
    Resource.make {
      IO(new FileInputStream(f))
    } { inputStream =>
      IO(inputStream.close()).handleErrorWith(_ => IO.unit)
    }

  def outputStream(f: File): Resource[IO, FileOutputStream] =
    Resource.make {
      IO(new FileOutputStream(f))
    } { outputStream =>
      IO(outputStream.close()).handleErrorWith(_ => IO.unit)
    }

  def inputOutputStream(
      in: File,
      out: File): Resource[IO, (FileInputStream, FileOutputStream)] =
    for {
      inStream <- inputStream(in)
      outStream <- outputStream(out)
    } yield (inStream, outStream)

  def transfer(origin: FileInputStream,
               destination: FileOutputStream): IO[Long] =
    for {
      buffer <- IO(new Array[Byte](1024 * 10))
      total <- transmit(origin, destination, buffer, 0L)
    } yield total

  def transmit(origin: FileInputStream,
               destination: FileOutputStream,
               buffer: Array[Byte],
               acc: Long): IO[Long] =
    for {
      amount <- IO(origin.read(buffer, 0, buffer.length))
      count <- if (amount > -1)
        IO(destination.write(buffer, 0, amount)) >> transmit(origin,
                                                             destination,
                                                             buffer,
                                                             acc + amount)
      else IO.pure(acc)
    } yield count

  def copy(origin: File, destination: File): IO[Long] =
    inputOutputStream(origin, destination).use {
      case (in, out) => transfer(in, out)
    }
}
yukiyuki

前のスクラップとのコードの差分拾えたりするといいかも?

yukiyuki

cats-effect においては、IO インスタンスは実行がキャンセルされることがある。これに対応する必要がある。つまり Resource もキャンセルされる可能性がある。

並行処理中にキャンセルが起きると、たとえばストリームを読込中にストリームが閉じられてしまうなどの事象が起きるみたい?

それを防ぐために transfer が実行中にストリームが閉じられないような並行処理での制御機構を入れる必要がある。今回はセマフォを使用する。

下記の実装にセマフォが追加される。

import java.io.{File, FileInputStream, FileOutputStream}

import cats.effect.concurrent.Semaphore
import cats.syntax.all._
import cats.effect.{Concurrent, IO, Resource}

object Copy {

  def inputStream(f: File,
                  guard: Semaphore[IO]): Resource[IO, FileInputStream] =
    Resource.make {
      IO(new FileInputStream(f))
    } { inputStream =>
      guard.withPermit(IO(inputStream.close()).handleErrorWith(_ => IO.unit))
    }

  def outputStream(f: File,
                   guard: Semaphore[IO]): Resource[IO, FileOutputStream] =
    Resource.make {
      IO(new FileOutputStream(f))
    } { outputStream =>
      guard.withPermit(IO(outputStream.close()).handleErrorWith(_ => IO.unit))
    }

  def inputOutputStream(
      in: File,
      out: File,
      guard: Semaphore[IO]): Resource[IO, (FileInputStream, FileOutputStream)] =
    for {
      inStream <- inputStream(in, guard)
      outStream <- outputStream(out, guard)
    } yield (inStream, outStream)

  def transfer(origin: FileInputStream,
               destination: FileOutputStream): IO[Long] =
    for {
      buffer <- IO(new Array[Byte](1024 * 10))
      total <- transmit(origin, destination, buffer, 0L)
    } yield total

  def transmit(origin: FileInputStream,
               destination: FileOutputStream,
               buffer: Array[Byte],
               acc: Long): IO[Long] =
    for {
      amount <- IO(origin.read(buffer, 0, buffer.length))
      count <- if (amount > -1)
        IO(destination.write(buffer, 0, amount)) >> transmit(origin,
                                                             destination,
                                                             buffer,
                                                             acc + amount)
      else IO.pure(acc)
    } yield count

  def copy(origin: File, destination: File)(
      implicit concurrent: Concurrent[IO]): IO[Long] =
    for {
      guard <- Semaphore[IO](1)
      count <- inputOutputStream(origin, destination, guard).use {
        case (in, out) => guard.withPermit(transfer(in, out))
      }
    } yield count
}
yukiyuki

Main 関数の実行は IOApp を使用したシンプルなものになる。

Main.scala
import java.io.File

import cats.effect.{ExitCode, IO, IOApp}

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    for {
      _ <- if (args.length < 2)
        IO.raiseError(
          new IllegalArgumentException("Need origin and destination files"))
      else IO.unit
      orig = new File(args(0))
      dest = new File(args(1))
      count <- Copy.copy(orig, dest)
      _ <- IO(
        println(s"$count bytes copied from ${orig.getPath} to ${dest.getPath}"))
    } yield ExitCode.Success
}

yukiyuki

あとはサンプルを見ると高階カインドで IO を抽象化できるといった趣旨のことが書いてあるくらいかな。とりあえずだいぶ理解できた気がする。クローズ!

このスクラップは2020/11/30にクローズされました