[PureScript] 非同期処理を行うAffモジュールの関数を大体全部紹介するよ!
PureScriptのAff
モジュール(Effect.Aff
)には様々な関数が定義されていますが、メジャーではない関数はあまり紹介される機会がないかと思います。
そこで今回はAff
モジュールの関数をサンプルコードつきで片っ端から紹介していきます。
興味のある関数があったら目次から飛んでいただけたらと思います。
ちなみにサンプルコードはPureScriptのバージョン0.15.10
で書いたものになります。
Fiber
まず関数の前にFiber
です。
Aff
モジュールには様々な関数が定義されていますが、その一部はFiber
を返してきたり、Fiber
が引数になっていたりするので、先に説明しておく必要があるでしょう。
Fiberとは
ファイバーは並列処理の文脈で登場する一般的な概念であり、PureScript特有の概念ではありません。
Wikipediaを見ると次のように説明されています。
ファイバー(英: fiber)は、計算機科学の分野において、非常に軽量な実行スレッドを示す。
ファイバー同士はスレッドと同じくアドレス空間を共有するが、両者には区別が存在する。 ファイバーが協調マルチタスクを使用するのに対し、スレッドはプリエンプティブマルチタスクを用いる。スレッドでは、ビジーなスレッドに割り込み他のスレッドを復帰させるためにカーネルのスレッドスケジューラを用いることが多いが、ファイバーは他のスレッドを実行させるために自ら制御を譲る。
PureScriptで単に使用することを考えるならば、スレッドのようなものだと思っておけば一旦は十分かと思います。
実際joinFiber
というスレッドでよく見るような名前の関数が定義されており、知っていれば大体同じノリで使うことができます。
PureScriptにおいてファイバーはFiber
という型として定義さています。
参考までに定義を載せておきます。
newtype Fiber a = Fiber
{ run :: Effect Unit
, kill :: Fn.Fn2 Error (Either Error Unit -> Effect Unit) (Effect (Effect Unit))
, join :: (Either Error a -> Effect Unit) -> Effect (Effect Unit)
, onComplete :: OnComplete a -> Effect (Effect Unit)
, isSuspended :: Effect Boolean
}
※レコードに沢山関数が定義されていますが、これを直接使うわけではないです。これらは別の関数を実行したとき間接的に呼ばれます。
Affをつくる
makeAff
makeAff :: forall a. ((Either Error a -> Effect Unit) -> Effect Canceler) -> Aff a`
Effect
をもとにAff
を作ります。
(Either Error a -> Effect Unit)
の部分はコールバック関数で、Error
か任意の型a
の値を渡すことができます。
makeAff
が返す型Aff a
を見ると、このa
の値を非同期処理の結果として返すことができるということがわかるでしょう。
Effect Canceler
のCanceler
というのは、この処理がキャンセルされた場合に呼び出される処理を表す型です。
次の例はログ出力しつつ処理結果として文字列を返す例です。
Canceler
としてはAff
モジュールに定義されているnonCanceler
という関数を使って、何もしないCanceler
を返しています。
main :: Effect Unit
main = launchAff_ do
a <- makeAff \callback -> do
log "関数が呼ばれたよ"
callback (Right "Done!")
pure nonCanceler
log a
関数が呼ばれたよ
Done!
Canceler
の使用例を説明するには、実際に処理をキャンセルする必要があるので、killFiber
のところで説明します。
コールバック関数を呼ばないとどうなるの?
makeAff
したときコールバック関数を呼ばないとどうなるかというと、処理がそこで終わります。
次の例ではmakeAff
を二回呼び出していますが、二回目のmakeAff
ではコールバック関数を無視してCanceler
だけを返しています。
するとどうなるかというと二回目のmakeAff
以降の処理は呼ばれなくなります。すなわち処理終了
というログ出力はされずに処理が終わります。
main :: Effect Unit
main = launchAff_ do
log "処理開始"
_ <- makeAff \cb -> do
log "コールバック関数を呼ぶよ"
cb $ Right unit
pure nonCanceler
_ <- makeAff \_ -> do
log "コールバック関数を呼ばないよ"
pure nonCanceler
log "処理終了"
処理開始
コールバック関数を呼ぶよ
コールバック関数を呼ばないよ
liftEffect
Aff
を作る他の方法は、liftEffect
を使うことですが、こちらは馴染みのある方法でしょう。
main :: Effect Unit
main = launchAff_ do
a <- liftEffect do
log "関数が呼ばれたよ"
pure "Done!"
log a
関数が呼ばれたよ
Done!
Affを実行する
launchAff_
launchAff_ :: forall a. Aff a -> Effect Unit
これはおなじみの関数ですね。
Effect Unit
を返します。
非同期処理の結果が必要であれば、こいつではなくlaunchAff
の方を使いましょう。
main :: Effect Unit
main = launchAff_ do
log "関数が呼ばれたよ"
関数が呼ばれたよ
launchAff
launchAff :: forall a. Aff a -> Effect (Fiber a)
launch_
と似ていますが、こちらはEffect (Fiber a)
を返します。
処理結果が必要な場合はこちらを使います。
次の例では、launchAff
で2つFiber
を作り出し、それらを用いて処理結果をログ出力しています。
joinFiber
を使うことで処理の完了を待ち、結果を取得することができます。
main :: Effect Unit
main = do
fiber1 <- launchAff do
log "関数1が呼ばれたよ"
pure "Done1"
fiber2 <- launchAff do
log "関数2が呼ばれたよ"
pure "Done2"
launchAff_ do
results <- traverse joinFiber [fiber1, fiber2]
logShow results
関数1が呼ばれたよ
関数2が呼ばれたよ
["Done1","Done2"]
launchSuspendedAff
launchSuspendedAff :: forall a. Aff a -> Effect (Fiber a)
launchAff
やlaunchAff_
とは異なり、Aff
をsuspend(一時停止)し、ファイバーを返します。
どういうことかというと、launchAff
やlaunchAff_
の処理はこれらの関数を呼び出したらすぐ開始されますが、launchSuspendAff
の処理はjoinFiber
するまで開始されないのです。
以下の例はこれらの関数をすべて使った例です。
suspendされていることを確認するため、真っ先にlaunchSuspendAff
を呼び出しています。
処理結果をわかりやすくするため、上から書いた順に番号を振っています。
main :: Effect Unit
main = do
-- joinFiberするまでsuspendされる
suspendedFiber <- launchSuspendedAff do
log "1. launchSuspendedAff"
pure "1. launchSuspendedAff result"
-- 実行されて返り値を使える
fiber <- launchAff do
log "2. launchAff"
pure "2. launchAff result"
-- 実行されるが返り値はない
launchAff_ do
log "3. launchAff_"
-- 結果を出力
launchAff_ do
log =<< joinFiber fiber
-- 最後にsuspendされていたfiberを実行して結果を出力
launchAff_ do
log =<< joinFiber suspendedFiber
2. launchAff
3. launchAff_
2. launchAff result
1. launchSuspendedAff
1. launchSuspendedAff result
最初に書いて呼び出したlaunchSuspendedAff
が最後に呼び出されていることがわかるでしょう。
runAff_
runAff_ :: forall a. (Either Error a -> Effect Unit) -> Aff a -> Effect Unit
launchAff_
と似ていますが、こちらは完了時に実行するコールバック関数を受け取ります。
次の例では、Aff a
の部分でログ出力して文字列を返しています。
(Either Error a -> Effect Unit)
がコールバック関数にあたる部分ですが、Aff a
の結果のa
の値がEither Error a
のa
となって渡されてきます。
すなわちこの例の場合Either Error String
型の値が渡されます。
main :: Effect Unit
main = do
runAff_
(either (log <<< message) log) -- LeftでもRightでもログ出力する
do
log "関数が呼ばれたよ"
pure "処理結果だよ" -- こいつがコールバック関数の引数になる
関数が呼ばれたよ
処理結果だよ
runAff
runAff :: forall a. (Either Error a -> Effect Unit) -> Aff a -> Effect (Fiber Unit)
runAff_
は処理結果がEffect Unit
でしたが、こちらはEffect (Fiber Unit)
となっており、Fiber
を返します。
次の例はrunAff
で返されたFiber
をkillFiber
で止める例です。
途中でログ出力が終わっていることがわかるでしょう。
main :: Effect Unit
main = do
fiber <- runAff
(either (log <<< message) log)
do
log "関数 Start"
-- 時間が掛かる処理の代わり
delay (Milliseconds 500.0)
log "関数 End"
pure "処理結果"
launchAff_ $ killFiber (error "止めたよ") fiber
関数 Start
runSuspendedAff
runSuspendedAff
:: forall a
. (Either Error a -> Effect Unit)
-> Aff a
-> Effect (Fiber Unit)
launchSuspendAff
のコールバック関数を受け取る版です。
main :: Effect Unit
main = do
suspendedFiber <- runSuspendedAff
(either (log <<< message) log)
do
log "1. runSuspendedAff"
pure "1. runSuspendedAff result"
fiber <- runAff
(either (log <<< message) log)
do
log "2. runAff"
pure "2. runAff result"
runAff_
(either (log <<< message) log)
do
log "3. runAff_"
pure "3. runAff_ result"
launchAff_ $ joinFiber fiber
launchAff_ $ joinFiber suspendedFiber
2. runAff
2. runAff result
3. runAff_
3. runAff_ result
1. runSuspendedAff
1. runSuspendedAff result
Affをforkする
forkAff
forkAff :: forall a. Aff a -> Aff (Fiber a)
親のAff
からAff
をforkしてFiber
を返します。
次の例はAff
を3つforkする例です。
並列で動きますが、delay
関数で待ちを発生させているため、処理開始順と処理内のログ出力の順序が異なります。
一方、処理結果の出力順はjoinFiber
の順序に依存しています。
main :: Effect Unit
main = launchAff_ do
log "Parent Start"
a <- forkAff do
delay (Milliseconds 500.0)
log "1. Child"
pure "1. result"
b <- forkAff do
log "2. Child"
pure "2. result"
c <- forkAff do
delay (Milliseconds 200.0)
log "3. Child"
pure "3. result"
logShow =<< traverse joinFiber [a, b, c]
log "Parent End"
Parent Start
2. Child
3. Child
1. Child
["1. result","2. result","3. result"]
Parent End
suspendAff
suspendAff :: forall a. Aff a -> Aff (Fiber a)
親のAff
からsuspendした状態のFiber
を返します。
suspendされたAff
は、joinFiber
で結果を確認するまで実行されません。
main :: Effect Unit
main = launchAff_ do
log "1. parent"
f1 <- suspendAff do
log "2. suspendAff"
pure "2. suspendAff result"
f2 <- forkAff do
log "3. forkAff"
pure "3. forkAff result"
log "4. parent"
delay (Milliseconds 300.0)
log "5. parent"
log =<< joinFiber f2
log =<< joinFiber f1
log "6. parent"
1. parent
3. forkAff
4. parent
5. parent
3. forkAff result
2. suspendAff
2. suspendAff result
6. parent
Fiberを扱う
joinFiber
joinFiber :: Fiber ~> Aff
Fiberが完了して結果が得られるまで処理をブロックします。
Fiberが例外を投げた場合、その例外は現在のFiberで再スローされます。
次の例はforkしたFiberの実行完了をjoinFiber
で待ち、結果を表示する例です。
delay
の時間差的にb
の処理の方が先に終わっているはずですが、joinFiber a
でa
の完了を待っている(ブロックされている)ため、b
の結果はすぐに表示されません(a
の結果が表示されてすぐ表示される)。
main :: Effect Unit
main = launchAff_ do
a <- forkAff do
log "処理1の実行開始"
delay (Milliseconds 1000.0)
log "処理1の実行終了"
pure "結果1"
b <- forkAff do
log "処理2の実行開始"
delay (Milliseconds 10.0)
log "処理2の実行終了"
pure "結果2"
log =<< joinFiber a
log =<< joinFiber b
処理1の実行開始
処理2の実行開始
処理2の実行終了
処理1の実行終了
結果1
結果2
killFiber
killFiber :: forall a. Error -> Fiber a -> Aff Unit
Fiberをkillします。その際、Fiber内のCancelerが呼び出されます。
Cancelerを用いる処理が実行される前にkillされた場合、当然Cancelerは呼び出されません。
また、Fiberが完了したあとにkillした場合もCancelerは呼び出されません。
Fiberが完全に完了するまで処理をブロックします。
次の例は、FiberをkillFiber
でkillした際、Cancelerが実行されることが確認できる例です。
コメントされているdelay
のコメントを外すと、キャンセルするより先にforkした処理が終わるので、Cancelerが実行されないことを確認できます。
main :: Effect Unit
main = launchAff_ do
fiber <- forkAff do
log "処理開始"
_ <- makeAff \callback -> do
callback (Right unit)
pure $ Canceler \e -> do
log $ "Cancelerが呼ばれたよ。メッセージは: " <> (message e)
delay (Milliseconds 100.0)
log "処理終了"
pure "OK"
-- 以下を入れて中断する前に処理が終わるようにするとCancelerは呼ばれなくなる
-- delay (Milliseconds 500.0)
killFiber (error "中断したよ") fiber
result <- try (joinFiber fiber)
-- lmapでLeftのErrorだけmessageでStringに変換している
case lmap message result of
Left e -> log $ "エラー: " <> e
Right v -> log $ "処理結果は" <> v
処理開始
Cancelerが呼ばれたよ。メッセージは: 中断したよ
エラー: 中断したよ
delay
のコメントを外すと結果は次のようになります。
処理開始
処理終了
処理結果はOK
様々な関数
supervise
supervise :: forall a. Aff a -> Aff a
Aff
に対して新しくAff
作成します。
ただし、そのAff
が完了した場合、その中でforkされ且つsuspend状態のFiberはkillされ、Cancelerが実行されます。
つまり親の処理が完了したら、forkした処理は途中であったとしても強制的にキャンセルされるということです。
次の例は時間が掛かる処理がキャンセルされることを示す例です。
キャンセルされなければ、throw
しているのでエラーになるはずですが、これが呼ばれる前に親が終了してキャンセルされるので何事もなく処理が終了します。
supervise do
の箇所を_ <- forkAff do
に置き換えたり、あるいは親の完了前にthrow
を呼び出すためにdelay (Milliseconds 3000.0)
を消したりしてみれば、処理がキャンセルされずエラーになることがわかるでしょう。
main :: Effect Unit
main = launchAff_ do
log "parent start."
supervise do
log " supervise start."
_ <- forkAff do
delay (Milliseconds 3000.0)
log " child1"
-- 親が終了するとキャンセルされるので時間がかかっているこいつは呼ばれない
liftEffect $ throw "cancelled"
_ <- forkAff do
delay (Milliseconds 10.0)
-- こいつは親より時間がかからないから呼ばれる
log " child2"
delay (Milliseconds 100.0)
log " supervise end."
log "parent end."
parent start.
supervise start.
child2
supervise end.
parent end.
attempt
attempt :: forall a. Aff a -> Aff (Either Error a)
try
の単相版です。
実際実装はtry
そのものを使っています。
Aff
の文脈であることを明示したいときに使うのでしょうか。
次の例は処理に成功するAff
と失敗するAff
の両方を試す例です。
main :: Effect Unit
main = launchAff_ do
a <- attempt do pure "成功したよ"
b <- attempt do throwError (error "エラーだよ")
log =<< toString a
log =<< toString b
toString :: forall m. Applicative m => Either Error String -> m String
toString = either (pure <<< message) pure
成功したよ
エラーだよ
apathize
apathize :: forall a. Aff a -> Aff Unit
エラーを無視します。
次の例ではapathize
を用いた方はthrowError
していてもエラーにならないのに対し、forkAff
している方はエラーになります。
main :: Effect Unit
main = launchAff_ do
apathize do
log "処理1"
throwError (error "エラーだよ")
-- これはエラーになる(コメントを外してみればわかるよ)
-- joinFiber =<< forkAff do
-- log "処理2"
-- throwError (error "エラーだよ")
処理1
delay
delay :: Milliseconds -> Aff Unit
実行しているFiber
を指定した時間(msec)一時停止します。
main :: Effect Unit
main = launchAff_ do
log "Before Delay"
delay (Milliseconds 500.0)
log "After Delay"
Before Delay
After Delay
※実際はちょっと遅れてAfter Delayと出力されます
never
never :: forall a. Aff a
解決されない非同期処理です。
makeAff
のところでも説明しましたが、次の例のようにnever
以降は呼び出されなくなります。
決して呼び出されないことを明示したり、何もしないAff
がほしいときに使えそうです。
main :: Effect Unit
main = launchAff_ do
log "処理開始"
_ <- makeAff \cb -> do
log "コールバック関数を呼ぶよ"
cb $ Right unit
pure nonCanceler
_ <- never
log "処理終了"
処理開始
コールバック関数を呼ぶよ
finally
finally :: forall a. Aff Unit -> Aff a -> Aff a
二番目のAff
の実行が成功しようが失敗しようが、一番目のAff
の処理を実行する。
よくある try ~ finally の機構のようなものです。
処理結果によらず何か最終処理を行いたいときに使えるでしょう。
main :: Effect Unit
main = launchAff_ do
let
finalizer = makeAff \callback -> do
log "最終処理"
callback (Right unit)
pure nonCanceler
a = makeAff \callback -> do
log "エラーが発生する関数が呼ばれたよ"
callback (Left $ Aff.error "fail")
pure nonCanceler
-- 次のようにした場合、aの実行でエラーになり処理は止まりfinalizerは呼ばれないが、finallyを使えばfinalizerは絶対に呼ばれる
-- _ <- joinFiber =<< forkAff a
-- _ <- joinFiber =<< forkAff finalizer
_ <- finally finalizer a
pure unit
エラーが発生する関数が呼ばれたよ
最終処理
file:///path/output/Effect.Aff/foreign.js:530
throw util.fromLeft(step);
^
Error: エラーだよ
※スタックトレースの出力は省略しています。パスも適当に変えてます。
上記の例ではエラーが起きるままにしていますが、次のようにtry
やcatchError
を使えば処理を続けることもできます。
_ <- try $ finally finalizer a
-- もしくは
catchError (finally finalizer a) (log <<< message)
invincible
invincible :: forall a. Aff a -> Aff a
この関数をかませたAff
は強制終了できなくなります。
(invinsibleは『無敵』という意味)
次の例では、わざとdelay
で処理完了に掛かる時間を延ばし、処理中にkillFiber
でFiber
をkillしています。しかし結果を見ればわかるとおり処理は最後まで実行されています。つまりkillしてもキャンセルできないわけです。
main :: Effect Unit
main = launchAff_ do
a <- forkAff $ invincible do
log "invincible start."
delay (Milliseconds 200.0)
log "invincible end."
-- invincibleをかましてないと、endが表示される前に中断されてしまう
_ <- killFiber (error "kill") a
pure unit
invincible fun start.
invincible fun end.
cancelWith
cancelWith :: forall a. Aff a -> Canceler -> Aff a
Canceler
が設定されていないAff
にCanceler
を設定した新しいAff
が返されます。
main :: Effect Unit
main = launchAff_ do
let
a = forkAff do
log "処理開始"
delay (Milliseconds 200.0)
log "処理終了"
pure "OK"
fiber <- cancelWith a (Canceler \e -> log $ message e)
-- 以下を入れて中断する前に処理が終わるようにするとCancelerは呼ばれなくなる
-- delay (Milliseconds 500.0)
killFiber (error "中断したよ") fiber
result <- try (joinFiber fiber)
-- lmapでLeftのErrorだけmessageでStringに変換している
case lmap message result of
Left e -> log $ "エラー: " <> e
Right v -> log $ "処理結果は" <> v
処理開始
エラー: 中断したよ
bracket
bracket :: forall a b. Aff a -> (a -> Aff Unit) -> (a -> Aff b) -> Aff b
この関数はリソースの取得~使用~後始末というような一連の処理の流れを扱う関数です。
最初のAff a
がリソースの取得で、次の(a -> Aff Unit)
が後始末、最後の(a -> Aff b)
がリソースの使用にそれぞれ対応しています。
大きな特徴は、呼び出し順序が固定されており、かつリソースの取得と後始末の処理は必ず実行されるというところです。
余談ですが、finally
関数やinvincible
関数はこの関数を利用しています。
main :: Effect Unit
main = launchAff_ do
r <- bracket
(do
log "リソースを取得する関数が呼ばれました"
delay (Milliseconds 100.0)
pure "hoge")
(\v -> do
log $ "リソース" <> v <> "の後始末をする関数が呼ばれました")
(\v -> do
log "リソースを使用する関数が呼ばれました"
pure $ v <> " fuga")
log $ "結果:" <> r
リソースを取得する関数が呼ばれました
リソースを使用する関数が呼ばれました
リソースhogeの後始末をする関数が呼ばれました
結果:hoge fuga
generalBracket
generalBracket :: forall a b. Aff a -> BracketConditions a b -> (a -> Aff b) -> Aff b
type BracketConditions a b =
{ killed :: Error -> a -> Aff Unit
, failed :: Error -> a -> Aff Unit
, completed :: b -> a -> Aff Unit
}
1つ目のAff a
と、3つ目の(a -> Aff b)
は関係していて、1つ目の値を3つ目の関数で変換するような形になっています。
そして2つ目のBracketConditions a b
はレコードになっていますが、killされたり例外が投げられたり、正常に完了したりしたときの状況を見ることができます。
次の例は、正常に完了した場合、killされた場合、例外が投げられた場合すべて確認できるようになっています。
main :: Effect Unit
main = launchAff_ do
let
aff = do
log "処理開始"
delay (Milliseconds 500.0)
log "処理終了"
pure "value"
bracketFun = generalBracket
aff
{ killed: \e a -> do
log "killされたよ"
log $ message e
log a
, failed: \e a -> do
log "エラーになったよ"
log $ message e
log a
, completed: \b a -> do
log "正常終了したよ"
log $ "変換前の値:" <> a
log $ "変換後の値:" <> b
}
fiber1 <- forkAff $ bracketFun (\v -> pure $ v <> " added")
logShow =<< joinFiber fiber1
log "------"
fiber2 <- forkAff $ bracketFun pure
killFiber (error "中断") fiber2
log "------"
fiber3 <- forkAff $ bracketFun (const (throwError (error "何らかのエラー")))
_ <- try $ joinFiber fiber3
pure unit
処理開始
処理終了
正常終了したよ
変換前の値:value
変換後の値:value added
"value added"
------
処理開始
処理終了
killされたよ
中断
value
------
処理開始
処理終了
エラーになったよ
何らかのエラー
value
Parallelによる処理
Aff
は次のように、Control.Parallel
モジュールの型クラスParallel
のインスタンスになっています。したがって、Control.Parallel
モジュールに定義されている様々な関数を使うことができますし、
foreign import data ParAff :: Type -> Type
instance parallelAff :: Parallel ParAff Aff where
parallel = (unsafeCoerce :: forall a. Aff a -> ParAff a)
sequential = _sequential
使用例を挙げたいと思います。
parSequence
exec :: Number -> Aff String
exec a = do
log $ "Start " <> show a
delay $ Milliseconds a
log $ "End " <> show a
pure $ "wait " <> show a <> " ms."
main :: Effect Unit
main = launchAff_ do
results <- parSequence [exec 500.0, exec 200.0]
log $ show results
Start 500.0
Start 200.0
End 200.0
End 500.0
["wait 500.0 ms.","wait 200.0 ms."]
parSequence_
delayArray :: Array (Aff Unit)
delayArray = replicate 10 do
log "処理開始"
delay $ Milliseconds 100.0
log "処理終了"
main :: Effect Unit
main = launchAff_ $ parSequence_ delayArray
処理開始
処理開始
処理開始
処理開始
処理開始
処理開始
処理開始
処理開始
処理開始
処理開始
処理終了
処理終了
処理終了
処理終了
処理終了
処理終了
処理終了
処理終了
処理終了
処理終了
これを普通のsequence_
に変えると並列ではなくなるのでめっちゃ遅くなります。
main = launchAff_ $ sequence_ delayArray
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
処理開始
処理終了
Discussion