[Flutter] Overcoming Stream Testing Challenges with StreamQueue

Introduction

When I started working as an engineer, the thing I struggled with most was testing code that involves Streams.
Streams are hard for beginners to understand, let alone implement correctly.
And even once the implementation works, testing it often proves difficult.
That is when I came across StreamQueue, a class from Dart's async package.

https://api.flutter.dev/flutter/async/StreamQueue-class.html

I was able to overcome these challenges using StreamQueue, so I'd like to document it as a memo. This article also touches on stubbing Stream methods when using mockito.

Target Audience

  • Those who can implement using Stream
  • Those struggling with testing implementations involving Streams
  • Those who want to know how to test Streams with mockito involved
  • Those with a certain level of knowledge of Riverpod
  • Those with a certain level of knowledge of mockito

Author's environment at the time of writing

[✓] Flutter (Channel stable, 3.27.1, on macOS 15.1 24B2082 darwin-arm64, locale ja-JP)
[✓] Android toolchain - develop for Android devices (Android SDK version 35.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 16.1)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2024.2)
[✓] VS Code (version 1.96.2)

Source Code

I've included code snippets for implementation in the article as appropriate, but if they are hard to read, please refer to the link below.

https://github.com/HaruhikoMotokawa/stream_test_sample/tree/main

Prerequisites

  • DI (Dependency Injection) is implemented with riverpod in a layered architecture
  • riverpod providers are written with code generation
  • Logic that combines multiple methods from the data layer's repositories lives in an application-layer service
  • The method under test is not a plain Stream method: it calls Future methods internally and ultimately returns a Stream
  • Mocks are created with mockito

Packages Used

  • riverpod
  • mockito

https://riverpod.dev

https://pub.dev/packages/riverpod

https://pub.dev/packages/mockito

About the Implementation

As an example, we will implement the following features.
The example is admittedly a little contrived.

  1. Can create TODOs (assumed)
  2. User information will eventually be saved in TODOs (assumed)
  3. A list of created usernames can be obtained in real-time each time a TODO is created

Structure

lib
├── applications
│   └── services
│       └── user_todo
│           ├── provider.dart
│           ├── provider.g.dart
│           └── service.dart
├── data
│   └── repositories
│       ├── todo
│       │   ├── provider.dart
│       │   ├── provider.g.dart
│       │   └── repository.dart
│       └── user
│           ├── provider.dart
│           ├── provider.g.dart
│           └── repository.dart
└── main.dart

Stream Testing

There are several ways to test Streams. To make the benefits of StreamQueue, the subject of this article, easier to appreciate, I will first introduce Stream testing with expectLater.

The example is the test for the watch method of TodoRepository.

Preparation (setUp and tearDown)

void main() {
  late ProviderContainer container;

  setUp(() {
    container = ProviderContainer();
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // test
  });
}

When riverpod is used for DI, the test obtains the classes under test through a ProviderContainer (for example, container.read(todoRepositoryProvider)).

Stream Testing using expectLater

TodoRepository Implementation

@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;

    // Assume this is the entire list
    final currentTodos = currentTodoIds;

    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}

Test Code

    test('【Successful Test expectLater】Can receive created todo', () {
      final repository = container.read(todoRepositoryProvider);

      final stream = repository.watch();

      expectLater(
        stream,
        emitsInOrder([
          [0],
          [0, 1],
        ]),
      );

      repository
        ..create(createId: 0, currentTodoIds: [])
        ..create(createId: 1, currentTodoIds: [0])
        ..streamClose();
    });

Explanation

First, let's look at the case using expectLater. The test flow is as follows:

  1. Start subscribing to the Stream
  2. Declare verification with expectLater
  3. Emit values (in this case, values are emitted by creating them with the create method)
  4. Close the Stream

As the name suggests, expectLater declares an expectation that is checked later.
A normal expect runs something and then verifies the result.
With a Stream, however, values arrive over time, so the expectation has to be registered before the values are emitted.

Also, watch emits List<int> values, but a Stream cannot be compared directly against a list of expected values, so the expected values are wrapped in the emitsInOrder matcher.
The matcher takes a list, so you simply list the expected values in the order the Stream should emit them.

Incidentally, the test fails if you write it in the seemingly natural order of emitting first and verifying afterwards.
Suppose you try the following order:

  1. Start subscribing to the Stream
  2. Emit values (in this case, values are emitted by creating them with the create method)
  3. Declare verification with expect
  4. Close the Stream

    test('【Failing Test】Can receive created todo', () {
      final repository = container.read(todoRepositoryProvider);

      final stream = repository.watch();

      repository.create(createId: 0, currentTodoIds: []);

      expect(stream, emits([0]));

      repository.streamClose();

      // Expected: should emit an event that [0]
      // Actual: <Instance of '_BroadcastStream<List<int>>'>
      // Which: emitted x Stream closed.
    });

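The failure output says that the Stream was closed before the matcher ever saw [0]. The controller is a broadcast controller, which does not buffer events, so the value emitted before expect started listening was simply lost.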
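The behavior behind this failure can be reproduced without the test framework. The following is a minimal sketch using only dart:async; the function name eventsSeenByLateListener is made up for illustration and is not part of the sample repository.

```dart
import 'dart:async';

/// Returns the events a listener sees when one value is added before
/// it subscribes and one value is added afterwards.
Future<List<int>> eventsSeenByLateListener() async {
  final controller = StreamController<int>.broadcast();
  final seen = <int>[];

  controller.add(0); // No listener yet: a broadcast controller drops this.

  final subscription = controller.stream.listen(seen.add);
  controller.add(1); // Delivered (asynchronously) to the listener.

  await controller.close(); // Completes once the done event is delivered.
  await subscription.cancel();
  return seen;
}

Future<void> main() async {
  print(await eventsSeenByLateListener()); // [1]
}
```

Only the value added after listen arrives, which is why the expectation has to be in place before create is called.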

But wouldn't it be easier to understand the test if the order was emit values and then verify?
That's where StreamQueue comes in.

Stream Testing using StreamQueue

TodoRepository Implementation (reprint)

@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;

    // Assume this is the entire list
    final currentTodos = currentTodoIds;

    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}

Test Code

    test('【Successful Test StreamQueue】Can receive created todo', () async {
      final repository = container.read(todoRepositoryProvider);

      final streamQueue = StreamQueue(repository.watch());

      var currentTodoIds = <int>[];

      repository.create(createId: 0, currentTodoIds: currentTodoIds);
      currentTodoIds = [0];

      final result = await streamQueue.next;

      expect(result, currentTodoIds);

      repository.create(createId: 1, currentTodoIds: currentTodoIds);
      currentTodoIds = [0, 1];

      final result2 = await streamQueue.next;

      expect(result2, currentTodoIds);

      repository.streamClose();
      await streamQueue.cancel();
    });

Explanation

The verification flow is as follows:

  1. Start subscribing to the Stream within StreamQueue
  2. Emit values
  3. Await streamQueue.next to receive the emitted value, and store the result in a variable
  4. Perform the first verification
  5. Emit the next values
  6. Await streamQueue.next again to receive the next value, and store the result in a variable
  7. Perform the second verification
  8. Close the Stream
  9. End subscription with StreamQueue

StreamQueue is a utility class in Dart's async package (import 'package:async/async.dart') that lets you treat the events of a Stream as a queue.
With a plain Stream you register a listener and events are pushed to it as they arrive, whereas StreamQueue lets you pull events one at a time, on request, by awaiting next.

Compared with the expectLater approach, the test code now follows the actual flow much more closely.
The downside is that there is more code.
Still, I personally prefer this style because the order of processing and verification is easier to follow.
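To show the pull-style API on its own, here is a small sketch outside of any test, assuming package:async is available as a dependency. The function name pullEvents is hypothetical; next, hasNext, take, and cancel are members of StreamQueue.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Pulls events from a StreamQueue one request at a time.
Future<String> pullEvents() async {
  // Single-subscription controller: events are buffered until listened to.
  final controller = StreamController<int>();
  final queue = StreamQueue<int>(controller.stream);

  controller
    ..add(1)
    ..add(2)
    ..add(3);

  final first = await queue.next; // Consumes the first event.
  final hasMore = await queue.hasNext; // Peeks without consuming.
  final rest = await queue.take(2); // Consumes the next two events.

  // Cancel the subscriber first, then close the controller.
  await queue.cancel();
  await controller.close();

  return '$first $hasMore $rest';
}

Future<void> main() async {
  print(await pullEvents()); // 1 true [2, 3]
}
```

Each await maps to one step of the test flow, which is what makes the StreamQueue version read top to bottom.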

Testing a Stream that contains asynchronous processing

Here, we will use the watch method of UserTodoService as an example to test a Stream that contains asynchronous processing.
The difference from the watch method test of TodoRepository in the previous section is that we will mock the repository and stub the method's processing.

Preparing Mocks

@GenerateNiceMocks([
  MockSpec<TodoRepository>(),
  MockSpec<UserRepository>(),
])
void main() {}

It is recommended to prepare a dedicated file for mocks so that they can be generated from a single location.

Preparing the Test

void main() {
  late ProviderContainer container;
  final userRepository = MockUserRepository();
  final todoRepository = MockTodoRepository();

  setUp(() {
    reset(userRepository);
    reset(todoRepository);

    container = ProviderContainer(
      overrides: [
        userRepositoryProvider.overrideWithValue(userRepository),
        todoRepositoryProvider.overrideWithValue(todoRepository),
      ],
    );
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // Test implementation
  });
}

Test using StreamQueue and mockito together

Implementation Code

UserRepository Implementation

@riverpod
UserRepository userRepository(Ref ref) => UserRepository(ref);

class UserRepository {
  UserRepository(this.ref);

  final Ref ref;
  Future<List<String>> findAll({required List<int> todoIds}) async {
    // 100 milliseconds delay
    await Future<void>.delayed(const Duration(milliseconds: 100));
    // Returns usernames corresponding to IDs
    final users = todoIds.map(_getUser).toList();

    return users;
  }

  String _getUser(int id) {
    switch (id) {
      case 0:
        return 'Alice';
      case 1:
        return 'Bob';
      default:
        return 'Unknown';
    }
  }
}

TodoRepository Implementation (reprint)

@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;

  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;

    // Assume this is the entire list
    final currentTodos = currentTodoIds;

    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}

UserTodoService Implementation

@riverpod
UserTodoService userTodoService(Ref ref) => UserTodoService(ref);

class UserTodoService {
  UserTodoService(this.ref);

  final Ref ref;

  UserRepository get userRepository => ref.read(userRepositoryProvider);

  TodoRepository get todoRepository => ref.read(todoRepositoryProvider);

  /// Not a plain Stream: each event from the repository is passed through
  /// asynchronous processing before the result is emitted
  Stream<List<String>> watch() async* {
    // Obtain the Stream of todo IDs
    final todoStream = todoRepository.watch();

    // Wait for each event with await for
    await for (final todoIds in todoStream) {
      // Run the asynchronous lookup with the IDs that arrived
      final users = await userRepository.findAll(todoIds: todoIds);
      // Emit the result
      yield users;
    }
  }
}

Test Code

    test('Can get the latest information of the User who created the Todo', () async {
      final todoController = StreamController<List<int>>();

      when(todoRepository.watch()).thenAnswer((_) => todoController.stream);
      when(userRepository.findAll(todoIds: anyNamed('todoIds')))
          .thenAnswer((invocation) async {
        final todoIds =
            invocation.namedArguments[const Symbol('todoIds')] as List<int>;
        final users = todoIds.map((id) {
          switch (id) {
            case 0:
              return 'Alice';
            case 1:
              return 'Bob';
            default:
              return 'Unknown';
          }
        }).toList();
        return users;
      });

      final service = container.read(userTodoServiceProvider);

      final streamQueue = StreamQueue(service.watch());

      todoController.add([0]);

      final result = await streamQueue.next;

      expect(result, ['Alice']);

      todoController.add([0, 1]);

      final result2 = await streamQueue.next;

      expect(result2, ['Alice', 'Bob']);

      await streamQueue.cancel();
      await todoController.close();

      verify(todoRepository.watch()).called(1);
      verify(userRepository.findAll(todoIds: [0])).called(1);
      verify(userRepository.findAll(todoIds: [0, 1])).called(1);
    });

Explanation

First, the general flow of verification is as follows:

  1. Create a StreamController to mimic the Stream for the watch method, which will be stubbed in step 2
  2. Create a stub for the TodoRepository's watch method
  3. Create a stub for the UserRepository's findAll method
  4. Wrap the subscription of service.watch with StreamQueue
  5. Emit values using StreamController
  6. Await streamQueue.next to receive the emitted value, and store the result in a variable
  7. Perform the first verification
  8. Emit the next values using StreamController
  9. Await streamQueue.next again to receive the next emitted value, and store the result in a variable
  10. Perform the second verification
  11. End subscription with StreamQueue
  12. Close the Stream
  13. Verify that the stubbed methods were called using verify

The content of steps 5-10 is the same as in the previous section, so I will omit the explanation.
I will explain other key points and precautions.

Stubbing

TodoRepository's watch

Unlike the earlier TodoRepository watch test, the repository itself is mocked this time.
You could stub the method with a fixed Stream, but then the test cannot verify dynamic behavior, that is, it cannot add values during the test and confirm how the output changes.

// Value becomes fixed
when(todoRepository.watch()).thenAnswer((_) => Stream.value([0]));

Therefore, we create a StreamController and assign its stream to the return value of the watch method.
This allows us to emit values arbitrarily within the test.

final todoController = StreamController<List<int>>();

when(todoRepository.watch()).thenAnswer((_) => todoController.stream);

UserRepository's findAll

Next, we want the return value of UserRepository's findAll to change dynamically as well.
The stub is set up so that the returned List<String> (a list of user names) depends on the argument values.
Looking at UserTodoService's watch method, it receives the todoIds emitted by TodoRepository's watch, passes them to the asynchronous findAll, and finally emits the result.

  Stream<List<String>> watch() async* {
    final todoStream = todoRepository.watch();

    // When TODO is updated, the latest TODO IDs will flow
    await for (final todoIds in todoStream) {
      // Returns a list of Users using the TODO IDs
      final users = await userRepository.findAll(todoIds: todoIds);
      yield users;
    }
  }

Therefore, to change the return value depending on what argument values are passed, we access the argument values using thenAnswer's invocation.

when(userRepository.findAll(todoIds: anyNamed('todoIds')))
    .thenAnswer((invocation) async {
  final todoIds =
      invocation.namedArguments[const Symbol('todoIds')] as List<int>;

  // omitted
});

namedArguments is a map of the named arguments that were passed, keyed by Symbol, so we look up the entry using the argument name wrapped in a Symbol.
In this case, that is the argument name "todoIds".
The map's values are typed dynamic, so we cast the result to List<int>.
Since the argument is a list, mapping each ID to a user name and returning the result completes the dynamic stub.

As a side note, for positional arguments (unnamed arguments), you can access them using positionalArguments.

when(userRepository.findAll(any)).thenAnswer((invocation) async {
  // Access by order using positionalArguments and cast the type
  final todoIds = invocation.positionalArguments[0] as List<int>;

  // omitted
});
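The Invocation object passed to thenAnswer is the same dart:core type that noSuchMethod receives, so argument access can be tried out without mockito. The sketch below is only an illustration: CallInspector and the 'user$id' naming are hypothetical and do not appear in the sample repository.

```dart
/// Hypothetical stand-in for a mock (not mockito itself): calls made through
/// `dynamic` land in noSuchMethod, which receives a dart:core Invocation.
class CallInspector {
  @override
  dynamic noSuchMethod(Invocation invocation) {
    // Prefer the named argument; fall back to the first positional one.
    final raw = invocation.namedArguments[#todoIds] ??
        invocation.positionalArguments.first;
    final todoIds = raw as List<int>;
    return [for (final id in todoIds) 'user$id'];
  }
}

void main() {
  final dynamic inspector = CallInspector();
  print(inspector.findAll(todoIds: [0, 1])); // [user0, user1]
  print(inspector.findAll([2])); // [user2]
  // The symbol literal and the Symbol constructor name the same key.
  print(#todoIds == const Symbol('todoIds')); // true
}
```

The symbol literal #todoIds is a shorter way to write const Symbol('todoIds'); either form works as the map key.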

Checkpoints when tests time out

StreamController

One mistake I made was defining the StreamController as broadcast.

final todoController = StreamController<List<int>>.broadcast(); // ❌Bad

With a broadcast controller, the values never reach the service.
With a breakpoint set, execution reaches the await for on the implementation side, but no value ever arrives.

  Stream<List<String>> watch() async* {
    final todoStream = todoRepository.watch();

    await for (final todoIds in todoStream) {
      // Execution never gets here: it keeps waiting for a value
      final users = await userRepository.findAll(todoIds: todoIds);
      yield users;
    }
  }

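The cause is that a broadcast controller does not buffer events: anything added while nobody is listening is dropped.
In this test, todoController.add([0]) runs before streamQueue.next triggers the subscription (StreamQueue listens to service.watch(), whose await for then listens to the controller), so the value is lost and next waits forever.
A regular single-subscription controller buffers events until its one listener subscribes, and since the only subscriber here is the service under test, there is no need for broadcast.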
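The buffering behavior of a regular (non-broadcast) controller can be checked in isolation. This is a sketch under assumptions: watchNames and lookUp are hypothetical stand-ins for UserTodoService.watch() and UserRepository.findAll(), and package:async is assumed to be available.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical stand-in for UserRepository.findAll().
Future<List<String>> lookUp(List<int> ids) async =>
    [for (final id in ids) 'user$id'];

/// Hypothetical stand-in for UserTodoService.watch().
Stream<List<String>> watchNames(Stream<List<int>> todoIds) async* {
  await for (final ids in todoIds) {
    yield await lookUp(ids);
  }
}

/// Adds events BEFORE anything listens, then reads them via StreamQueue.
Future<List<List<String>>> readBufferedEvents() async {
  // Single-subscription controller: events are buffered until listened to.
  final controller = StreamController<List<int>>();
  controller
    ..add([0])
    ..add([0, 1]);
  unawaited(controller.close()); // The done event is queued behind the data.

  final queue = StreamQueue(watchNames(controller.stream));
  final results = [await queue.next, await queue.next];

  // The generator ends once it sees the done event, so nothing is left.
  final hasMore = await queue.hasNext;
  return hasMore ? throw StateError('unexpected extra event') : results;
}

Future<void> main() async {
  print(await readBufferedEvents()); // [[user0], [user0, user1]]
}
```

Both values arrive even though they were added before StreamQueue started listening, which is exactly what the broadcast version cannot do.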

Order of closing Stream and StreamQueue

This was not a concern in the TodoRepository test, but because this Stream is not a broadcast Stream, the test fails with a timeout if the two are closed in the wrong order.
As far as I can tell, if the controller is closed first, the StreamQueue loses the stream it was reading from and ends up waiting forever.

// ⭕️Good

// Close the subscriber first
await streamQueue.cancel();
// Close the stream publisher afterwards
await todoController.close();

Conclusion

In this article, I explained how to test Streams using StreamQueue from Dart's async package, together with a way of stubbing Stream methods with mockito.

Writing tests is hard for beginners, and Stream tests are even more complex and harder to follow. With StreamQueue, however, beginners and experienced engineers alike can write readable test code. I encourage everyone to give it a try.

I hope this article is helpful to someone.
