Zenn
🌊

【Flutter】StreamのtestをStreamQueueで克服する

2025/02/02に公開
1

はじめに

私が実務に入って一番苦戦していたのがStreamが絡んでくるテストです。
初学者にとってこのStreamは実装するだけでも理解、習得するのにかなり苦労するのではないでしょうか?
そしていざ実装することはできても、テストをしようとするとうまくできない。。。
そんな時に出会ったのが、Dartのパッケージ機能の一つであるStreamQueueです。

https://api.flutter.dev/flutter/async/StreamQueue-class.html

このStreamQueueを使ってどうにか克服することができたので備忘録として記事にまとめていきたいと思います。また、mockitoを使った場合のStreamメソッドのスタブ化についても触れています。

記事の対象者

  • Streamを使った実装はできている方
  • Streamが絡む実装のテストで苦戦している方
  • mockitoも絡めたStreamのテスト方法を知りたい方
  • riverpodの知識がある程度ある方
  • mockitoの知識がある程度ある方

記事を執筆時点での筆者の環境

[✓] Flutter (Channel stable, 3.27.1, on macOS 15.1 24B2082 darwin-arm64, locale ja-JP)
[✓] Android toolchain - develop for Android devices (Android SDK version 35.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 16.1)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2024.2)
[✓] VS Code (version 1.96.2)

ソースコード

記事内では的時実装のコードを張っていますが、みにくい場合は以下をご覧ください。

https://github.com/HaruhikoMotokawa/stream_test_sample/tree/main

前提

  • riverpodを使ったDI(依存性注入)を行い、レイヤードアーキテクチャで実装している
  • riverpodは自動生成で実装している
  • data層のrepositoryで定義したメソッドを複数組み合わせる場合はapplication層のserviceで定義する
  • 純粋なStreamメソッドではなく、内部でFutureなメソッドを使用した上で最後にStreamを返すメソッドのテストをしたい
  • mockの作成にはmockitoを使用する

使用パッケージ

  • riverpod
  • mockito

https://riverpod.dev

https://pub.dev/packages/riverpod

https://pub.dev/packages/mockito

実装について

例題として、以下の機能を実装します。
ちょっと無理やりな内容で恐縮です。

  1. TODOを作成できる(と仮定する)
  2. TODOには最終的にユーザー情報を保存する(と仮定する)
  3. TODOが作成されるたびに、作成したユーザー名リストをリアルタイムで取得できる

構成

lib
├── applications
│   └── services
│       └── user_todo
│           ├── provider.dart
│           ├── provider.g.dart
│           └── service.dart
├── data
│   └── repositories
│       ├── todo
│       │   ├── provider.dart
│       │   ├── provider.g.dart
│       │   └── repository.dart
│       └── user
│           ├── provider.dart
│           ├── provider.g.dart
│           └── repository.dart
└── main.dart

Streamのテスト

Streamのテストを行う際にはいくつか手法があるのですが、今回は表題のStreamQueueのメリットを理解するためにもそれ以外の方法であるexpectLaterを使ったStreamのテストを紹介します。

例題としてはTodoRepositorywatchメソッドのテストを使って解説します。

setUpなどの下準備

void main() {
  late ProviderContainer container;

  setUp(() {
    container = ProviderContainer();
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // test
  });
}

riverpodを使ってDIを行っている場合ProviderContainer経由でクラスにアクセスすることができます。

expectLaterを使ったStreamのテスト

TodoRepositoryの実装


TodoRepository todoRepository(Ref ref) => TodoRepository(ref);
class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// todoを監視して、作成されたtodoのIDを返す
  Stream<List<int>> watch() => _controller.stream;

  /// todoを作成する
  ///
  /// テスト用として、作成したTODOのIDと作成済みのIDは引数で入れる
  void create({required int createId, required List<int> currentTodoIds}) {
    // TODOを作成して保存し、IDを生成
    final createTodoId = createId;

    // 全体のリストと仮定
    final currentTodos = currentTodoIds;

    // 作成した全体のtodo
    final updateTodos = [...currentTodos, createTodoId];
    // 作成したtodoを流す
    _controller.add(updateTodos);
  }
}

テストコード

    test('【成功するテスト expectLater】作成されたtodoを受け取ることができる', () {
      final repository = container.read(todoRepositoryProvider);

      final stream = repository.watch();

      expectLater(
        stream,
        emitsInOrder([
          [0],
          [0, 1],
        ]),
      );

      repository
        ..create(createId: 0, currentTodoIds: [])
        ..create(createId: 1, currentTodoIds: [0])
        ..streamClose();
    });

解説

まずはexpectLaterを使った場合です。テストの流れは以下です。

  1. Streamの購読を開始する
  2. expectLaterで検証を宣言する
  3. 値を流す(今回はcreateメソッドで作成して値を流している)
  4. Streamを閉じる

expectLaterは文字通り、expectLaterするものです。後から検証するということですね。
通常のexpectは何かを実行してその後に結果を検証しています。
しかし、Streamの場合は値が流れていってしまうことから最初に検証結果を宣言しておく必要があります。

また、watchで取得できる値はList<int>なのですが、検証内容に関してはStreamはそのままでは扱えないのでemitsInOrderというマッチャーで囲む必要があります。
このマッチャーでは配列で囲んでいるので、Streamに流れてきた値の順番通りに検証する値を入れてあげれば良いです。

ちなみに同期的に、つまり順番に実行すると失敗します。
以下の順番でやってみたとします。

  1. Streamの購読を開始する
  2. 値を流す(今回はcreateメソッドで作成して値を流している)
  3. expectで検証を宣言する
  4. Streamを閉じる
    test('【失敗するテスト】作成されたtodoを受け取ることができる', () {
      final repository = container.read(todoRepositoryProvider);

      final stream = repository.watch();

      repository.create(createId: 0, currentTodoIds: []);

      expect(stream, emits([0]));

      repository.streamClose();

      // Expected: should emit an event that [0]
      // Actual: <Instance of '_BroadcastStream<List<int>>'>
      // Which: emitted x Stream closed.
    });

エラーメッセージは「検証する前にStreamが閉じられて検証できなかった」と出ています。

でもできればテストの順番としては値を流して検証するとした方が理解しやすいですよね?
そこで登場するのがStreamQueueです。

StreamQueueを使ったStreamのテスト

TodoRepositoryの実装(再掲)

TodoRepository todoRepository(Ref ref) => TodoRepository(ref);
class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// todoを監視して、作成されたtodoのIDを返す
  Stream<List<int>> watch() => _controller.stream;

  /// todoを作成する
  ///
  /// テスト用として、作成したTODOのIDと作成済みのIDは引数で入れる
  void create({required int createId, required List<int> currentTodoIds}) {
    // TODOを作成して保存し、IDを生成
    final createTodoId = createId;

    // 全体のリストと仮定
    final currentTodos = currentTodoIds;

    // 作成した全体のtodo
    final updateTodos = [...currentTodos, createTodoId];
    // 作成したtodoを流す
    _controller.add(updateTodos);
  }
}

テストコード

    test('【成功するテスト StreamQueue】作成されたtodoを受け取ることができる', () async {
      final repository = container.read(todoRepositoryProvider);

      final streamQueue = StreamQueue(repository.watch());

      var currentTodoIds = <int>[];

      repository.create(createId: 0, currentTodoIds: currentTodoIds);
      currentTodoIds = [0];

      final result = await streamQueue.next;

      expect(result, currentTodoIds);

      repository.create(createId: 1, currentTodoIds: currentTodoIds);
      currentTodoIds = [0, 1];

      final result2 = await streamQueue.next;

      expect(result2, currentTodoIds);

      repository.streamClose();
      await streamQueue.cancel();
    });

解説

検証の流れは以下です。

  1. StreamQueueの中でStreamの購読を開始する
  2. 値を流す
  3. 流された値を受け取るためにStreamQueueの中の世界を進めて、その結果を変数で受け取る
  4. 1回目の検証をする
  5. 更に次の値を流す
  6. 更に次の値を受け取るためにStreamQueueの中の世界を進めて、その結果を変数で受け取る
  7. 2回目の検証をする
  8. Streamを閉じる
  9. StreamQueueでの購読を終了する

StreamQueueはDartのasyncパッケージに含まれるユーティリティで、Streamのデータを「キュー(Queue)」として扱うためのクラスです。
通常のStreamはデータが流れてくるたびにリッスンする必要がありますが、StreamQueue を使うとリクエストに応じて要素を取得できるようになります。

最初のexpectLaterに比べてだいぶ実際の流れに沿ったテストコードになったのではないでしょうか?
デメリットとしてはコード量が増えることです。
しかし、私としては処理の順番と検証順番が理解しやすいのでこちらの方が好みです。

非同期な処理を内包するStreamをテストする

ここではUserTodoServicewatchメソッドを例に取って非同期の処理を内包したStreamをテストしてみます。
前項のTodoRepositorywatchメソッドのテストの時と違う点はrepositoryをmockをしてメソッドの処理をスタブ化することです。

mockの準備

([
  MockSpec<TodoRepository>(),
  MockSpec<UserRepository>(),
])
void main() {}

mockのファイルは専用のものを用意しておくと一箇所だけで生成すればいいのでおすすめです。

テストの準備

void main() {
  late ProviderContainer container;
  final userRepository = MockUserRepository();
  final todoRepository = MockTodoRepository();

  setUp(() {
    reset(userRepository);
    reset(todoRepository);

    container = ProviderContainer(
      overrides: [
        userRepositoryProvider.overrideWithValue(userRepository),
        todoRepositoryProvider.overrideWithValue(todoRepository),
      ],
    );
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // テストの実装
  });
}

StreamQueueとmockitoを併用したテスト

実装のコード

UserRepositoryの実装

UserRepository userRepository(Ref ref) => UserRepository(ref);
class UserRepository {
  UserRepository(this.ref);

  final Ref ref;
  Future<List<String>> findAll({required List<int> todoIds}) async {
    // 100ミリ秒の遅延
    await Future<void>.delayed(const Duration(milliseconds: 100));
    // IDに対応するユーザー名を返す
    final users = todoIds.map(_getUser).toList();

    return users;
  }

  String _getUser(int id) {
    switch (id) {
      case 0:
        return 'Alice';
      case 1:
        return 'Bob';
      default:
        return 'Unknown';
    }
  }
}
TodoRepositoryの実装(再掲)

TodoRepository todoRepository(Ref ref) => TodoRepository(ref);
class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// todoを監視して、作成されたtodoのIDを返す
  Stream<List<int>> watch() => _controller.stream;

  /// todoを作成する
  ///
  /// テスト用として、作成したTODOのIDと作成済みのIDは引数で入れる
  void create({required int createId, required List<int> currentTodoIds}) {
    // TODOを作成して保存し、IDを生成
    final createTodoId = createId;

    // 全体のリストと仮定
    final currentTodos = currentTodoIds;

    // 作成した全体のtodo
    final updateTodos = [...currentTodos, createTodoId];
    // 作成したtodoを流す
    _controller.add(updateTodos);
  }
}

UserTodoService userTodoService(Ref ref) => UserTodoService(ref);
class UserTodoService {
  UserTodoService(this.ref);

  final Ref ref;

  UserRepository get userRepository => ref.read(userRepositoryProvider);

  TodoRepository get todoRepository => ref.read(todoRepositoryProvider);

  /// 単純なStreamではなく、内部で非同期な処理の結果をStreamに流している
  Stream<List<String>> watch() async* {
    // Streamの購読
    final todoStream = todoRepository.watch();

    // await forでStreamが流れてくるのを待機
    await for (final todoIds in todoStream) {
      // 流れてきたStreamの値を使って非同期な処理を行い
      final users = await userRepository.findAll(todoIds: todoIds);
      // その結果を返す
      yield users;
    }
  }
}
    test('Todo情報を作ったUserの最新情報を取得できる', () async {
      final todoController = StreamController<List<int>>();

      when(todoRepository.watch()).thenAnswer((_) => todoController.stream);
      when(userRepository.findAll(todoIds: anyNamed('todoIds')))
          .thenAnswer((invocation) async {
        final todoIds =
            invocation.namedArguments[const Symbol('todoIds')] as List<int>;
        final users = todoIds.map((id) {
          switch (id) {
            case 0:
              return 'Alice';
            case 1:
              return 'Bob';
            default:
              return 'Unknown';
          }
        }).toList();
        return users;
      });

      final service = container.read(userTodoServiceProvider);

      final streamQueue = StreamQueue(service.watch());

      todoController.add([0]);

      final result = await streamQueue.next;

      expect(result, ['Alice']);

      todoController.add([0, 1]);

      final result2 = await streamQueue.next;

      expect(result2, ['Alice', 'Bob']);

      await streamQueue.cancel();
      await todoController.close();

      verify(todoRepository.watch()).called(1);
      verify(userRepository.findAll(todoIds: [0])).called(1);
      verify(userRepository.findAll(todoIds: [0, 1])).called(1);
    });

解説

まず、大まかな検証の流れは以下です。

  1. 2.でスタブ化するwatchメソッド用のStreamを模倣するためにStreamControllerを作る
  2. TodoRepositorywatchメソッドのスタブを作成する
  3. UserRepositoryfindAllメソッドのスタブを作成する
  4. StreamQueueservice.watchの購読を囲む
  5. StreamControllerで値を流す
  6. 流された値を受け取るためにStreamQueueの中の世界を進めて、その結果を変数で受け取る
  7. 1回目の検証をする
  8. StreamControllerで次の値を流す
  9. 次に流された値を受け取るためにStreamQueueの中の世界を進めて、その結果を変数で受け取る
  10. 2回目の検証をする
  11. StreamQueueでの購読を終了する
  12. Streamを閉じる
  13. スタブ化したメソッドたちが呼ばれているかverifyで検証する

5~10の内容は前項と同じなので説明は割愛します。
それ以外の要点や注意事項を解説します。

スタブ化

TodoRepositoryのwatch
前回のTodoRepositorywatchメソッドのテストと違うのは今回は実装をモックしている点です。
スタブ化したメソッドのStreamを固定してしまうこともできますが、そうすると動的な検証ができません。
動的な検証とは、値を追加して変化することを確認すること指しています。

// 値が固定されてしまう
when(todoRepository.watch()).thenAnswer((_) => Stream.value([0]));

そこでStreamControllerを作ってwatchメソッドの戻り値にそのstreamを割り当てます。
こうすることでテスト内で値を任意に流すことが可能になります。

final todoController = StreamController<List<int>>();

when(todoRepository.watch()).thenAnswer((_) => todoController.stream);

UserRepositoryのfindAll

次にUserRepositoryfindAllですが、こちらの戻り値も動的に変更できるようにしたいです。
findAllは引数の値によって戻り値であるList<String>(Userの名前のリスト)が動的に変わることになっています。
UserTodoServicewatchメソッドの実装をみると、先ほど挙げたTodoRepositorywatchメソッドで取得したtodoIdsを受けとったfindAllが非同期な処理を行って得た結果を最終的にはStreamとして流しています。

  Stream<List<String>> watch() async* {
    final todoStream = todoRepository.watch();

    // TODOが更新されると最新のTODOのIDが流れてくる
    await for (final todoIds in todoStream) {
      // TODOのIDを使ってUserのリストを返却する
      final users = await userRepository.findAll(todoIds: todoIds);
      yield users;
    }
  }

そこで引数にどんな値が入ってきたかによって返却する値を変えるためにthenAnswerinvocationを使って引数の値にアクセスします。

when(userRepository.findAll(todoIds: anyNamed('todoIds')))
  .thenAnswer((invocation) async {
final todoIds =
    invocation.namedArguments[const Symbol('todoIds')] as List<int>;

    // 省略
});

namedArguments名前付き引数を指定しています。ここにSymbolでラップした引数名を渡します。
ここでは引数名である"todoIds"をラップして渡しています。
更に戻り値はList<int>であること型変換してあげれば取得は完了です。
今回は配列で返すのでその戻り値をmapすれば動的なスタブ化の完了です。

余談ですが、位置引数(名前なしの引数)の場合はpositionalArgumentsを使ってアクセスできます。

when(userRepository.findAll(any)).thenAnswer((invocation) async {
  // positionalArgumentsでその順番にアクセスし、型をキャストする
  final todoIds = invocation.positionalArguments[0] as List<int>;

  // 省略
});

テストがタイムアウトしてまう時の確認項目

StreamController

筆者が陥った失敗の一つにbroadcastにして定義したことです。

final todoController = StreamController<List<int>>.broadcast(); // ❌Bad

これだとなぜかストリームがうまく流れなくなります。
ブレークポイントを張ってみると、実装側ではawait forのところまできてそこから値が流れてきません。

  Stream<List<String>> watch() async* {
    final todoStream = todoRepository.watch();

    await for (final todoIds in todoStream) {
      // この中に入る前にローディングしっぱなしで進まなくなる
      final users = await userRepository.findAll(todoIds: todoIds);
      yield users;
    }
  }

これはmockしている場合はStreamの購読対象は一つだけだからと思われます。
テストの世界の中だけtodoControllerのStreamを購読しているので、わざわざbroadcastにして複数に対して公開しなくて良いということでしょう。

StreamとStreamQueueを閉じる順番

TodoRepositoryのテストの時は気にしなくてよかったのですが、今回はbroadcastを外したStreamであることからか、閉じる順番を間違えるとテストがタイムアウトで失敗します。
ストリームを先に閉じてしまうと、StreamQueueの中でストリームが急になくなってしまうことで当然読み込めなくなり、永遠に読み込みに入ってしまう事象に陥ってしまうためです。

// ⭕️Good

// 購読している方を先に閉じる
await streamQueue.cancel();
// ストリームを配信している方を後から閉じる
await todoController.close();

終わりに

今回の記事ではStreamのテストをDart標準のStreamQueueで行う方法と、mockitoを併用したスタブ化によるテスト手法について解説しました。

初学者にとってテストを書くのは難しいのですが、更にStreamのテストは非常に複雑で理解がしづらいです。しかし、StreamQueueを使えば初学者だけに限らず多くのエンジニアにとって読みやすいテストコードを実装することが可能です。
ぜひ皆さんも取り入れてみてください。

この記事が誰かのお役に立てれば幸いです

1

Discussion

ログインするとコメントできます