Closed5

Akka Stream をマスターする

yukiyuki

Source でデータの読み込みや標準入力をはじめとする副作用のある入力処理をさばき(今回は違うけど)、Sink にて最後、データの保存や標準出力をはじめとする副作用のある出力処理をさばくという感じなのかな。

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ByteString

import java.nio.file.Paths

object Factorials extends App {
  implicit val system = ActorSystem("quick-start")

  val source = Source(1 to 100)
  val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
  val result = factorials.map(num => ByteString(s"$num\n")).runWith(FileIO.toPath(Paths.get("factorials.txt")))

  implicit val ec = system.dispatcher

  result.onComplete(_ => system.terminate())
}
yukiyuki

Flow という概念を使うと、次のように共通化もできるらしい。

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl._
import akka.util.ByteString

import java.nio.file.Paths
import scala.concurrent.Future

object Factorials extends App {
  implicit val system = ActorSystem("quick-start")

  val source = Source(1 to 100)
  val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

  def lineSink(filename: String): Sink[String, Future[IOResult]] = Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)

  val result = factorials.map(num => ByteString(s"$num\n")).runWith(FileIO.toPath(Paths.get("factorials.txt")))
  val result2 = factorials.map(_.toString()).runWith(lineSink("factorial2.txt"))

  implicit val ec = system.dispatcher

  result.onComplete(_ => system.terminate())
}
  • toMat : FlowSink をつなげることができる。
  • Keep.right : Sink は内部では Traversal になっているので、その結合関数をどう扱うかを決定する。
yukiyuki
  • mapConcat は flatMap みたいなもの。Source に対して flatten する。ので、List[A]Source[A, ?] に変わる。つまり、リストをストリームに変える。
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._

final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
  def hashtags: Set[Hashtag] =
    body
      .split(" ")
      .collect {
        case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
      }
      .toSet
}

object Twitter extends App {
  implicit val system: ActorSystem = ActorSystem("reactive-tweets")

  val akkaTag = Hashtag("#akka")

  val tweets: Source[Tweet, NotUsed] = Source(
    Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
      Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
      Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
      Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
      Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
      Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
      Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
      Nil)

  val result = tweets
    .filterNot(_.hashtags.contains(akkaTag))
    .map(_.hashtags)
    .reduce(_ ++ _)
    .mapConcat(identity)
    .map(_.name.toUpperCase)
    .runWith(Sink.foreach(println))

  implicit val ec = system.dispatcher

  result.onComplete(_ => system.terminate())
}
yukiyuki

下記はちゃんと55が出る。

import akka.NotUsed
import akka.stream.scaladsl._
import akka.actor.ActorSystem

object Main extends App {
  implicit val system = ActorSystem("quick-start")

  val source = Source(1 to 10)

  implicit val ec = system.dispatcher
  val sinkflow: Sink[Int, NotUsed] = Flow[Int].fold(0)(_ + _).to(Sink.foreach(i => println(i)))
  source.runWith(sinkflow)

  system.terminate()
}
このスクラップは2021/11/04にクローズされました