🎱

StreamQueue で非同期 Stream のテストを行う

2023/03/06に公開

Stream に関係した API で、意外と使われていないクラスに async パッケージStreamQueue があります。使われていないというか、モバイルアプリやデスクトップアプリではあまり使い道がないのですが、非同期 Stream のテストを行うのに便利です。

StreamQueue とは

StreamQueue は、 Stream のイベントをキューで扱えるようにするオブジェクトです。 StreamQueue の挙動は Stream や StreamIterator と似ていますが、以下の点で異なります。

  • 次のイベントが来るまで待つことができる。 Stream で listen でコールバックを指定するより簡潔に書けます。 StreamIterator では moveNext で次のイベントの有無を調べられますが、 StreamQueue はより簡単です。
  • イベントをため込む。 1 つ以上の先読みができるようになり、 skiptake で複数のイベントをまとめてリストとして操作できます。 skiptake は Stream にもありますが、そちらは Stream を返すので少し手間がかかります。注意点として、ストリームによっては StreamQueue による一時停止中にメモリの使用量が増える可能性があります。

以下はドキュメントに掲載されているサンプルコードです。

// テキスト行をイベントとする Stream を受け取る
var events = StreamQueue<String>(someStreamOfLines);
// 次のイベントを要求する
// すでにイベントがキャッシュされていればそれを取得する
// キャッシュされているイベントがなければ次のイベントが来るまで待つ
var first = await events.next;
while (first.startsWith('#')) {
  // 先頭の文字が # であれば飛ばして次のイベントを要求する
  first = await events.next;
}

// 先頭の文字が MAGIC_MARKER である場合
// MAGIC_MARKER が何かは書かれてないので不明
if (first.startsWith(MAGIC_MARKER)) {
  // MAGIC_MARKER 後に続く整数の数だけイベントを取得し、それらの行をヘッダーとする
  // ヘッダー以降の行をボディとして handleMessage に渡す
  var headerCount =
      first.parseInt(first.substring(MAGIC_MARKER.length + 1));
  handleMessage(headers: await events.take(headerCount),
                body: events.rest);
  return;
}
// エラー処理

Stream のイベントをコールバックなしで扱えます。もちろん Stream や StreamIterator でも実装できますが、余分なコードが必要になるのが想像できます。

非同期 Stream のテスト

StreamQueue は非同期の Stream をテストするのに便利です。例として、非同期 Stream で実行される FizzBuzz のユニットテストを実装してみます。最後に完全なソースコードを掲載します。

FizzBuzz の実装は以下の通りです。 start を呼ぶと Stream に FizzBuzz の値を 15 回出力します。 close で Stream を閉じます。

class FizzBuzz {
  FizzBuzz() {
    _streamController = StreamController.broadcast();
  }

  late StreamController<dynamic> _streamController;
  Stream<dynamic> get stream => _streamController.stream;

  Future<void> start() async {
    for (var i = 1; i <= 15; i++) {
      if (i % 3 == 0 && i % 5 == 0) {
        _streamController.add('FizzBuzz');
      } else if (i % 3 == 0) {
        _streamController.add('Fizz');
      } else if (i % 5 == 0) {
        _streamController.add('Buzz');
      } else {
        _streamController.add(i);
      }
    }
  }

  void close() {
    _streamController.close();
  }
}

非同期のアサーションには expectLater を使います。まず、 Stream を直接使うケースです。 expected には期待する値のリストが入ります。詳細は最後にあるソースコードを参照してください。

test('Stream 1', () async {
  final fizzBuzz = FizzBuzz();
  fizzBuzz.start();
  // 終わらない
  await expectLater(fizzBuzz.stream, emitsInOrder(expected));
  fizzBuzz.close();
});

このテストはいつまで経っても終了しません。 await をつけると expectLater は Stream が閉じるまで待ち続けます。しかし closeexpectLater が終わらないと実行されないので堂々巡りになります。

そこで、次は await を外してみます。するとテストは無事終了しますが、期待通りの結果にはなりません。

test('Stream 2', () async {
  final fizzBuzz = FizzBuzz();
  fizzBuzz.start();
  // この間のイベントを受けそびれる
  expectLater(fizzBuzz.stream, emitsInOrder(expected));
  fizzBuzz.close();
});

エラー内容は以下の通りです。

Expected: should do the following in order:
          • emit an event that <1>
          • emit an event that <2>
          • emit an event that 'Fizz'
          • emit an event that <4>
          • emit an event that 'Buzz'
          • emit an event that 'Fizz'
          • emit an event that <7>
          • emit an event that <8>
          • emit an event that 'Fizz'
          • emit an event that 'Buzz'
          • emit an event that <11>
          • emit an event that 'Fizz'
          • emit an event that <13>
          • emit an event that <14>
          • emit an event that 'FizzBuzz'
  Actual: <Instance of '_BroadcastStream<dynamic>'>
   Which: emitted x Stream closed.
            which didn't emit an event that <1>

1 という内容のイベントを受け取っていない旨のエラーです。 start の実行後に expectLater を呼んでいるので、一瞬の差ですが expectLater はそれまでに送信されたイベントを受けそびれています。

そこで、次は expectLaterstart の順序を逆にしてみます。これでやっと成功します。

test('Stream 3', () async {
  final fizzBuzz = FizzBuzz();

  // start の前にイベントを受け付け始める
  expectLater(fizzBuzz.stream, emitsInOrder(expected));

  // 開始
  fizzBuzz.start();
  fizzBuzz.close();
});

ただ、たった数行なのに非同期で実行されるタイミングを考慮しなければならないのは面倒ですし、メンテナンスコストも上がります。 StreamQueue を使うと同期的な処理のように書けます。

test('StreamQueue', () async {
  final fizzBuzz = FizzBuzz();

  // Stream の操作を先に書ける
  final queue = StreamQueue(fizzBuzz.stream);
  fizzBuzz.start();
  fizzBuzz.close();

  // StreamQueue がイベントをキャッシュしているので、最後にアサーションすればよい
  await expectLater(queue, emitsInOrder(expected));
});

expectLater の前に Stream を閉じる必要がありますが、前記のコードより読み書きの負荷が軽いはずです。 StreamQueue ならイベントを受けそびれることがないので、 Stream の処理を終えてからアサーションが可能です。コールバックを持つ API をテストしたい場合は、コールバックの呼び出しをイベントとする Stream を用意すれば StreamQueue を使って同様にテストできます。

以上です。

完全なソースコード

import 'dart:async';

import 'package:async/async.dart';
import 'package:flutter_test/flutter_test.dart';

class FizzBuzz {
  FizzBuzz() {
    _streamController = StreamController.broadcast();
  }

  late StreamController<dynamic> _streamController;
  Stream<dynamic> get stream => _streamController.stream;

  Future<void> start() async {
    for (var i = 1; i <= 15; i++) {
      if (i % 3 == 0 && i % 5 == 0) {
        _streamController.add('FizzBuzz');
      } else if (i % 3 == 0) {
        _streamController.add('Fizz');
      } else if (i % 5 == 0) {
        _streamController.add('Buzz');
      } else {
        _streamController.add(i);
      }
    }
  }

  void close() {
    _streamController.close();
  }
}

void main() {
  final expected = [
    1,
    2,
    'Fizz',
    4,
    'Buzz',
    'Fizz',
    7,
    8,
    'Fizz',
    'Buzz',
    11,
    'Fizz',
    13,
    14,
    'FizzBuzz'
  ];
  test('Stream 1', () async {
    final fizzBuzz = FizzBuzz();
    fizzBuzz.start();
    await expectLater(fizzBuzz.stream, emitsInOrder(expected));
    fizzBuzz.close();
  });

  test('Stream 2', () async {
    final fizzBuzz = FizzBuzz();
    fizzBuzz.start();
    expectLater(fizzBuzz.stream, emitsInOrder(expected));
    fizzBuzz.close();
  });

  test('Stream 3', () async {
    final fizzBuzz = FizzBuzz();
    expectLater(fizzBuzz.stream, emitsInOrder(expected));
    fizzBuzz.start();
    fizzBuzz.close();
  });

  test('StreamQueue', () async {
    final fizzBuzz = FizzBuzz();
    final queue = StreamQueue(fizzBuzz.stream);
    fizzBuzz.start();
    fizzBuzz.close();
    await expectLater(queue, emitsInOrder(expected));
  });
}

Discussion