cats.effect の Resource を理解して使いこなしたい
これを理解して利用したい。ファイルを作成してそのファイルに書き込むだけなんだけど。
Resource
という型は、リソースの取得と解放を副作用なく効率的に行うことができる。
Resource
という型をこんな感じに作った後、
val resource: Resource[IO, File] = Resource.make(IO(new File(s"$pathname")))
use
することで、そのリソースを使用できる。
resource.use(file => ...) // File 型が取れている
ドキュメントに従うならこうなるかな?
-
make
でリソースを組み立て。 - うしろのブロックでリソースをリリース。
def inputStream(f: File): Resource[IO, FileInputStream] =
Resource.make {
IO(new FileInputStream(f)) // build
} { inStream =>
IO(inStream.close()).handleErrorWith(_ => IO.unit) // release
}
Resource#make
の実装は下記のようになっている。中身としては単に map しているだけではある。map の中身が Tuple になっている。これにより、取得済みのリソースと release
関数とを同時に Tuple として返している。
def make[F[_], A](acquire: F[A])(release: A => F[Unit])(implicit F: Functor[F]): Resource[F, A] =
apply[F, A](acquire.map(a => a -> release(a)))
apply
の中身が気になってくるので見てみる。こう。
def apply[F[_], A](resource: F[(A, F[Unit])])(implicit F: Functor[F]): Resource[F, A] =
Allocate[F, A] {
resource.map { case (a, release) =>
(a, (_: ExitCase[Throwable]) => release)
}
}
Allocate
というのは代数データになっている。リソースの割り振りとファイナライザを同時にもっているデータ構造。
/**
* `Resource` data constructor that wraps an effect allocating a resource,
* along with its finalizers.
*/
final case class Allocate[F[_], A](
resource: F[(A, ExitCase[Throwable] => F[Unit])])
extends Resource[F, A]
また、Resource
はモナドになっている。pure
と flatMap
、ならびに tailRecM
が実装済になっている。
flatMap
時には Bind
と呼ばれる代数データに格納されるのは他のモナドと共通だと思う。
/**
* `Resource` data constructor that encodes the `flatMap` operation.
*/
final case class Bind[F[_], S, A](
source: Resource[F, S],
fs: S => Resource[F, A])
extends Resource[F, A]
また、Suspend
というデータもあるので、キャンセル処理とかもできるのかもしれない。
/**
* `Resource` data constructor that suspends the evaluation of another
* resource value.
*/
final case class Suspend[F[_], A](
resource: F[Resource[F, A]])
extends Resource[F, A]
ちょっとドキュメントのコードを写経していろいろ覚えようかなと思う。
とくに後半がちょっと込み入っていて読むだけだと把握が難しい。
copy
関数で transfer
を呼び出す。transfer
内で実際の処理が始まる。copy
関数は Resource#use
の解決が責務になっている。
import java.io.{File, FileInputStream, FileOutputStream}
import cats.effect.{IO, Resource}
object Copy {
def inputStream(f: File): Resource[IO, FileInputStream] =
Resource.make {
IO(new FileInputStream(f))
} { inputStream =>
IO(inputStream.close()).handleErrorWith(_ => IO.unit)
}
def outputStream(f: File): Resource[IO, FileOutputStream] =
Resource.make {
IO(new FileOutputStream(f))
} { outputStream =>
IO(outputStream.close()).handleErrorWith(_ => IO.unit)
}
def inputOutputStream(
in: File,
out: File): Resource[IO, (FileInputStream, FileOutputStream)] =
for {
inStream <- inputStream(in)
outStream <- outputStream(out)
} yield (inStream, outStream)
def transfer(origin: FileInputStream,
destination: FileOutputStream): IO[Long] = ???
def copy(origin: File, destination: File): IO[Long] =
inputOutputStream(origin, destination).use {
case (in, out) => transfer(in, out)
}
}
copy の中身を実装したものは下記のようになる。transfer
関数と transmit
関数が追加される。
transmit
関数の中身は、
- ファイル内容を読み込む。それを
buffer
に書き込む。 - 読み込みバイト数(amount)を使用して、
-1
でなければ(read
関数はデータが読み込まれていないと-1
が返る)buffer
からファイルに書き込みをかける。 - もう一度
transmit
関数を回す。再帰。 - もし
amount
が-1
になれば、そこでacc
を返す。これは後続で count として使用される。
import java.io.{File, FileInputStream, FileOutputStream}
import cats.syntax.all._
import cats.effect.{IO, Resource}
object Copy {
def inputStream(f: File): Resource[IO, FileInputStream] =
Resource.make {
IO(new FileInputStream(f))
} { inputStream =>
IO(inputStream.close()).handleErrorWith(_ => IO.unit)
}
def outputStream(f: File): Resource[IO, FileOutputStream] =
Resource.make {
IO(new FileOutputStream(f))
} { outputStream =>
IO(outputStream.close()).handleErrorWith(_ => IO.unit)
}
def inputOutputStream(
in: File,
out: File): Resource[IO, (FileInputStream, FileOutputStream)] =
for {
inStream <- inputStream(in)
outStream <- outputStream(out)
} yield (inStream, outStream)
def transfer(origin: FileInputStream,
destination: FileOutputStream): IO[Long] =
for {
buffer <- IO(new Array[Byte](1024 * 10))
total <- transmit(origin, destination, buffer, 0L)
} yield total
def transmit(origin: FileInputStream,
destination: FileOutputStream,
buffer: Array[Byte],
acc: Long): IO[Long] =
for {
amount <- IO(origin.read(buffer, 0, buffer.length))
count <- if (amount > -1)
IO(destination.write(buffer, 0, amount)) >> transmit(origin,
destination,
buffer,
acc + amount)
else IO.pure(acc)
} yield count
def copy(origin: File, destination: File): IO[Long] =
inputOutputStream(origin, destination).use {
case (in, out) => transfer(in, out)
}
}
前のスクラップとのコードの差分拾えたりするといいかも?
cats-effect においては、IO
インスタンスは実行がキャンセルされることがある。これに対応する必要がある。つまり Resource
もキャンセルされる可能性がある。
並行処理中にキャンセルが起きると、たとえばストリームを読込中にストリームが閉じられてしまうなどの事象が起きるみたい?
それを防ぐために transfer
が実行中にストリームが閉じられないような並行処理での制御機構を入れる必要がある。今回はセマフォを使用する。
下記の実装にセマフォが追加される。
import java.io.{File, FileInputStream, FileOutputStream}
import cats.effect.concurrent.Semaphore
import cats.syntax.all._
import cats.effect.{Concurrent, IO, Resource}
object Copy {
def inputStream(f: File,
guard: Semaphore[IO]): Resource[IO, FileInputStream] =
Resource.make {
IO(new FileInputStream(f))
} { inputStream =>
guard.withPermit(IO(inputStream.close()).handleErrorWith(_ => IO.unit))
}
def outputStream(f: File,
guard: Semaphore[IO]): Resource[IO, FileOutputStream] =
Resource.make {
IO(new FileOutputStream(f))
} { outputStream =>
guard.withPermit(IO(outputStream.close()).handleErrorWith(_ => IO.unit))
}
def inputOutputStream(
in: File,
out: File,
guard: Semaphore[IO]): Resource[IO, (FileInputStream, FileOutputStream)] =
for {
inStream <- inputStream(in, guard)
outStream <- outputStream(out, guard)
} yield (inStream, outStream)
def transfer(origin: FileInputStream,
destination: FileOutputStream): IO[Long] =
for {
buffer <- IO(new Array[Byte](1024 * 10))
total <- transmit(origin, destination, buffer, 0L)
} yield total
def transmit(origin: FileInputStream,
destination: FileOutputStream,
buffer: Array[Byte],
acc: Long): IO[Long] =
for {
amount <- IO(origin.read(buffer, 0, buffer.length))
count <- if (amount > -1)
IO(destination.write(buffer, 0, amount)) >> transmit(origin,
destination,
buffer,
acc + amount)
else IO.pure(acc)
} yield count
def copy(origin: File, destination: File)(
implicit concurrent: Concurrent[IO]): IO[Long] =
for {
guard <- Semaphore[IO](1)
count <- inputOutputStream(origin, destination, guard).use {
case (in, out) => guard.withPermit(transfer(in, out))
}
} yield count
}
Main 関数の実行は IOApp
を使用したシンプルなものになる。
import java.io.File
import cats.effect.{ExitCode, IO, IOApp}
object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] =
for {
_ <- if (args.length < 2)
IO.raiseError(
new IllegalArgumentException("Need origin and destination files"))
else IO.unit
orig = new File(args(0))
dest = new File(args(1))
count <- Copy.copy(orig, dest)
_ <- IO(
println(s"$count bytes copied from ${orig.getPath} to ${dest.getPath}"))
} yield ExitCode.Success
}
あとはサンプルを見ると高階カインドで IO
を抽象化できるといった趣旨のことが書いてあるくらいかな。とりあえずだいぶ理解できた気がする。クローズ!