Translated by AI
[Flutter] Overcoming Stream Testing Challenges with StreamQueue
Introduction
What I struggled with most when I started working was testing involving Streams.
For beginners, Streams are hard to understand, and even implementing them is a challenge.
And even if you manage to implement them, testing often proves difficult...
That's when I encountered StreamQueue, a class from Dart's async package.
I was able to overcome these challenges using StreamQueue, so I'd like to document it as a memo. This article also touches on stubbing Stream methods when using mockito.
Target Audience
- Those who can implement using Stream
- Those struggling with testing implementations involving Streams
- Those who want to know how to test Streams with mockito involved
- Those with a certain level of knowledge of Riverpod
- Those with a certain level of knowledge of mockito
Author's environment at the time of writing
[✓] Flutter (Channel stable, 3.27.1, on macOS 15.1 24B2082 darwin-arm64, locale ja-JP)
[✓] Android toolchain - develop for Android devices (Android SDK version 35.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 16.1)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2024.2)
[✓] VS Code (version 1.96.2)
Source Code
I've included code snippets for implementation in the article as appropriate, but if they are hard to read, please refer to the link below.
Prerequisites
- Implementing DI (Dependency Injection) using riverpod and layered architecture
- riverpod is implemented with code generation
- When multiple methods defined in the data layer's repositories are combined, the combination is defined in a service in the application layer
- The methods to be tested are not pure Stream methods: they use Future methods internally and finally return a Stream
- mockito is used for creating mocks
Packages Used
- riverpod
- mockito
About the Implementation
As an example, we will implement the following features.
I apologize if the content is a bit forced.
- Can create TODOs (assumed)
- User information will eventually be saved in TODOs (assumed)
- A list of created usernames can be obtained in real-time each time a TODO is created
Structure
lib
├── applications
│   └── services
│       └── user_todo
│           ├── provider.dart
│           ├── provider.g.dart
│           └── service.dart
├── data
│   └── repositories
│       ├── todo
│       │   ├── provider.dart
│       │   ├── provider.g.dart
│       │   └── repository.dart
│       └── user
│           ├── provider.dart
│           ├── provider.g.dart
│           └── repository.dart
└── main.dart
Stream Testing
There are several ways to test Streams, but this time, to understand the benefits of StreamQueue mentioned in the title, I will introduce Stream testing using expectLater.
As an example, I will explain using the watch method test of TodoRepository.
Preparation like setUp
void main() {
  late ProviderContainer container;

  setUp(() {
    container = ProviderContainer();
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // test
  });
}
When riverpod is used for DI, a test can obtain the classes under test through a ProviderContainer.
Stream Testing using expectLater
TodoRepository Implementation
@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;
  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;
    // Assume this is the entire list
    final currentTodos = currentTodoIds;
    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}
Test Code
test('【Successful Test expectLater】Can receive created todo', () {
  final repository = container.read(todoRepositoryProvider);
  final stream = repository.watch();

  expectLater(
    stream,
    emitsInOrder([
      [0],
      [0, 1],
    ]),
  );

  repository
    ..create(createId: 0, currentTodoIds: [])
    ..create(createId: 1, currentTodoIds: [0])
    ..streamClose();
});
Explanation
First, let's look at the case using expectLater. The test flow is as follows:
- Get the Stream from watch
- Declare the verification with expectLater, which subscribes to the Stream
- Emit values (in this case, values are emitted by creating them with the create method)
- Close the Stream
As its name suggests, expectLater declares an expectation that is verified later.
A normal expect executes something and then verifies the result immediately.
With a Stream, however, the values arrive over time, so the expected result has to be declared before the values are emitted.
Also, the value obtained from watch is List<int>, but a Stream cannot be compared directly against a plain value, so the expected values must be wrapped in a stream matcher such as emitsInOrder.
emitsInOrder takes a list, so you just need to put the values to be verified in the order they flow into the Stream.
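As a minimal sketch of the same pattern outside the article's repository, the test below checks a Stream with expectLater and emitsInOrder. The idSnapshots helper is hypothetical and exists only for this illustration; emitsDone is added at the end to also assert that the Stream closes.

```dart
import 'dart:async';

import 'package:test/test.dart';

/// Hypothetical helper for this sketch: emits a growing list of ids
/// ([0], then [0, 1], ...) and closes after `count` snapshots.
Stream<List<int>> idSnapshots(int count) async* {
  final ids = <int>[];
  for (var id = 0; id < count; id++) {
    ids.add(id);
    yield List.of(ids); // emit a copy so later additions do not alter it
  }
}

void main() {
  test('emits each snapshot in order, then closes', () async {
    // Awaiting expectLater keeps the test body from finishing early.
    await expectLater(
      idSnapshots(2),
      emitsInOrder([
        [0],
        [0, 1],
        emitsDone, // the Stream must close after the last snapshot
      ]),
    );
  });
}
```

Plain values inside emitsInOrder are compared by equality, so nested lists such as [0, 1] are matched element by element.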
By the way, if you write the test in the order it naturally reads, emitting first and verifying afterwards, it fails.
Let's say you try the following order:
- Get the Stream from watch
- Emit values (in this case, values are emitted by creating them with the create method)
- Declare verification with expect
- Close the Stream
test('【Failing Test】Can receive created todo', () {
  final repository = container.read(todoRepositoryProvider);
  final stream = repository.watch();

  repository.create(createId: 0, currentTodoIds: []);
  expect(stream, emits([0]));
  repository.streamClose();

  // Expected: should emit an event that [0]
  // Actual: <Instance of '_BroadcastStream<List<int>>'>
  // Which: emitted x Stream closed.
});
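The error message says that the Stream was closed before the expected value was seen. The controller is a broadcast controller, so the value added by create, at a time when nothing was listening yet, was discarded; expect subscribed only afterwards and saw nothing but the close.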
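One way to see why the verification misses the value: a broadcast StreamController does not keep events for listeners that have not subscribed yet. The following sketch uses only dart:async; the function name is hypothetical. It adds one event before listening and one after, and returns what the listener actually received.

```dart
import 'dart:async';

/// Hypothetical helper for this sketch: returns the events a listener sees
/// when it subscribes to a broadcast controller after the first add.
Future<List<int>> eventsSeenByLateListener() async {
  final controller = StreamController<int>.broadcast();
  final seen = <int>[];

  controller.add(1); // no listener yet: the broadcast controller drops this
  final subscription = controller.stream.listen(seen.add);
  controller.add(2); // delivered to the listener attached above

  await controller.close(); // completes after the done event is delivered
  await subscription.cancel();
  return seen;
}

Future<void> main() async {
  print(await eventsSeenByLateListener()); // [2]
}
```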
But wouldn't it be easier to understand the test if the order was emit values and then verify?
That's where StreamQueue comes in.
Stream Testing using StreamQueue
TodoRepository Implementation (reprint)
@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;
  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;
    // Assume this is the entire list
    final currentTodos = currentTodoIds;
    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}
Test Code
test('【Successful Test StreamQueue】Can receive created todo', () async {
  final repository = container.read(todoRepositoryProvider);
  final streamQueue = StreamQueue(repository.watch());

  var currentTodoIds = <int>[];
  repository.create(createId: 0, currentTodoIds: currentTodoIds);
  currentTodoIds = [0];
  final result = await streamQueue.next;
  expect(result, currentTodoIds);

  repository.create(createId: 1, currentTodoIds: currentTodoIds);
  currentTodoIds = [0, 1];
  final result2 = await streamQueue.next;
  expect(result2, currentTodoIds);

  repository.streamClose();
  await streamQueue.cancel();
});
Explanation
The verification flow is as follows:
- Start subscribing to the Stream through StreamQueue
- Emit values
- Await streamQueue.next to receive the emitted values, and store the result in a variable
- Perform the first verification
- Emit the next values
- Await streamQueue.next again to receive the next values, and store the result in a variable
- Perform the second verification
- Close the Stream
- End the subscription with streamQueue.cancel
StreamQueue is a utility included in Dart's async package (imported from package:async/async.dart), a class for treating Stream data as a "Queue."
A regular Stream pushes each event to a listener as it arrives, whereas StreamQueue lets you pull events one at a time, whenever you request them.
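A small sketch of this pull-style access, independent of the article's repository: the function name and the sample values are made up for illustration, while next, take, hasNext, and cancel are members of StreamQueue in package:async.

```dart
import 'package:async/async.dart';

/// Hypothetical helper for this sketch: pulls events from a Stream on request
/// and records what each request returned.
Future<List<Object>> pullOnRequest() async {
  final queue = StreamQueue(Stream.fromIterable([1, 2, 3, 4]));

  final observed = <Object>[
    await queue.next, // the first event
    await queue.take(2), // the next two events, as a list
    await queue.hasNext, // whether another event is still available
    await queue.next, // the last event
    await queue.hasNext, // false once the Stream has closed
  ];

  await queue.cancel(); // release the underlying subscription
  return observed;
}

Future<void> main() async {
  print(await pullOnRequest()); // [1, [2, 3], true, 4, false]
}
```

Each request completes only when the corresponding event is available, which is what lets a test interleave "emit" and "verify" steps in reading order.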
Compared to the initial expectLater approach, the test code is now much closer to the actual flow, isn't it?
A downside is the increased code volume.
However, I personally prefer this as the order of processing and verification is easier to understand.
Testing a Stream that contains asynchronous processing
Here, we will use the watch method of UserTodoService as an example to test a Stream that contains asynchronous processing.
The difference from the watch method test of TodoRepository in the previous section is that we will mock the repository and stub the method's processing.
Preparing Mocks
@GenerateNiceMocks([
  MockSpec<TodoRepository>(),
  MockSpec<UserRepository>(),
])
void main() {}
It is recommended to prepare a dedicated file for mocks so that they can be generated from a single location.
Preparing the Test
void main() {
  late ProviderContainer container;
  final userRepository = MockUserRepository();
  final todoRepository = MockTodoRepository();

  setUp(() {
    reset(userRepository);
    reset(todoRepository);
    container = ProviderContainer(
      overrides: [
        userRepositoryProvider.overrideWithValue(userRepository),
        todoRepositoryProvider.overrideWithValue(todoRepository),
      ],
    );
  });

  tearDown(() {
    container.dispose();
  });

  group('watch', () {
    // Test implementation
  });
}
Test using StreamQueue and mockito together
Implementation Code
UserRepository Implementation
@riverpod
UserRepository userRepository(Ref ref) => UserRepository(ref);

class UserRepository {
  UserRepository(this.ref);

  final Ref ref;

  Future<List<String>> findAll({required List<int> todoIds}) async {
    // 100 milliseconds delay
    await Future<void>.delayed(const Duration(milliseconds: 100));
    // Returns usernames corresponding to IDs
    final users = todoIds.map(_getUser).toList();
    return users;
  }

  String _getUser(int id) {
    switch (id) {
      case 0:
        return 'Alice';
      case 1:
        return 'Bob';
      default:
        return 'Unknown';
    }
  }
}
TodoRepository Implementation (reprint)
@riverpod
TodoRepository todoRepository(Ref ref) => TodoRepository(ref);

class TodoRepository {
  TodoRepository(this.ref);

  final Ref ref;
  final _controller = StreamController<List<int>>.broadcast();

  void streamClose() => _controller.close();

  /// Monitors todos and returns the IDs of created todos
  Stream<List<int>> watch() => _controller.stream;

  /// Creates a todo
  ///
  /// For testing purposes, the created TODO ID and existing IDs are passed as arguments
  void create({required int createId, required List<int> currentTodoIds}) {
    // Create and save TODO, generate ID
    final createTodoId = createId;
    // Assume this is the entire list
    final currentTodos = currentTodoIds;
    // All created todos
    final updateTodos = [...currentTodos, createTodoId];
    // Emit the created todo
    _controller.add(updateTodos);
  }
}
UserTodoService Implementation
@riverpod
UserTodoService userTodoService(Ref ref) => UserTodoService(ref);

class UserTodoService {
  UserTodoService(this.ref);

  final Ref ref;

  UserRepository get userRepository => ref.read(userRepositoryProvider);
  TodoRepository get todoRepository => ref.read(todoRepositoryProvider);

  /// Not a plain Stream: each emitted value is the result of asynchronous
  /// processing performed on a value from the TodoRepository Stream
  Stream<List<String>> watch() async* {
    // Get the Stream of todo IDs
    final todoStream = todoRepository.watch();
    // Wait for each value emitted by the Stream with await for
    await for (final todoIds in todoStream) {
      // Perform asynchronous processing using the emitted value
      final users = await userRepository.findAll(todoIds: todoIds);
      // Emit the result
      yield users;
    }
  }
}
Test Code
test('Can get the latest information of the User who created the Todo', () async {
  final todoController = StreamController<List<int>>();
  when(todoRepository.watch()).thenAnswer((_) => todoController.stream);
  when(userRepository.findAll(todoIds: anyNamed('todoIds')))
      .thenAnswer((invocation) async {
    final todoIds =
        invocation.namedArguments[const Symbol('todoIds')] as List<int>;
    final users = todoIds.map((id) {
      switch (id) {
        case 0:
          return 'Alice';
        case 1:
          return 'Bob';
        default:
          return 'Unknown';
      }
    }).toList();
    return users;
  });

  final service = container.read(userTodoServiceProvider);
  final streamQueue = StreamQueue(service.watch());

  todoController.add([0]);
  final result = await streamQueue.next;
  expect(result, ['Alice']);

  todoController.add([0, 1]);
  final result2 = await streamQueue.next;
  expect(result2, ['Alice', 'Bob']);

  await streamQueue.cancel();
  await todoController.close();

  verify(todoRepository.watch()).called(1);
  verify(userRepository.findAll(todoIds: [0])).called(1);
  verify(userRepository.findAll(todoIds: [0, 1])).called(1);
});
Explanation
First, the general flow of verification is as follows:
1. Create a StreamController to mimic the Stream for the watch method, which will be stubbed in step 2
2. Create a stub for the TodoRepository's watch method
3. Create a stub for the UserRepository's findAll method
4. Wrap the subscription of service.watch with StreamQueue
5. Emit values using StreamController
6. Await streamQueue.next to receive the emitted values, and store the result in a variable
7. Perform the first verification
8. Emit the next values using StreamController
9. Await streamQueue.next again to receive the next emitted values, and store the result in a variable
10. Perform the second verification
11. End subscription with StreamQueue
12. Close the Stream
13. Verify that the stubbed methods were called using verify
The content of steps 5-10 is the same as in the previous section, so I will omit the explanation.
I will explain other key points and precautions.
Stubbing
TodoRepository's watch
The difference from the previous TodoRepository watch method test is that this time the implementation is mocked.
You could stub the method with a fixed Stream, but that prevents dynamic verification.
Dynamic verification here means adding values during the test and confirming how the output changes in response.
// Value becomes fixed
when(todoRepository.watch()).thenAnswer((_) => Stream.value([0]));
Therefore, we create a StreamController and assign its stream to the return value of the watch method.
This allows us to emit values arbitrarily within the test.
final todoController = StreamController<List<int>>();
when(todoRepository.watch()).thenAnswer((_) => todoController.stream);
UserRepository's findAll
Next, for UserRepository's findAll, we want its return value to also be dynamically changeable.
findAll is set up so that its return value, List<String> (a list of User names), changes dynamically based on the argument values.
Looking at the implementation of UserTodoService's watch method, it receives the todoIds emitted by TodoRepository's watch method, performs asynchronous processing with findAll using those todoIds, and finally emits the result to its own Stream.
Stream<List<String>> watch() async* {
  final todoStream = todoRepository.watch();
  // When TODO is updated, the latest TODO IDs will flow
  await for (final todoIds in todoStream) {
    // Returns a list of Users using the TODO IDs
    final users = await userRepository.findAll(todoIds: todoIds);
    yield users;
  }
}
Therefore, to change the return value depending on what argument values are passed, we access the argument values using thenAnswer's invocation.
when(userRepository.findAll(todoIds: anyNamed('todoIds')))
    .thenAnswer((invocation) async {
  final todoIds =
      invocation.namedArguments[const Symbol('todoIds')] as List<int>;
  // omitted
});
namedArguments is a map of the named arguments that were passed to the call, keyed by Symbol. To look up an argument, we pass its name wrapped in a Symbol.
In this case, we wrap and pass the argument name "todoIds".
The value stored in the map is typed dynamic, so we cast it to List<int>, the type of the todoIds argument.
Since that argument is a list, mapping each ID in it to a username completes the dynamic stubbing.
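To make the Symbol lookup concrete without mockito, here is a sketch in plain Dart. ArgumentRecorder and recordTodoIds are hypothetical names for this illustration; they are not part of mockito. The sketch relies on noSuchMethod, the Dart mechanism that receives an Invocation for calls to undeclared members.

```dart
/// Hypothetical stand-in for a mock: remembers the `todoIds` named argument
/// of whatever undeclared method is called on it.
class ArgumentRecorder {
  List<int>? lastTodoIds;

  @override
  dynamic noSuchMethod(Invocation invocation) {
    // namedArguments is a Map<Symbol, dynamic>, so the value has to be cast.
    // #todoIds is the literal form of const Symbol('todoIds').
    lastTodoIds = invocation.namedArguments[#todoIds] as List<int>?;
    return null;
  }
}

/// Calls an undeclared `findAll` dynamically and returns what was recorded.
List<int>? recordTodoIds(List<int> ids) {
  final recorder = ArgumentRecorder();
  // findAll is not declared, so the dynamic call is routed to noSuchMethod.
  (recorder as dynamic).findAll(todoIds: ids);
  return recorder.lastTodoIds;
}

void main() {
  print(recordTodoIds([0, 1])); // [0, 1]
  print(#todoIds == const Symbol('todoIds')); // true
}
```

Inside thenAnswer, the invocation parameter is the same Invocation type, so either spelling of the Symbol can be used for the lookup.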
As a side note, for positional arguments (unnamed arguments), you can access them using positionalArguments.
when(userRepository.findAll(any)).thenAnswer((invocation) async {
  // Access by order using positionalArguments and cast the type
  final todoIds = invocation.positionalArguments[0] as List<int>;
  // omitted
});
Checkpoints when tests time out
StreamController
One mistake I made was creating the controller as a broadcast controller.
final todoController = StreamController<List<int>>.broadcast(); // ❌Bad
With this, the emitted values never reach the service.
If I set a breakpoint, the implementation side reaches await for, but no values arrive after that.
Stream<List<String>> watch() async* {
  final todoStream = todoRepository.watch();
  await for (final todoIds in todoStream) {
    // Execution never gets here: await for keeps waiting for a value
    final users = await userRepository.findAll(todoIds: todoIds);
    yield users;
  }
}
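The most likely cause is that a broadcast controller does not buffer events. The async* body of the service's watch method subscribes to todoController.stream only once StreamQueue requests the first value with streamQueue.next, which happens after todoController.add([0]) has already run, so that value is discarded and await for keeps waiting. A single-subscription controller buffers events until its listener attaches, so the value is still delivered.
In any case, the todoController's Stream has only one subscriber within the test, so there's no need to make it a broadcast controller for multiple subscribers.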
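The following sketch illustrates why a single-subscription controller is the safer choice here, under the assumption that the service behaves like the small async* generator below. The names doubled and bufferedBeforeListen are hypothetical. The event is added before anything listens, and it still arrives, because a single-subscription controller buffers events until its first listener attaches.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical generator standing in for the service's watch method.
Stream<int> doubled(Stream<int> source, List<String> log) async* {
  log.add('generator started'); // runs only once someone listens
  await for (final value in source) {
    yield value * 2;
  }
}

/// Adds an event before any listener exists and records the order of events.
Future<List<String>> bufferedBeforeListen() async {
  final log = <String>[];
  final controller = StreamController<int>(); // single-subscription
  final queue = StreamQueue(doubled(controller.stream, log));

  controller.add(1); // buffered: nothing is listening yet
  // Not awaited on purpose: the done event is queued behind the value.
  unawaited(controller.close());
  log.add('event added');

  log.add('received ${await queue.next}'); // first request starts the generator
  log.add('hasNext ${await queue.hasNext}');
  return log;
}

Future<void> main() async {
  print(await bufferedBeforeListen());
  // [event added, generator started, received 2, hasNext false]
}
```

The log shows that the generator body starts only after the first request, which is later than the add call; with a broadcast controller the value would already have been dropped by then.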
Order of closing Stream and StreamQueue
This wasn't a concern in the TodoRepository test, but because this Stream is not a broadcast Stream, closing in the wrong order makes the test fail with a timeout.
This is most likely because the Future returned by close on a single-subscription controller completes only once the done event has been delivered or the subscription has been cancelled. After the last streamQueue.next, nothing requests further values, so the done event is never delivered and await todoController.close() never completes.
// ⭕️Good
// Close the subscriber first
await streamQueue.cancel();
// Close the stream publisher afterwards
await todoController.close();
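As a sketch of why this order matters, the example below uses only dart:async. A paused subscription stands in for a consumer that is not currently requesting values (an assumption made for this illustration; the function name is hypothetical). The Future returned by close stays pending while the consumer is paused and completes once the subscription is cancelled.

```dart
import 'dart:async';

/// Returns whether close() had completed (1) while the consumer was paused
/// and (2) after the consumer cancelled its subscription.
Future<List<bool>> closeBeforeAndAfterCancel() async {
  final controller = StreamController<int>(); // single-subscription
  final subscription = controller.stream.listen((_) {});
  subscription.pause(); // the done event cannot be delivered while paused

  var closed = false;
  final closing = controller.close().then((_) => closed = true);

  // Give close() ample time; it stays pending because of the pause.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  final closedWhilePaused = closed;

  await subscription.cancel(); // cancelling lets the close Future complete
  await closing;
  return [closedWhilePaused, closed];
}

Future<void> main() async {
  print(await closeBeforeAndAfterCancel()); // [false, true]
}
```

Awaiting close before cancelling would therefore wait indefinitely in this setup, which matches the timeout described above.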
Conclusion
In this article, I explained how to test Streams using StreamQueue from Dart's async package, and how to test them when stubbing with mockito is involved.
Writing tests is difficult for beginners, and Stream testing is even more complex and harder to understand. However, StreamQueue makes it possible to write readable test code, not only for beginners but for many engineers. I encourage everyone to adopt it.
I hope this article is helpful to someone.