Closed5
Akka Stream をマスターする
充実のドキュメントを上から順にやる
Source
でデータの読み込みや標準入力をはじめとする副作用のある入力処理をさばき(今回は違うけど)、Sink
にて最後、データの保存や標準出力をはじめとする副作用のある出力処理をさばくという感じなのかな。
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
object Factorials extends App {
implicit val system = ActorSystem("quick-start")
val source = Source(1 to 100)
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
val result = factorials.map(num => ByteString(s"$num\n")).runWith(FileIO.toPath(Paths.get("factorials.txt")))
implicit val ec = system.dispatcher
result.onComplete(_ => system.terminate())
}
Flow という概念を使うと、次のように共通化もできるらしい。
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.Future
object Factorials extends App {
implicit val system = ActorSystem("quick-start")
val source = Source(1 to 100)
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
def lineSink(filename: String): Sink[String, Future[IOResult]] = Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
val result = factorials.map(num => ByteString(s"$num\n")).runWith(FileIO.toPath(Paths.get("factorials.txt")))
val result2 = factorials.map(_.toString()).runWith(lineSink("factorial2.txt"))
implicit val ec = system.dispatcher
result.onComplete(_ => system.terminate())
}
-
toMat
:Flow
とSink
をつなげることができる。 -
Keep.right
:Sink
は内部では Traversal になっているので、その結合関数をどう扱うかを決定する。
-
mapConcat
は flatMap みたいなもの。Source に対して flatten する。ので、List[A]
がSource[A, ?]
に変わる。つまり、リストをストリームに変える。
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body
.split(" ")
.collect {
case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
}
.toSet
}
object Twitter extends App {
implicit val system: ActorSystem = ActorSystem("reactive-tweets")
val akkaTag = Hashtag("#akka")
val tweets: Source[Tweet, NotUsed] = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
Nil)
val result = tweets
.filterNot(_.hashtags.contains(akkaTag))
.map(_.hashtags)
.reduce(_ ++ _)
.mapConcat(identity)
.map(_.name.toUpperCase)
.runWith(Sink.foreach(println))
implicit val ec = system.dispatcher
result.onComplete(_ => system.terminate())
}
下記はちゃんと55が出る。
import akka.NotUsed
import akka.stream.scaladsl._
import akka.actor.ActorSystem
object Main extends App {
implicit val system = ActorSystem("quick-start")
val source = Source(1 to 10)
implicit val ec = system.dispatcher
val sinkflow: Sink[Int, NotUsed] = Flow[Int].fold(0)(_ + _).to(Sink.foreach(i => println(i)))
source.runWith(sinkflow)
system.terminate()
}
このスクラップは2021/11/04にクローズされました