StreamQueue で非同期 Stream のテストを行う
Stream に関係した API で、意外と使われていないクラスに async パッケージ の StreamQueue があります。使われていないというか、モバイルアプリやデスクトップアプリではあまり使い道がないのですが、非同期 Stream のテストを行うのに便利です。
StreamQueue とは
StreamQueue は、 Stream のイベントをキューで扱えるようにするオブジェクトです。 StreamQueue の挙動は Stream や StreamIterator と似ていますが、以下の点で異なります。
- 次のイベントが来るまで待つことができる。 Stream で
listen
でコールバックを指定するより簡潔に書けます。 StreamIterator ではmoveNext
で次のイベントの有無を調べられますが、 StreamQueue はより簡単です。 - イベントをため込む。 1 つ以上の先読みができるようになり、
skip
やtake
で複数のイベントをまとめてリストとして操作できます。skip
とtake
は Stream にもありますが、そちらは Stream を返すので少し手間がかかります。注意点として、ストリームによっては StreamQueue による一時停止中にメモリの使用量が増える可能性があります。
以下はドキュメントに掲載されているサンプルコードです。
// テキスト行をイベントとする Stream を受け取る
var events = StreamQueue<String>(someStreamOfLines);
// 次のイベントを要求する
// すでにイベントがキャッシュされていればそれを取得する
// キャッシュされているイベントがなければ次のイベントが来るまで待つ
var first = await events.next;
while (first.startsWith('#')) {
// 先頭の文字が # であれば飛ばして次のイベントを要求する
first = await events.next;
}
// 先頭の文字が MAGIC_MARKER である場合
// MAGIC_MARKER が何かは書かれてないので不明
if (first.startsWith(MAGIC_MARKER)) {
// MAGIC_MARKER 後に続く整数の数だけイベントを取得し、それらの行をヘッダーとする
// ヘッダー以降の行をボディとして handleMessage に渡す
var headerCount =
first.parseInt(first.substring(MAGIC_MARKER.length + 1));
handleMessage(headers: await events.take(headerCount),
body: events.rest);
return;
}
// エラー処理
Stream のイベントをコールバックなしで扱えます。もちろん Stream や StreamIterator でも実装できますが、余分なコードが必要になるのが想像できます。
非同期 Stream のテスト
StreamQueue は非同期の Stream をテストするのに便利です。例として、非同期 Stream で実行される FizzBuzz のユニットテストを実装してみます。最後に完全なソースコードを掲載します。
FizzBuzz の実装は以下の通りです。 start
を呼ぶと Stream に FizzBuzz の値を 15 回出力します。 close
で Stream を閉じます。
class FizzBuzz {
FizzBuzz() {
_streamController = StreamController.broadcast();
}
late StreamController<dynamic> _streamController;
Stream<dynamic> get stream => _streamController.stream;
Future<void> start() async {
for (var i = 1; i <= 15; i++) {
if (i % 3 == 0 && i % 5 == 0) {
_streamController.add('FizzBuzz');
} else if (i % 3 == 0) {
_streamController.add('Fizz');
} else if (i % 5 == 0) {
_streamController.add('Buzz');
} else {
_streamController.add(i);
}
}
}
void close() {
_streamController.close();
}
}
非同期のアサーションには expectLater
を使います。まず、 Stream を直接使うケースです。 expected
には期待する値のリストが入ります。詳細は最後にあるソースコードを参照してください。
test('Stream 1', () async {
final fizzBuzz = FizzBuzz();
fizzBuzz.start();
// 終わらない
await expectLater(fizzBuzz.stream, emitsInOrder(expected));
fizzBuzz.close();
});
このテストはいつまで経っても終了しません。 await
をつけると expectLater
は Stream が閉じるまで待ち続けます。しかし close
は expectLater
が終わらないと実行されないので堂々巡りになります。
そこで、次は await
を外してみます。するとテストは無事終了しますが、期待通りの結果にはなりません。
test('Stream 2', () async {
final fizzBuzz = FizzBuzz();
fizzBuzz.start();
// この間のイベントを受けそびれる
expectLater(fizzBuzz.stream, emitsInOrder(expected));
fizzBuzz.close();
});
エラー内容は以下の通りです。
Expected: should do the following in order:
• emit an event that <1>
• emit an event that <2>
• emit an event that 'Fizz'
• emit an event that <4>
• emit an event that 'Buzz'
• emit an event that 'Fizz'
• emit an event that <7>
• emit an event that <8>
• emit an event that 'Fizz'
• emit an event that 'Buzz'
• emit an event that <11>
• emit an event that 'Fizz'
• emit an event that <13>
• emit an event that <14>
• emit an event that 'FizzBuzz'
Actual: <Instance of '_BroadcastStream<dynamic>'>
Which: emitted x Stream closed.
which didn't emit an event that <1>
1
という内容のイベントを受け取っていない旨のエラーです。 start
の実行後に expectLater
を呼んでいるので、一瞬の差ですが expectLater
はそれまでに送信されたイベントを受けそびれています。
そこで、次は expectLater
と start
の順序を逆にしてみます。これでやっと成功します。
test('Stream 3', () async {
final fizzBuzz = FizzBuzz();
// start の前にイベントを受け付け始める
expectLater(fizzBuzz.stream, emitsInOrder(expected));
// 開始
fizzBuzz.start();
fizzBuzz.close();
});
ただ、たった数行なのに非同期で実行されるタイミングを考慮しなければならないのは面倒ですし、メンテナンスコストも上がります。 StreamQueue を使うと同期的な処理のように書けます。
test('StreamQueue', () async {
final fizzBuzz = FizzBuzz();
// Stream の操作を先に書ける
final queue = StreamQueue(fizzBuzz.stream);
fizzBuzz.start();
fizzBuzz.close();
// StreamQueue がイベントをキャッシュしているので、最後にアサーションすればよい
await expectLater(queue, emitsInOrder(expected));
});
expectLater
の前に Stream を閉じる必要がありますが、前記のコードより読み書きの負荷が軽いはずです。 StreamQueue ならイベントを受けそびれることがないので、 Stream の処理を終えてからアサーションが可能です。コールバックを持つ API をテストしたい場合は、コールバックの呼び出しをイベントとする Stream を用意すれば StreamQueue を使って同様にテストできます。
以上です。
完全なソースコード
import 'dart:async';
import 'package:async/async.dart';
import 'package:flutter_test/flutter_test.dart';
class FizzBuzz {
FizzBuzz() {
_streamController = StreamController.broadcast();
}
late StreamController<dynamic> _streamController;
Stream<dynamic> get stream => _streamController.stream;
Future<void> start() async {
for (var i = 1; i <= 15; i++) {
if (i % 3 == 0 && i % 5 == 0) {
_streamController.add('FizzBuzz');
} else if (i % 3 == 0) {
_streamController.add('Fizz');
} else if (i % 5 == 0) {
_streamController.add('Buzz');
} else {
_streamController.add(i);
}
}
}
void close() {
_streamController.close();
}
}
void main() {
final expected = [
1,
2,
'Fizz',
4,
'Buzz',
'Fizz',
7,
8,
'Fizz',
'Buzz',
11,
'Fizz',
13,
14,
'FizzBuzz'
];
test('Stream 1', () async {
final fizzBuzz = FizzBuzz();
fizzBuzz.start();
await expectLater(fizzBuzz.stream, emitsInOrder(expected));
fizzBuzz.close();
});
test('Stream 2', () async {
final fizzBuzz = FizzBuzz();
fizzBuzz.start();
expectLater(fizzBuzz.stream, emitsInOrder(expected));
fizzBuzz.close();
});
test('Stream 3', () async {
final fizzBuzz = FizzBuzz();
expectLater(fizzBuzz.stream, emitsInOrder(expected));
fizzBuzz.start();
fizzBuzz.close();
});
test('StreamQueue', () async {
final fizzBuzz = FizzBuzz();
final queue = StreamQueue(fizzBuzz.stream);
fizzBuzz.start();
fizzBuzz.close();
await expectLater(queue, emitsInOrder(expected));
});
}
Discussion