Open30

Haskellのstreamlyを使ってみる

るじゃんどるるじゃんどる

5億年ぐらいconduitとpipesしかつかったことがないので,意味不明なくらいIOに振り切ってる streamlyを使ってみる

るじゃんどるるじゃんどる
import Data.Function ((&))

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.FileSystem.File as File

wcb :: String -> IO Int
wcb file =
    File.read file           -- Stream IO Word8
  & Stream.fold Fold.length  -- IO Int

Streamly.Data.Stream.fold でガッと消費するっぽい

るじゃんどるるじゃんどる

fold :: Monad m => Fold m a b -> Stream m a -> m b なので,消費者は Fold m a b の型を持つっぽい
直接 Stream m a からfoldできる関数群も foldrとかとして用意されてる

るじゃんどるるじゃんどる

となると 生産者として Unfold もあるやろと思ったら Unfold m a b があるっぽい,Fold と型変数の並びは一緒だな
Foldと同様に直接 Stream m a にできる unfoldr とかの関数もある

るじゃんどるるじゃんどる

foldManyunfoldManyという関数もある
foldした際に余ったstreamに再度foldを適用する関数っぽい

るじゃんどるるじゃんどる

生産者は基本 UnFold m a bで, 中身は forall s. Unfold (s -> m (Step s b)) (a -> m s) になってる
s -> m (Step s b) のstep関数と,a -> m s のinitialize関数の組
s は存在型になってて拡張性が確保されている

るじゃんどるるじゃんどる

消費者は Fold m a b で,中身は forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m b)
step, initial, extract, finalの4つ組らしい
initialとfinalはリソースの初期化と解放処理をしたりするところ
extractはなんだ?

るじゃんどるるじゃんどる

If the fold is used as a scan, the extract function is used by the scan driver to map the current state s of the fold to the fold result. Thus extract can be called multiple times. In some folds, where scanning does not make sense, this function is left unimplemented; such folds cannot be used as scans.

と書いてあるがピンとこないな
postscanの実装に使われてるっぽい

るじゃんどるるじゃんどる

postscan の実装見てる感じ,StepがPartialな状態からextract適用して次のFoldの初期値にしてつなげるみたいなことができるっぽい

るじゃんどるるじゃんどる

変換は Stream m a で中身は forall s. UnStream (State StreamK m a -> s -> m (Step s a)) s
型的には普通にstep関数っぽいけど, StreamKはなんだ

るじゃんどるるじゃんどる

StreamKforall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r
state, yield, singleton, stopの4つ組
Stateとかめちゃくちゃ凝ったことしてるが,さすがに直接触ることはなさそうだし一旦詳細を追うのはやめとこう.

るじゃんどるるじゃんどる
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                return $ if b
                         then Yield x s
                         else Skip s
            Skip s -> return $ Skip s
            Stop   -> return Stop

実装見ると使い方はこんな感じにやってるっぽい
State が絡んでるのは gst のところだけど,stepで取り出すところにしか関係してなさそう

るじゃんどるるじゃんどる

Streamly.Internal.Data.Stream.TransformではStateを変換する部分があるが,そのまま使うかStatestreamVaryieldLimitNothingにする adaptState だけつかってる
adaptStateはbase monadを交換するときだけ使えみたいなことがNoteに書いてある

るじゃんどるるじゃんどる

生産者にはUnfold以外にProducer m a bがある
中身はforall s. Producer (s -> m (Step s b)) (a -> m s) (s -> m a)でstep, inject, extractの3つ組
simplifyでinjectはUnfoldのinitになるが,extractは無視される

るじゃんどるるじゃんどる

FoldUnfoldStreamもMonadのinstanceではないが,StreamCrossStreamでラップしてやるとMonadになって,複数のStreamの値を使った実装とかもかけるっぽい
なおInternalにしかない模様

るじゃんどるるじゃんどる

Foldは単一ストリームに対してparallelに実行してzipする関数がteeとして用意されてる

るじゃんどるるじゃんどる

Folds support stream fusion for generating loops comparable to the speed of C. However, it has some limitations. For fusion to work, the folds must be inlined, folds must be statically known and not generated dynamically, folds should not be passed recursively.

Another limitation is due to the quadratic complexity causing slowdown when too many nested compositions are used. Especially, the performance of the Applicative instance and splitting operations (e.g. splitWith) degrades quadratically (O(n^2)) when combined n times, roughly 8 or less sequenced operations are fine. For these cases folds can be converted to parsers and then used as ParserK.

はい.それはそう

るじゃんどるるじゃんどる

File.readの実装を読むと自前のbracket関数を使ってリソース管理をしているから,これらで Streamのinitialとかfinalにリソースの初期化と解放を設定しているっぽい

るじゃんどるるじゃんどる

bracketIObracketIO3を呼んでて,bracketIO3は正常終了時,例外時,GC時の3つのときのfinalizerをとるっぽい.bracketIOは全部同じにしてる
GCでリソース周り困るときってどんなときなんだ......

るじゃんどるるじゃんどる
wc :: String -> IO (Bool, Counts)
wc file = do
      File.readChunks file             -- Stream IO (Array Word8)
    & Stream.parMapM cfg partialCounts -- Stream IO (Bool, Counts)
    & Stream.fold add                  -- IO (Bool, Counts)

    where

    cfg = Stream.maxThreads numCapabilities . Stream.ordered True
    add = Fold.foldl' addCounts (False, Counts 0 0 0 True)

parMapMを使うとChunkに分割したArrayそれぞれを並列に動かしてくれる驚きの機能がある

るじゃんどるるじゃんどる

並行でStream処理した後のやつをzipしたりmergeするのにはparZipWithparMergeByが使えるっぽい

るじゃんどるるじゃんどる

無限リストのような無限ストリームを扱いたい場合は,Fold.latestで遅延できるっぽい

るじゃんどるるじゃんどる

途中で残余を気にせず終了するときは Foldのstep関数を無理やりDoneにすればいいっぽい

るじゃんどるるじゃんどる

parsec系の変わりにStreamly.Unicode.Parser使ったり,ByteStringにしたければArray Word8にチャンクごとに変換して使うとかして全部のっかっていくのが正しそう