Multithread programming on Rx.Net

2022/02/07に公開

リアクティブプログラミングはデータストリーム(順序立てて流れてくるデータ群)に対して宣言的に処理を記述していくプログラミングパラダイムであり、必然的に非同期処理をサポートしています。
しかし、マルチスレッド環境においてその威力を発揮するには仕組みを正しく理解している必要があります。
ここではC#のリアクティブプログラミングライブラリRx.Net(System.Reactive)を用いてマルチスレッドプログラミングする際に必要な知識を解説します。

スケジューラ

Rx.Netにおける非同期処理の基盤はSystem.Reactive.Concurrency名前空間で提供されているISchedulerインターフェースとその実装です。
標準で提供されているスケジューラ実装には以下のようなものがあります。

スケジューラの実験コード
Program.cs
using System.Reactive.Concurrency;
using System.Threading;
using static System.Console;

static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

static void ExperimentScheduler(IScheduler scheduler)
{
    WriteLine($"scope0 ... {CurrentThreadId()}");
    scheduler.Schedule(() =>
    {
        WriteLine($"  scope1 >>> {CurrentThreadId()}");
        Thread.Sleep(1);
        scheduler.Schedule(() =>
        {
            WriteLine($"    scope2 >>> {CurrentThreadId()}");
            Thread.Sleep(1);
            scheduler.Schedule(() =>
            {
                WriteLine($"      scope3-1 ... {CurrentThreadId()}");
                Thread.Sleep(1);
            });
            WriteLine($"    scope2 ... {CurrentThreadId()}");
            Thread.Sleep(1);
            scheduler.Schedule(() =>
            {
                WriteLine($"      scope3-2 ... {CurrentThreadId()}");
                Thread.Sleep(1);
            });
            WriteLine($"    scope2 <<< {CurrentThreadId()}");
            Thread.Sleep(1);
        });
        WriteLine($"  scope1 <<< {CurrentThreadId()}");
    });
}
  • ImmediateScheduler
    現在のスレッドで処理を実行するスケジューラです。
    処理がスケジュールされた時点で即時評価を行います。
    普通にメソッドを呼び出すのと同等の振る舞いになります。

    実験結果
    //  ExperimentScheduler(ImmediateScheduler.Instance);
    scope0 ... 1
    scope1 >>> 1
      scope2 >>> 1
        scope3-1 ... 1
      scope2 ... 1
        scope3-2 ... 1
      scope2 <<< 1
    scope1 <<< 1
    
  • CurrentThreadScheduler
    現在のスレッドで処理を実行するスケジューラです。
    一時的なメッセージキューを生成して処理をキューイングします。
    そのため再帰的にスケジュールされた処理は実行が後回しにされます。

    実験結果
    //  ExperimentScheduler(CurrentThreadScheduler.Instance);
    scope0 ... 1
    scope1 >>> 1
    scope1 <<< 1
      scope2 >>> 1
      scope2 ... 1
      scope2 <<< 1
        scope3-1 ... 1
        scope3-2 ... 1
    
  • ThreadPoolScheduler
    .Netの標準スレッドプールに処理を実行させるスケジューラです。
    処理がスケジュールされるたびにスレッドプールへキューイングするため、実行されるスレッドにいかなる仮定もしてはいけません。(2つの処理が同じスレッドで実行されることも異なるスレッドで実行されることもある)

    実験結果
    //  ExperimentScheduler(ThreadPoolScheduler.Instance);
    scope0 ... 1
    scope1 >>> 4
    scope1 <<< 4
      scope2 >>> 5
      scope2 ... 5
        scope3-1 ... 4
      scope2 <<< 5
        scope3-2 ... 4
    

    scope3-1の呼び出しタイミングが遅れていることからわかるように処理の実行順序が予想できなくなっています。また、scope1の実行が終わった時点でワーカースレッド4が空いているため、scope3-1scope3-2が同じスレッドで実行されたことがわかります。

  • TaskPoolScheduler
    指定したタスクファクトリを用いて.NetのTPLに処理を実行させるスケジューラです。
    デフォルトの振る舞いはThreadPoolSchedulerとほぼ同じですが、タスクファクトリを通じてより柔軟な非同期スケジューリングが可能です。

  • NewThreadScheduler
    処理がスケジュールされるたびに新しいスレッドを生成して割り当てるスケジューラです。
    このスケジューラでは、別個にスケジュールされた処理は常に異なるスレッドが割り当てられることが保証されています。

    実験結果
    //  ExperimentScheduler(NewThreadScheduler.Default);
    scope0 ... 1
    scope1 >>> 5
    scope1 <<< 5
      scope2 >>> 6
      scope2 ... 6
        scope3-1 ... 7
      scope2 <<< 6
        scope3-2 ... 8
    

    すべての処理が別個のスレッドを割り当てられていることがわかります。
    処理の実行順序はThreadPoolScheduler同様予想できなくなっています。

  • EventLoopScheduler
    スケジューラインスタンスが内部で1つのスレッドを生成し、スケジュールされた処理をそのスレッドで実行するスケジューラです。
    同じスケジューラインスタンスに対してスケジュールされた処理はすべて同じスレッドで処理されますが、そのスレッドは現在のスレッドとも他のEventLoopSchedulerスレッドとも異なります。
    このスケジューラは割り当てられたスレッドを終了するためにIDisposableも実装しています。

    実験結果
    //  for(var i = 0; i < 3; ++i)
    //  {
    //      ExperimentScheduler(new EventLoopScheduler());
    //      Thread.Sleep(100);
    //      WriteLine();
    //  }
    scope0 ... 1
    scope1 >>> 5
    scope1 <<< 5
      scope2 >>> 5
      scope2 ... 5
      scope2 <<< 5
        scope3-1 ... 5
        scope3-2 ... 5
    
    scope0 ... 1
      scope1 >>> 6
      scope1 <<< 6
        scope2 >>> 6
        scope2 ... 6
        scope2 <<< 6
          scope3-1 ... 6
          scope3-2 ... 6
    
    scope0 ... 1
      scope1 >>> 7
      scope1 <<< 7
        scope2 >>> 7
        scope2 ... 7
        scope2 <<< 7
          scope3-1 ... 7
          scope3-2 ... 7
    

    ExperimentSchedulerの呼び出しごとに新しいスレッドが割り当てられていることがわかります。

  • SynchronizationContextScheduler

  • ControlScheduler

  • DispatcherScheduler
    それぞれ指定したSynchronizationContext、WinFormコントロール、WPF Dispatcherに同期して処理を実行します。UIスレッド制約のある環境で利用します。

本稿では便宜的にCurrentThreadSchedulerImmediateSchedulerシングルスレッド型スケジューラThreadPoolSchedulerTaskPoolSchedulerNewThreadSchedulerEventLoopSchedulerマルチスレッド型スケジューラと呼ぶことにします。

ストリームの処理スケジューリングを操作するオペレータ

Rx.Netにおける処理のスケジューリング

Rx.Netでは、原則としてオペレータの処理はスケジューラを通して実行されるようになっています。

その中でも時間制御を伴わなずOnNextで与えられた値のみで直ちに動作を判定できるオペレータ(SelectWhereなど)はCurrentThreadSchedulerを内部的に利用しています。
多くの場合気にする必要はありませんが、既存のオペレータに依存せずに自作オペレータを実装する場合には注意する必要があるでしょう。

時間操作を伴うオペレータ(DelaySkipUntilなど)ではオーバーロードによりISchedulerを切り替えられるようになっています。
指定しない場合はデフォルトでThreadPoolSchedulerが選択されます。
なお、このようなオペレータではマルチスレッド型スケジューラとシングルスレッド型スケジューラでは振る舞いが大きく変化します。

using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var watch = new Stopwatch();
watch.Start();
WriteLine($"{CurrentThreadId()}: start with {scheduler.GetType().Name} ({watch.ElapsedMilliseconds}ms)");
Observable
    .Range(0, 3)
    .Delay(TimeSpan.FromSeconds(1), scheduler)
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
WriteLine($"{CurrentThreadId()}: end ({watch.ElapsedMilliseconds}ms)");
Thread.Sleep(3000);

/*
1: start with ThreadPoolScheduler (0ms)
1: end (129ms)
5: 0 (1153ms)
5: 1 (1154ms)
5: 2 (1154ms)

1: start with CurrentThreadScheduler (0ms)
1: 0 (1055ms)
1: 1 (1058ms)
1: 2 (1058ms)
1: 3 (1058ms)
1: 4 (1058ms)
1: end (1059ms)

1: start with ImmediateScheduler (0ms)
1: 0 (1077ms)
1: 1 (2086ms)
1: 2 (3093ms)
1: end (4100ms)
*/

イベントの実行順序

ほとんどのオペレータはイベントの追い越しを引き起こす効果を持ちません。オペレータに流れてきたOnNext/OnCompleted/OnErrorは、どのスケジューラを使用しようともそのままの順番で出力されます。

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var scheduler = ThreadPoolScheduler.Instance;
var watch = new Stopwatch();
watch.Start();
Observable
    .Range(0, 5)
    .ObserveOn(scheduler)
    .Select(x => { Thread.Sleep(TimeSpan.FromSeconds(5 - x)); return x; })
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
/*
5: 0 (5121ms)
5: 1 (9147ms)
5: 2 (12156ms)
5: 3 (14157ms)
5: 4 (15169ms)
*/

ただし、前のオペレータの時点で実行がマルチスレッド化されている場合キューイングされた遅延処理を用いている場合この順序維持性は破れます。

using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var watch = new Stopwatch();
watch.Start();

var subject = new Subject<int>();
subject
    .Select(x => { Thread.Sleep(TimeSpan.FromSeconds(5 - x)); return x; })
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
for(var i = 0; i < 5; ++i)
{
    var j = i;
    Task.Run(() => subject.OnNext(j));
}
Thread.Sleep(20000); WriteLine();
/*
ソースサブジェクトの`OnNext`がそれぞれ別のスレッドで実行されているならば、オペレータの実行速度により値の到着順序が入れ替わる
8: 4 (1110ms)
7: 3 (2104ms)
6: 2 (3110ms)
5: 1 (4118ms)
4: 0 (5110ms)
*/

watch.Restart();
Observable
    .Range(0, 5)
    .SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x)))
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
/*
`SelectMany`でマルチスレッド実行されているストリームを結合した場合も到着順序が入れ替わる
16: 4 (1041ms)
15: 3 (2045ms)
14: 2 (3034ms)
13: 1 (4031ms)
12: 0 (5028ms)
*/

watch.Restart();
Observable
    .Range(0, 5)
    .SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x), CurrentThreadScheduler.Instance))
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
Thread.Sleep(20000); WriteLine();
/*
同一スレッド上の実行であっても後続処理のキューイングを用いてディレイ処理が一斉起動されているならば到着順序が入れ替わる
1: 4 (1006ms)
1: 3 (2014ms)
1: 2 (3008ms)
1: 1 (4005ms)
1: 0 (5015ms)
*/

watch.Restart();
Observable
    .Range(0, 5)
    .SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(5 - x), ImmediateScheduler.Instance))
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: {x} ({watch.ElapsedMilliseconds}ms)"));
Thread.Sleep(50000); WriteLine();
/*
`ImmediateScheduler`では前のイベントが完了するまで次のイベントの処理に入れないので、順序が維持される代わりに処理時間が累積していく
1: 0 (5011ms)
1: 1 (14016ms)
1: 2 (21043ms)
1: 3 (26058ms)
1: 4 (29069ms)
*/

ObserveOn

Rx.Netにおいてスケジューリングを指定する最も基本的なオペレータはObserveOnです。
ObserveOnを使うと直後のオペレータを指定したスケジューラ上で実行します。

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var subject = new Subject<int>();
var observable = subject
    .Select(x => new Item(x, new()))
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select1({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select2({item.Index})") })
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select3({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select4({item.Index})") })
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select5({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select6({item.Index})") });
observable.Subscribe(item => WriteLine(item.Text));

subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(2);

record Item(int Index, StringBuilder Text);
/*
1: Select1(0)
1: Select2(0)
5: Select3(0)  <-- ここでスケジューラ切り替わり
5: Select4(0)
6: Select5(0)  <-- ここでスケジューラ切り替わり
6: Select6(0)

1: Select1(1)
1: Select2(1)
5: Select3(1)
5: Select4(1)
6: Select5(1)
6: Select6(1)

1: Select1(2)
1: Select2(2)
5: Select3(2)
5: Select4(2)
6: Select5(2)
6: Select6(2)
*/

ObserveOnにより実行スケジューラの切り替わりが起こるのは直後のオペレータだけなので、更に後続するオペレータがCurrentThreadSchedulerを使っているならば切り替わり先のスレッドで処理が継続します。

注意点として、ObserveOnによるスケジューリングは1つのストリームにつき1度だけ、Subscribeが行われたときに実行されます。ストリームが生きている限り確保されたワーカースレッドは専有され続けます。

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var subject = new Subject<int>();
var observable = subject
    .Select(x => new Item(x, new()))
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select1({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select2({item.Index})") })
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select3({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select4({item.Index})") })
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select5({item.Index})") })
    .Select(item => item with { Text = item.Text.AppendLine($"{CurrentThreadId()}: Select6({item.Index})") });
observable.Subscribe(item => WriteLine(item.Text));

subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(2);

// ここで長時間待機&スレッドプールの大量消費が入ったとする
var threadPoolWaste = Enumerable
    .Range(0, 100)
    .Select(_ => Task.Run(() => Thread.Sleep(20000)))
    .ToArray();
Thread.Sleep(10000);

subject.OnNext(3);
Thread.Sleep(100);

record Item(int Index, StringBuilder Text);
/*
1: Select1(0)
1: Select2(0)
5: Select3(0)
5: Select4(0)
6: Select5(0)
6: Select6(0)

1: Select1(1)
1: Select2(1)
5: Select3(1)
5: Select4(1)
6: Select5(1)
6: Select6(1)

1: Select1(2)
1: Select2(2)
5: Select3(2)
5: Select4(2)
6: Select5(2)
6: Select6(2)

1: Select1(3)
1: Select2(3)
5: Select3(3)  <-- スレッドプールの使用量によらず以前の`OnNext`と同じスレッドが使われている
5: Select4(3)
6: Select5(3)
6: Select6(3)
*/

この性質のため、長寿命を持つストリームに対してThreadPoolSchedulerを利用することはおすすめしません[1]。代わりにNewThreadSchedulerEventLoopSchedulerを利用しましょう。

SubscribeOn

Rx.Netにおけるもう一つのスケジューラ操作オペレータはSubscribeOnです。
このオペレータは最初のサブスクリプション処理自体をスケジューリングするオペレータです。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var observable = Observable.Range(0, 3)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do1({x})"))
    .Do(x => WriteLine($"{CurrentThreadId()}: Do2({x})"))
    .SubscribeOn(ThreadPoolScheduler.Instance)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do3({x})"))
    .Do(x => WriteLine($"{CurrentThreadId()}: Do4({x})"))
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x})"));
Thread.Sleep(100);

/*
4: Do1(0)
4: Do2(0)
4: Do3(0)
4: Do4(0)
4: Subscribe(0)
4: Do1(1)
4: Do2(1)
4: Do3(1)
4: Do4(1)
4: Subscribe(1)
4: Do1(2)
4: Do2(2)
4: Do3(2)
4: Do4(2)
4: Subscribe(2)
*/

この性質のため、ストリームの根本がhot observableである場合にはあまり用をなしません。
それどころかサブスクリプション処理の完了時期が予測できなくなるためイベントの取りこぼしを起こしやすくなります。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var subject = new Subject<int>();
var observable = subject
    .Do(x => WriteLine($"{CurrentThreadId()}: Do1({x})"))
    .Do(x => WriteLine($"{CurrentThreadId()}: Do2({x})"))
    .SubscribeOn(ThreadPoolScheduler.Instance)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do3({x})"))
    .Do(x => WriteLine($"{CurrentThreadId()}: Do4({x})"))
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x})"));

// サブスクリプション処理自体がスレッドプール上で動いているため
// 待機しないと大抵の場合サブスクリプション前にsubject.OnNext(i)が実行されてしまう
Thread.Sleep(100);
for (var i = 0; i < 3; ++i)
{
    subject.OnNext(i);
}
Thread.Sleep(100);

/*
1: Do1(0)
1: Do2(0)
1: Do3(0)
1: Do4(0)
1: Subscribe(0)
1: Do1(1)
1: Do2(1)
1: Do3(1)
1: Do4(1)
1: Subscribe(1)
1: Do1(2)
1: Do2(2)
1: Do3(2)
1: Do4(2)
1: Subscribe(2)
*/

サブスクリプション時に動作するスレッド依存の自作オペレータを用意すればhot observableに対してもその効果を実感できますが、恣意性が高くメリットのあるものではないでしょう。
このオペレータはもっぱらcold observableに対して使うことになると思います。

Rxにおける並列処理の実現

前述のとおり、ObserveOnSubscribeOnもイベントストリームとそれ以外の処理をマルチスレッド化するオペレータであり、イベントの発行ごとに処理の並列化をするものではありません。
サーバーのリクエスト処理のように、イベントごとにスレッドを割り当てるには別の実装を用いる必要があります。

最も単純な解決策はサブスクリプションする処理そのものをFuture化してしまうことです。

using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var watch = new Stopwatch();
watch.Start();
var observable = Observable.Range(0, 3)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
    .Subscribe(x => Task.Run(async () => {
        await Task.Delay(TimeSpan.FromSeconds(3 - x));
        WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]");
    }));

Thread.Sleep(5000);

ただし、この方法では処理の実体が完了するタイミングを管理するために特別な配慮を必要とします。
実装に用いる場合には注意してください。

その他、ストリーム中で並列化を実現する方法を紹介します。

ストリーム中での変換:イベントの順序を保証しなくて良い場合

イベントの処理完了が時間的に前後してもいい場合にはSelectManyオペレータを使います。

// 他にもオーバーロードはあるが基本形はこの2つ

// IObservable版
Observable.SelectMany<TSource, TResult>(
    this IObservable<TSource> source,
     Func<TSource, IObservable<TResult>> selector);

// Task版
Observable.SelectMany<TSource, TResult>(
    this IObservable<TSource> source,
     Func<TSource, Task<TResult>> selector);

変換先がIObservable<TResult>であれTask<TResult>であれ、処理のメイン部分をFutureパターンにより並列化することができます。

using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
static int CurrentThreadId() => Thread.CurrentThread.ManagedThreadId;

var watch = new Stopwatch();
watch.Start();
var observable = Observable.Range(0, 3)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
    .SelectMany(async x => { await Task.Delay(TimeSpan.FromSeconds(3 - x)); return x; })
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);

/*
1: Do(0) [165ms]
1: Do(1) [251ms]
1: Do(2) [252ms]
5: Subscribe(2) [1277ms]
4: Subscribe(1) [2266ms]
6: Subscribe(0) [3260ms]
*/

SubscribeされたアクションはCurrentThreadSchedulerで実行されますが、SelectManyによってワーカースレッドに振り分けられた後なのでそれぞれ別スレッドで実行されることになります。
また、処理の軽いイベントは重いイベントに対して先着していることもわかります。

ストリーム中での変換:イベントの順序を維持する場合

途中の処理を並列化した上で結果の出力においてはイベントの順序を維持したい場合、現状coldオペレータだけで実現するのは難しいと思います。

2022-03-09追記

SelectによるFuture化とConcatによる結合によりイベント順序を維持した並列化が可能であることを確認しました。
混乱を招く情報を後悔してしまい申し訳ありません。

var wait = new[] { 2, 1, 0, 3, 4, };
var watch = new Stopwatch();

// Task化+ToObservableを使う方法
watch.Restart();
Observable.Range(0, 5)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
    .Select(x => Task.Delay(TimeSpan.FromSeconds(wait[x])).ContinueWith(_ => x).ToObservable())
    .Concat()
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);
WriteLine();
/*
1: Do(0) [88ms]
1: Do(1) [156ms]
1: Do(2) [156ms]
1: Do(3) [157ms]
1: Do(4) [157ms]
4: Subscribe(0) [2155ms]
4: Subscribe(1) [2157ms]
8: Subscribe(2) [2159ms]
8: Subscribe(3) [3181ms]
7: Subscribe(4) [4171ms]
*/

// cold streamを.Replay().AutoConnect(0)でhot化する方法
watch.Restart();
Observable.Range(0, 5)
    .Do(x => WriteLine($"{CurrentThreadId()}: Do({x}) [{watch.ElapsedMilliseconds}ms]"))
    .Select(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(wait[x])).Replay(1).AutoConnect(0))
    .Concat()
    .Subscribe(x => WriteLine($"{CurrentThreadId()}: Subscribe({x}) [{watch.ElapsedMilliseconds}ms]"));
Thread.Sleep(5000);
WriteLine();
/*
1: Do(0) [0ms]
1: Do(1) [14ms]
1: Do(2) [14ms]
1: Do(3) [17ms]
1: Do(4) [20ms]
9: Subscribe(0) [2030ms]
9: Subscribe(1) [2031ms]
9: Subscribe(2) [2031ms]
12: Subscribe(3) [3034ms]
13: Subscribe(4) [4033ms]
 */

2通りの方法を紹介していますが、重要なのはSelectオペレータでイベントの即値をreplayableなhot observableに変換することです。
これはConcatでの結合が実行された瞬間からタスクを開始し、その結果を任意のタイミングで受け取れるようにするために必要です。

まとめ

  • Rx.Netではオペレータ自身がイベントの追い越しを行うことはありません。前のイベントの処理が終わるまで次のイベントは待機することになります。
    • 前のオペレータがイベント処理をマルチスレッド化している場合には追い越しの可能性があります。
    • キューイングを利用しているIObservableSelectManyで結合するなど場合には追い越しの可能性があります。
  • ObserveOnでイベントの処理を、SubscribeOnでサブスクリプション処理を別スレッドで動作させることができます。ただし、いずれのオペレータもイベントごとに並列タスクを割り当てるような動作はしません。
    • 寿命が長いと予想されるストリームに対してObserveOnを用いる場合にはNewThreadSchedulerEventLoopSchedulerの利用を推奨します。
  • イベントごとに処理を並列化するにはSelectSelectManyを用いて変換処理をFuture化する必要があります。

変更履歴

  • 2022-03-09
    イベントの順序を維持する並列化がcold streamのまま実現可能であったことを確認したため修正。
脚注
  1. 実際にはThreadPoolSchedulerは長期間スレッドを専有すると予想される処理に対してNewThreadSchedulerに切り替わる機能を有しており、v6.0.1においてはObserveOnはこの機能を利用しています。そのため実用上問題を体験することはないと思われます。ただし、文書化された仕様ではないことから将来に渡り同様の実装である保証はありません。実装意図を明確化するためにもNewThreadSchedulerあるいはEventLoopSchedulerの利用をおすすめします。 ↩︎

Discussion