Multithread programming on Rx.Net
リアクティブプログラミングはデータストリーム(順序立てて流れてくるデータ群)に対して宣言的に処理を記述していくプログラミングパラダイムであり、必然的に非同期処理をサポートしています。
しかし、マルチスレッド環境においてその威力を発揮するには仕組みを正しく理解している必要があります。
ここではC#のリアクティブプログラミングライブラリRx.Net(System.Reactive)を用いてマルチスレッドプログラミングする際に必要な知識を解説します。
スケジューラ
Rx.Netにおける非同期処理の基盤はSystem.Reactive.Concurrency
名前空間で提供されているIScheduler
インターフェースとその実装です。
標準で提供されているスケジューラ実装には以下のようなものがあります。
スケジューラの実験コード
using System.Reactive.Concurrency;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
static void ExperimentScheduler(IScheduler scheduler)
{
WriteLine($"scope0 ... {CurrentThreadId()}");
scheduler.Schedule(() =>
{
WriteLine($" scope1 >>> {CurrentThreadId()}");
Thread.Sleep(1);
scheduler.Schedule(() =>
{
WriteLine($" scope2 >>> {CurrentThreadId()}");
Thread.Sleep(1);
scheduler.Schedule(() =>
{
WriteLine($" scope3-1 ... {CurrentThreadId()}");
Thread.Sleep(1);
});
WriteLine($" scope2 ... {CurrentThreadId()}");
Thread.Sleep(1);
scheduler.Schedule(() =>
{
WriteLine($" scope3-2 ... {CurrentThreadId()}");
Thread.Sleep(1);
});
WriteLine($" scope2 <<< {CurrentThreadId()}");
Thread.Sleep(1);
});
WriteLine($" scope1 <<< {CurrentThreadId()}");
});
}
-
ImmediateScheduler
現在のスレッドで処理を実行するスケジューラです。
処理がスケジュールされた時点で即時評価を行います。
普通にメソッドを呼び出すのと同等の振る舞いになります。実験結果
// ExperimentScheduler(ImmediateScheduler.Instance); scope0 ... 1 scope1 >>> 1 scope2 >>> 1 scope3-1 ... 1 scope2 ... 1 scope3-2 ... 1 scope2 <<< 1 scope1 <<< 1
-
CurrentThreadScheduler
現在のスレッドで処理を実行するスケジューラです。
一時的なメッセージキューを生成して処理をキューイングします。
そのため再帰的にスケジュールされた処理は実行が後回しにされます。実験結果
// ExperimentScheduler(CurrentThreadScheduler.Instance); scope0 ... 1 scope1 >>> 1 scope1 <<< 1 scope2 >>> 1 scope2 ... 1 scope2 <<< 1 scope3-1 ... 1 scope3-2 ... 1
-
ThreadPoolScheduler
.Netの標準スレッドプールに処理を実行させるスケジューラです。
処理がスケジュールされるたびにスレッドプールへキューイングするため、実行されるスレッドにいかなる仮定もしてはいけません。(2つの処理が同じスレッドで実行されることも異なるスレッドで実行されることもある)実験結果
// ExperimentScheduler(ThreadPoolScheduler.Instance); scope0 ... 1 scope1 >>> 4 scope1 <<< 4 scope2 >>> 5 scope2 ... 5 scope3-1 ... 4 scope2 <<< 5 scope3-2 ... 4
scope3-1
の呼び出しタイミングが遅れていることからわかるように処理の実行順序が予想できなくなっています。また、scope1
の実行が終わった時点でワーカースレッド4が空いているため、scope3-1
とscope3-2
が同じスレッドで実行されたことがわかります。 -
TaskPoolScheduler
指定したタスクファクトリを用いて.NetのTPLに処理を実行させるスケジューラです。
デフォルトの振る舞いはThreadPoolScheduler
とほぼ同じですが、タスクファクトリを通じてより柔軟な非同期スケジューリングが可能です。 -
NewThreadScheduler
処理がスケジュールされるたびに新しいスレッドを生成して割り当てるスケジューラです。
このスケジューラでは、別個にスケジュールされた処理は常に異なるスレッドが割り当てられることが保証されています。実験結果
// ExperimentScheduler(NewThreadScheduler.Default); scope0 ... 1 scope1 >>> 5 scope1 <<< 5 scope2 >>> 6 scope2 ... 6 scope3-1 ... 7 scope2 <<< 6 scope3-2 ... 8
すべての処理が別個のスレッドを割り当てられていることがわかります。
処理の実行順序はThreadPoolScheduler
同様予想できなくなっています。 -
EventLoopScheduler
スケジューラインスタンスが内部で1つのスレッドを生成し、スケジュールされた処理をそのスレッドで実行するスケジューラです。
同じスケジューラインスタンスに対してスケジュールされた処理はすべて同じスレッドで処理されますが、そのスレッドは現在のスレッドとも他のEventLoopScheduler
スレッドとも異なります。
このスケジューラは割り当てられたスレッドを終了するためにIDisposable
も実装しています。実験結果
// for(var i = 0; i < 3; ++i) // { // ExperimentScheduler(new EventLoopScheduler()); // Thread.Sleep(100); // WriteLine(); // } scope0 ... 1 scope1 >>> 5 scope1 <<< 5 scope2 >>> 5 scope2 ... 5 scope2 <<< 5 scope3-1 ... 5 scope3-2 ... 5 scope0 ... 1 scope1 >>> 6 scope1 <<< 6 scope2 >>> 6 scope2 ... 6 scope2 <<< 6 scope3-1 ... 6 scope3-2 ... 6 scope0 ... 1 scope1 >>> 7 scope1 <<< 7 scope2 >>> 7 scope2 ... 7 scope2 <<< 7 scope3-1 ... 7 scope3-2 ... 7
ExperimentScheduler
の呼び出しごとに新しいスレッドが割り当てられていることがわかります。 -
SynchronizationContextScheduler
-
ControlScheduler
-
DispatcherScheduler
それぞれ指定したSynchronizationContext、WinFormコントロール、WPF Dispatcherに同期して処理を実行します。UIスレッド制約のある環境で利用します。
本稿では便宜的にCurrentThreadScheduler
とImmediateScheduler
をシングルスレッド型スケジューラ、ThreadPoolScheduler
・TaskPoolScheduler
・NewThreadScheduler
・EventLoopScheduler
をマルチスレッド型スケジューラと呼ぶことにします。
ストリームの処理スケジューリングを操作するオペレータ
Rx.Netにおける処理のスケジューリング
Rx.Netでは、原則としてオペレータの処理はスケジューラを通して実行されるようになっています。
その中でも時間制御を伴わなずOnNext
で与えられた値のみで直ちに動作を判定できるオペレータ(Select
やWhere
など)はCurrentThreadScheduler
を内部的に利用しています。
多くの場合気にする必要はありませんが、既存のオペレータに依存せずに自作オペレータを実装する場合には注意する必要があるでしょう。
時間操作を伴うオペレータ(Delay
やSkipUntil
など)ではオーバーロードによりIScheduler
を切り替えられるようになっています。
指定しない場合はデフォルトでThreadPoolScheduler
が選択されます。
なお、このようなオペレータではマルチスレッド型スケジューラとシングルスレッド型スケジューラでは振る舞いが大きく変化します。
例
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var watch = new Stopwatch();
watch.Start();
WriteLine($"{CurrentThreadId()}: start with {scheduler.GetType().Name} ({watch.ElapsedMilliseconds}ms)");
Observable
.Range(0, 3)
.Delay(TimeSpan.FromSeconds(1), scheduler)
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
WriteLine($"{CurrentThreadId()}: end ({watch.ElapsedMilliseconds}ms)");
Thread.Sleep(3000);
/*
1: start with ThreadPoolScheduler (0ms)
1: end (129ms)
5: 0 (1153ms)
5: 1 (1154ms)
5: 2 (1154ms)
1: start with CurrentThreadScheduler (0ms)
1: 0 (1055ms)
1: 1 (1058ms)
1: 2 (1058ms)
1: 3 (1058ms)
1: 4 (1058ms)
1: end (1059ms)
1: start with ImmediateScheduler (0ms)
1: 0 (1077ms)
1: 1 (2086ms)
1: 2 (3093ms)
1: end (4100ms)
*/
イベントの実行順序
ほとんどのオペレータはイベントの追い越しを引き起こす効果を持ちません。オペレータに流れてきたOnNext
/OnCompleted
/OnError
は、どのスケジューラを使用しようともそのままの順番で出力されます。
例
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var scheduler = ThreadPoolScheduler.Instance;
var watch = new Stopwatch();
watch.Start();
Observable
.Range(0, 5)
.ObserveOn(scheduler)
.Select(x => { Thread.Sleep(TimeSpan.FromSeconds(5 - x)); return x; })
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
/*
5: 0 (5121ms)
5: 1 (9147ms)
5: 2 (12156ms)
5: 3 (14157ms)
5: 4 (15169ms)
*/
ただし、前のオペレータの時点で実行がマルチスレッド化されている場合やキューイングされた遅延処理を用いている場合この順序維持性は破れます。
例
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var watch = new Stopwatch();
watch.Start();
var subject = new Subject<int>();
subject
.Select(x => { Thread.Sleep(TimeSpan.FromSeconds(5 - x)); return x; })
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
for(var i = 0; i < 5; ++i)
{
var j = i;
Task.Run(() => subject.OnNext(j));
}
Thread.Sleep(20000); WriteLine();
/*
ソースサブジェクトの`OnNext`がそれぞれ別のスレッドで実行されているならば、オペレータの実行速度により値の到着順序が入れ替わる
8: 4 (1110ms)
7: 3 (2104ms)
6: 2 (3110ms)
5: 1 (4118ms)
4: 0 (5110ms)
*/
watch.Restart();
Observable
.Range(0, 5)
.SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x)))
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
/*
`SelectMany`でマルチスレッド実行されているストリームを結合した場合も到着順序が入れ替わる
16: 4 (1041ms)
15: 3 (2045ms)
14: 2 (3034ms)
13: 1 (4031ms)
12: 0 (5028ms)
*/
watch.Restart();
Observable
.Range(0, 5)
.SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x), CurrentThreadScheduler.Instance))
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
Thread.Sleep(20000); WriteLine();
/*
同一スレッド上の実行であっても後続処理のキューイングを用いてディレイ処理が一斉起動されているならば到着順序が入れ替わる
1: 4 (1006ms)
1: 3 (2014ms)
1: 2 (3008ms)
1: 1 (4005ms)
1: 0 (5015ms)
*/
watch.Restart();
Observable
.Range(0, 5)
.SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x), ImmediateScheduler.Instance))
.Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
Thread.Sleep(50000); WriteLine();
/*
`ImmediateScheduler`では前のイベントが完了するまで次のイベントの処理に入れないので、順序が維持される代わりに処理時間が累積していく
1: 0 (5011ms)
1: 1 (14016ms)
1: 2 (21043ms)
1: 3 (26058ms)
1: 4 (29069ms)
*/
ObserveOn
Rx.Netにおいてスケジューリングを指定する最も基本的なオペレータはObserveOn
です。
ObserveOn
を使うと直後のオペレータを指定したスケジューラ上で実行します。
例
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var subject = new Subject<int>();
var observable = subject
.Select(x => new Item(x, new()))
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select1({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select2({item.Index})") })
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select3({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select4({item.Index})") })
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select5({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select6({item.Index})") });
observable.Subscribe(item => WriteLine(item.Text));
subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(2);
record Item(int Index, StringBuilder Text);
/*
1: Select1(0)
1: Select2(0)
5: Select3(0) <-- ここでスケジューラ切り替わり
5: Select4(0)
6: Select5(0) <-- ここでスケジューラ切り替わり
6: Select6(0)
1: Select1(1)
1: Select2(1)
5: Select3(1)
5: Select4(1)
6: Select5(1)
6: Select6(1)
1: Select1(2)
1: Select2(2)
5: Select3(2)
5: Select4(2)
6: Select5(2)
6: Select6(2)
*/
ObserveOn
により実行スケジューラの切り替わりが起こるのは直後のオペレータだけなので、更に後続するオペレータがCurrentThreadScheduler
を使っているならば切り替わり先のスレッドで処理が継続します。
注意点として、ObserveOn
によるスケジューリングは1つのストリームにつき1度だけ、Subscribeが行われたときに実行されます。ストリームが生きている限り確保されたワーカースレッドは専有され続けます。
例
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var subject = new Subject<int>();
var observable = subject
.Select(x => new Item(x, new()))
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select1({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select2({item.Index})") })
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select3({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select4({item.Index})") })
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select5({item.Index})") })
.Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select6({item.Index})") });
observable.Subscribe(item => WriteLine(item.Text));
subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(2);
// ここで長時間待機&スレッドプールの大量消費が入ったとする
var threadPoolWaste = Enumerable
.Range(0, 100)
.Select(_ => Task.Run(() => Thread.Sleep(20000)))
.ToArray();
Thread.Sleep(10000);
subject.OnNext(3);
Thread.Sleep(100);
record Item(int Index, StringBuilder Text);
/*
1: Select1(0)
1: Select2(0)
5: Select3(0)
5: Select4(0)
6: Select5(0)
6: Select6(0)
1: Select1(1)
1: Select2(1)
5: Select3(1)
5: Select4(1)
6: Select5(1)
6: Select6(1)
1: Select1(2)
1: Select2(2)
5: Select3(2)
5: Select4(2)
6: Select5(2)
6: Select6(2)
1: Select1(3)
1: Select2(3)
5: Select3(3) <-- スレッドプールの使用量によらず以前の`OnNext`と同じスレッドが使われている
5: Select4(3)
6: Select5(3)
6: Select6(3)
*/
この性質のため、長寿命を持つストリームに対してThreadPoolScheduler
を利用することはおすすめしません[1]。代わりにNewThreadScheduler
かEventLoopScheduler
を利用しましょう。
SubscribeOn
Rx.Net
におけるもう一つのスケジューラ操作オペレータはSubscribeOn
です。
このオペレータは最初のサブスクリプション処理自体をスケジューリングするオペレータです。
例
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var observable = Observable.Range(0, 3)
.Do(x => WriteLine($"{CurrentThreadId()}: Do1({x})"))
.Do(x => WriteLine($"{CurrentThreadId()}: Do2({x})"))
.SubscribeOn(ThreadPoolScheduler.Instance)
.Do(x => WriteLine($"{CurrentThreadId()}: Do3({x})"))
.Do(x => WriteLine($"{CurrentThreadId()}: Do4({x})"))
.Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x})"));
Thread.Sleep(100);
/*
4: Do1(0)
4: Do2(0)
4: Do3(0)
4: Do4(0)
4: Subscribe(0)
4: Do1(1)
4: Do2(1)
4: Do3(1)
4: Do4(1)
4: Subscribe(1)
4: Do1(2)
4: Do2(2)
4: Do3(2)
4: Do4(2)
4: Subscribe(2)
*/
この性質のため、ストリームの根本がhot observableである場合にはあまり用をなしません。
それどころかサブスクリプション処理の完了時期が予測できなくなるためイベントの取りこぼしを起こしやすくなります。
例
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var subject = new Subject<int>();
var observable = subject
.Do(x => WriteLine($"{CurrentThreadId()}: Do1({x})"))
.Do(x => WriteLine($"{CurrentThreadId()}: Do2({x})"))
.SubscribeOn(ThreadPoolScheduler.Instance)
.Do(x => WriteLine($"{CurrentThreadId()}: Do3({x})"))
.Do(x => WriteLine($"{CurrentThreadId()}: Do4({x})"))
.Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x})"));
// サブスクリプション処理自体がスレッドプール上で動いているため
// 待機しないと大抵の場合サブスクリプション前にsubject.OnNext(i)が実行されてしまう
Thread.Sleep(100);
for (var i = 0; i < 3; ++i)
{
subject.OnNext(i);
}
Thread.Sleep(100);
/*
1: Do1(0)
1: Do2(0)
1: Do3(0)
1: Do4(0)
1: Subscribe(0)
1: Do1(1)
1: Do2(1)
1: Do3(1)
1: Do4(1)
1: Subscribe(1)
1: Do1(2)
1: Do2(2)
1: Do3(2)
1: Do4(2)
1: Subscribe(2)
*/
サブスクリプション時に動作するスレッド依存の自作オペレータを用意すればhot observableに対してもその効果を実感できますが、恣意性が高くメリットのあるものではないでしょう。
このオペレータはもっぱらcold observableに対して使うことになると思います。
Rxにおける並列処理の実現
前述のとおり、ObserveOn
もSubscribeOn
もイベントストリームとそれ以外の処理をマルチスレッド化するオペレータであり、イベントの発行ごとに処理の並列化をするものではありません。
サーバーのリクエスト処理のように、イベントごとにスレッドを割り当てるには別の実装を用いる必要があります。
最も単純な解決策はサブスクリプションする処理そのものをFuture化してしまうことです。
例
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var watch = new Stopwatch();
watch.Start();
var observable = Observable.Range(0, 3)
.Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
.Subscribe(x => Task.Run(async () => {
await Task.Delay(TimeSpan.FromSeconds(3 - x));
WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]");
}));
Thread.Sleep(5000);
ただし、この方法では処理の実体が完了するタイミングを管理するために特別な配慮を必要とします。
実装に用いる場合には注意してください。
その他、ストリーム中で並列化を実現する方法を紹介します。
ストリーム中での変換:イベントの順序を保証しなくて良い場合
イベントの処理完了が時間的に前後してもいい場合にはSelectMany
オペレータを使います。
// 他にもオーバーロードはあるが基本形はこの2つ
// IObservable版
Observable.SelectMany<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> selector);
// Task版
Observable.SelectMany<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector);
変換先がIObservable<TResult>
であれTask<TResult>
であれ、処理のメイン部分をFutureパターンにより並列化することができます。
例
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;
var watch = new Stopwatch();
watch.Start();
var observable = Observable.Range(0, 3)
.Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
.SelectMany(async x => { await Task.Delay(TimeSpan.FromSeconds(3 - x)); return x; })
.Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);
/*
1: Do(0) [165ms]
1: Do(1) [251ms]
1: Do(2) [252ms]
5: Subscribe(2) [1277ms]
4: Subscribe(1) [2266ms]
6: Subscribe(0) [3260ms]
*/
Subscribe
されたアクションはCurrentThreadScheduler
で実行されますが、SelectMany
によってワーカースレッドに振り分けられた後なのでそれぞれ別スレッドで実行されることになります。
また、処理の軽いイベントは重いイベントに対して先着していることもわかります。
ストリーム中での変換:イベントの順序を維持する場合
途中の処理を並列化した上で結果の出力においてはイベントの順序を維持したい場合、現状coldオペレータだけで実現するのは難しいと思います。
2022-03-09追記
Select
によるFuture化とConcat
による結合によりイベント順序を維持した並列化が可能であることを確認しました。
混乱を招く情報を後悔してしまい申し訳ありません。
例
var wait = new[] { 2, 1, 0, 3, 4, };
var watch = new Stopwatch();
// Task化+ToObservableを使う方法
watch.Restart();
Observable.Range(0, 5)
.Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
.Select(x => Task.Delay(TimeSpan.FromSeconds(wait[x])).ContinueWith(_ => x).ToObservable())
.Concat()
.Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);
WriteLine();
/*
1: Do(0) [88ms]
1: Do(1) [156ms]
1: Do(2) [156ms]
1: Do(3) [157ms]
1: Do(4) [157ms]
4: Subscribe(0) [2155ms]
4: Subscribe(1) [2157ms]
8: Subscribe(2) [2159ms]
8: Subscribe(3) [3181ms]
7: Subscribe(4) [4171ms]
*/
// cold streamを.Replay().AutoConnect(0)でhot化する方法
watch.Restart();
Observable.Range(0, 5)
.Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
.Select(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(wait[x])).Replay(1).AutoConnect(0))
.Concat()
.Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);
WriteLine();
/*
1: Do(0) [0ms]
1: Do(1) [14ms]
1: Do(2) [14ms]
1: Do(3) [17ms]
1: Do(4) [20ms]
9: Subscribe(0) [2030ms]
9: Subscribe(1) [2031ms]
9: Subscribe(2) [2031ms]
12: Subscribe(3) [3034ms]
13: Subscribe(4) [4033ms]
*/
2通りの方法を紹介していますが、重要なのはSelect
オペレータでイベントの即値をreplayableなhot observableに変換することです。
これはConcat
での結合が実行された瞬間からタスクを開始し、その結果を任意のタイミングで受け取れるようにするために必要です。
まとめ
- Rx.Netではオペレータ自身がイベントの追い越しを行うことはありません。前のイベントの処理が終わるまで次のイベントは待機することになります。
- 前のオペレータがイベント処理をマルチスレッド化している場合には追い越しの可能性があります。
- キューイングを利用している
IObservable
をSelectMany
で結合するなど場合には追い越しの可能性があります。
-
ObserveOn
でイベントの処理を、SubscribeOn
でサブスクリプション処理を別スレッドで動作させることができます。ただし、いずれのオペレータもイベントごとに並列タスクを割り当てるような動作はしません。- 寿命が長いと予想されるストリームに対して
ObserveOn
を用いる場合にはNewThreadScheduler
かEventLoopScheduler
の利用を推奨します。
- 寿命が長いと予想されるストリームに対して
- イベントごとに処理を並列化するには
Select
やSelectMany
を用いて変換処理をFuture化する必要があります。
変更履歴
- 2022-03-09
イベントの順序を維持する並列化がcold streamのまま実現可能であったことを確認したため修正。
-
実際には
ThreadPoolScheduler
は長期間スレッドを専有すると予想される処理に対してNewThreadScheduler
に切り替わる機能を有しており、v6.0.1においてはObserveOn
はこの機能を利用しています。そのため実用上問題を体験することはないと思われます。ただし、文書化された仕様ではないことから将来に渡り同様の実装である保証はありません。実装意図を明確化するためにもNewThreadScheduler
あるいはEventLoopScheduler
の利用をおすすめします。 ↩︎
Discussion