Haskellのstreamlyを使ってみる
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.FileSystem.File as File
wcb :: String -> IO Int
wcb file =
File.read file -- Stream IO Word8
& Stream.fold Fold.length -- IO Int
Streamly.Data.Stream.fold
でガッと消費するっぽい
fold :: Monad m => Fold m a b -> Stream m a -> m b
なので,消費者は Fold m a b
の型を持つっぽい
直接 Stream m a
からfoldできる関数群も foldr
とかとして用意されてる
となると 生産者として Unfold
もあるやろと思ったら Unfold m a b
があるっぽい,Fold
と型変数の並びは一緒だな
Fold
と同様に直接 Stream m a
にできる unfoldr
とかの関数もある
foldMany
やunfoldMany
という関数もある
foldした際に余ったstreamに再度foldを適用する関数っぽい
生産者は基本 UnFold m a b
で, 中身は forall s. Unfold (s -> m (Step s b)) (a -> m s)
になってる
s -> m (Step s b)
のstep関数と,a -> m s
のinitialize関数の組
s
は存在型になってて拡張性が確保されている
消費者は Fold m a b
で,中身は forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m b)
step, initial, extract, finalの4つ組らしい
initialとfinalはリソースの初期化と解放処理をしたりするところ
extractはなんだ?
If the fold is used as a scan, the extract function is used by the scan driver to map the current state s of the fold to the fold result. Thus extract can be called multiple times. In some folds, where scanning does not make sense, this function is left unimplemented; such folds cannot be used as scans.
と書いてあるがピンとこないな
postscan
の実装に使われてるっぽい
postscan
の実装見てる感じ,StepがPartialな状態からextract適用して次のFoldの初期値にしてつなげるみたいなことができるっぽい
concatMap
にも使われてるし,Fold同士の合成時に呼ばれるっぽい
変換は Stream m a
で中身は forall s. UnStream (State StreamK m a -> s -> m (Step s a)) s
型的には普通にstep関数っぽいけど, StreamK
はなんだ
StreamK
は forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r
state, yield, singleton, stopの4つ組
State
とかめちゃくちゃ凝ったことしてるが,さすがに直接触ることはなさそうだし一旦詳細を追うのはやめとこう.
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- step gst st
case r of
Yield x s -> do
b <- f x
return $ if b
then Yield x s
else Skip s
Skip s -> return $ Skip s
Stop -> return Stop
実装見ると使い方はこんな感じにやってるっぽい
State
が絡んでるのは gst
のところだけど,stepで取り出すところにしか関係してなさそう
Streamly.Internal.Data.Stream.Transform
ではStateを変換する部分があるが,そのまま使うかState
のstreamVar
とyieldLimit
をNothing
にする adaptState
だけつかってる
adaptState
はbase monadを交換するときだけ使えみたいなことがNoteに書いてある
生産者にはUnfold
以外にProducer m a b
がある
中身はforall s. Producer (s -> m (Step s b)) (a -> m s) (s -> m a)
でstep, inject, extractの3つ組
simplify
でinjectはUnfold
のinitになるが,extractは無視される
Producer
から直接Stream
に変換する関数はないっぽい?
Parser
という消費者もいる,parseもできるっぽい
Fold
もUnfold
もStream
もMonadのinstanceではないが,Stream
はCrossStream
でラップしてやるとMonadになって,複数のStreamの値を使った実装とかもかけるっぽい
なおInternalにしかない模様
Fold
は単一ストリームに対してparallelに実行してzipする関数がtee
として用意されてる
Folds support stream fusion for generating loops comparable to the speed of C. However, it has some limitations. For fusion to work, the folds must be inlined, folds must be statically known and not generated dynamically, folds should not be passed recursively.
Another limitation is due to the quadratic complexity causing slowdown when too many nested compositions are used. Especially, the performance of the Applicative instance and splitting operations (e.g. splitWith) degrades quadratically (O(n^2)) when combined n times, roughly 8 or less sequenced operations are fine. For these cases folds can be converted to parsers and then used as ParserK.
はい.それはそう
File.read
の実装を読むと自前のbracket関数を使ってリソース管理をしているから,これらで Stream
のinitialとかfinalにリソースの初期化と解放を設定しているっぽい
bracketIO
はbracketIO3
を呼んでて,bracketIO3
は正常終了時,例外時,GC時の3つのときのfinalizerをとるっぽい.bracketIO
は全部同じにしてる
GCでリソース周り困るときってどんなときなんだ......
wc :: String -> IO (Bool, Counts)
wc file = do
File.readChunks file -- Stream IO (Array Word8)
& Stream.parMapM cfg partialCounts -- Stream IO (Bool, Counts)
& Stream.fold add -- IO (Bool, Counts)
where
cfg = Stream.maxThreads numCapabilities . Stream.ordered True
add = Fold.foldl' addCounts (False, Counts 0 0 0 True)
parMapM
を使うとChunkに分割したArrayそれぞれを並列に動かしてくれる驚きの機能がある
並行でStream処理した後のやつをzipしたりmergeするのにはparZipWith
やparMergeBy
が使えるっぽい
無限リストのような無限ストリームを扱いたい場合は,Fold.latest
で遅延できるっぽい
途中で残余を気にせず終了するときは Foldのstep関数を無理やりDoneにすればいいっぽい
例
maybe :: Monad m => (a -> Maybe b) -> Fold m a (Maybe b)
maybe f = foldt' (const (Done . f)) (Partial Nothing) id
遅延の例はここ
parsec系の変わりにStreamly.Unicode.Parser
使ったり,ByteString
にしたければArray Word8
にチャンクごとに変換して使うとかして全部のっかっていくのが正しそう