🎛️

【Dart】distinctを活用してStreamの発火を制御する

に公開

はじめに

変更の通知を行う上で Stream は便利です。
しかし、通常の仕様だと値を流せば問答無用で流し続けてしまいます。
この値を流す条件を設定して値を流す制御を行うことができるのが、 distinct です。

https://api.dart.dev/dart-async/Stream/distinct.html

この記事では Stream の制御に distinct を活用する方法と注意点について解説していきます。

記事の対象者

  • Stream を使った状態管理やイベント処理を行っている方
  • distinct を使って不要なイベント発火を防ぎたい方
  • Stream<T> の型によって distinct の挙動が異なる理由を知りたい方
  • Listカスタムクラス などの参照型を distinct で適切に比較する方法を学びたい方

記事を執筆時点での筆者の環境

[✓] Flutter (Channel stable, 3.27.1, on macOS 15.1 24B2082 darwin-arm64, locale ja-JP)
[✓] Android toolchain - develop for Android devices (Android SDK version 35.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 16.1)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2024.2)
[✓] VS Code (version 1.96.2)

Stream<bool> を使った例 ~値型~

全体のコード
test/bool_test.dart
import 'dart:async';

import 'package:flutter_test/flutter_test.dart';

late StreamController<bool> _boolController;

late Stream<bool> boolStream;

void addBool({required bool value}) {
  _boolController.add(value);
}

void main() {
  tearDown(() async {
    await _boolController.close();
  });

  group('bool Stream', () {
    setUp(() {
      _boolController = StreamController<bool>();
      boolStream = _boolController.stream;
    });
    test('bool Stream', () async {
      var count = 0;
      final subscriptionStream = boolStream.listen((_) {
        count++;
      });

      addBool(value: true);
      addBool(value: false);
      addBool(value: true);
      addBool(value: true); // 前回と同じでも受け取る

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });
  group('bool Stream with distinct', () {
    setUp(() {
      _boolController = StreamController<bool>();
      boolStream = _boolController.stream.distinct();
    });
    test('bool Stream', () async {
      var count = 0;
      final subscriptionStream = boolStream.listen((_) {
        count++;
      });

      addBool(value: true);
      addBool(value: false);
      addBool(value: true);
      addBool(value: true); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });
}

例として、 bool を流す Stream を以下のように定義します。

late StreamController<bool> _boolController;

late Stream<bool> boolStream;

void addBool({required bool value}) {
  _boolController.add(value);
}

通常の場合

  group('bool Stream', () {
    setUp(() {
      _boolController = StreamController<bool>();
      boolStream = _boolController.stream;
    });
    test('bool Stream', () async {
      var count = 0;
      final subscriptionStream = boolStream.listen((_) {
        count++;
      });

      addBool(value: true);
      addBool(value: false);
      addBool(value: true);
      addBool(value: true); // 前回と同じでも受け取る

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });

boolStream.listen でイベントを購読し、値が流れてくるたびに count++; という単純なテストです。
まずは通常の何も設定しないで Stream を使った場合です。
addBool で値を流した場合は今回4回流しているので、 count も4になっていますね。

distinct で前回と同じ場合は値を流さない

  group('bool Stream with distinct', () {
    setUp(() {
      _boolController = StreamController<bool>();
      boolStream = _boolController.stream.distinct();
    });
    test('bool Stream', () async {
      var count = 0;
      final subscriptionStream = boolStream.listen((_) {
        count++;
      });

      addBool(value: true);
      addBool(value: false);
      addBool(value: true);
      addBool(value: true); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });

特筆すべきは boolStream = _boolController.stream.distinct(); の部分です。
Streamdistinct をつけることで前回の値と今回の値を比較して同じであれば値を流さない処理を行っています。
詳しくは後述しますが、これは値型のものが前提です。
具体的には以下の型が該当します。

  • int
  • double
  • bool
  • String

Stream<Todo> を使った例 ~参照型のオブジェクト~

全体のコード
test/todo_test.dart
import 'dart:async';

import 'package:flutter_test/flutter_test.dart';

class Todo {
  Todo({
    required this.id,
    required this.content,
  });
  final String id;
  final String content;
}

late StreamController<Todo> _todoController;

late Stream<Todo> todoStream;

void addTodo(Todo todo) {
  _todoController.add(todo);
}

void main() {
  final todo1_1 = Todo(
    id: '1',
    content: 'test',
  );

  final todo2 = Todo(
    id: '2',
    content: 'test2',
  );

  final todo1_2 = Todo(
    id: '1',
    content: 'test',
  );

  tearDown(() async {
    await _todoController.close();
  });

  group('todo distinct', () {
    setUp(() {
      _todoController = StreamController<Todo>();
      todoStream = _todoController.stream.distinct();
    });

    test('todo Stream 1', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_1); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });

    test('todo Stream 2', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_2); // インスタンスが異なるので同一とは判定されない

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4)); // 3 ではなく 4 になる
    });
  });
  group('todo distinct with (prev, next)', () {
    setUp(() {
      _todoController = StreamController<Todo>();
      todoStream =
          _todoController.stream.distinct((prev, next) => prev.id == next.id);
    });
    test('todo Stream 3 ', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_2); // id が同じなので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });
}

次に class で作った独自の Object だった場合を見ていきます。

class Todo {
  Todo({
    required this.id,
    required this.content,
  });
  final String id;
  final String content;
}

late StreamController<Todo> _todoController;

late Stream<Todo> todoStream;

void addTodo(Todo todo) {
  _todoController.add(todo);
}

  final todo1_1 = Todo(
    id: '1',
    content: 'test',
  );

  final todo2 = Todo(
    id: '2',
    content: 'test2',
  );

  final todo1_2 = Todo(
    id: '1',
    content: 'test',
  );

distinct をつけても結果が変わる場合

  group('todo distinct', () {
    setUp(() {
      _todoController = StreamController<Todo>();
      todoStream = _todoController.stream.distinct();
    });

    test('todo Stream 1', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_1); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });

    test('todo Stream 2', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_2); // インスタンスが異なるので同一とは判定されない

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4)); // 3 ではなく 4 になる
    });
  });

todo Stream 1todo Stream 2 で結果が異なります。
これは class は参照型のため、インスタンスが違うと内容が同じでも別物と判定される ためです。
仕様によってはこれでいいかもしれませんが、場合によっては内容が全く一緒であれば流さないでほしいということがあると思います。

distinct に条件を追加する

  group('todo distinct with (prev, next)', () {
    setUp(() {
      _todoController = StreamController<Todo>();
      todoStream =
          _todoController.stream.distinct((prev, next) => prev.id == next.id);
    });
    test('todo Stream 3 ', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodo(todo1_1);
      addTodo(todo2);
      addTodo(todo1_1);
      addTodo(todo1_2); // id が同じなので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });

distinct は、デフォルトでは 前回の値 (prev) と今回の値 (next) を == で比較し、同じ値なら流さないという動作をします。
しかし、参照型 (Todo) の場合、== はデフォルトで オブジェクトの参照を比較 するため、同じ内容であっても別々のインスタンスである場合は異なるオブジェクトとみなされてしまうという問題があります。

そこで、distinct に比較ルールを指定する関数を渡すことで、比較するルールを独自に設定することができます。

todoStream =
  _todoController.stream.distinct((prev, next) => prev.id == next.id);

この場合、prev(前回の値)と next(今回の値)の id を比較し、
id が同じなら「同じデータ」とみなして値を流さないようにしています。

たとえば、すべてのプロパティ (idcontent) が完全に一致する場合に限り同一とみなすなら、以下のように書くことも可能です。

todoStream =
  _todoController.stream.distinct((prev, next) => 
    prev.id == next.id && prev.content == next.content
  );

Stream<List<String>> を使った例 ~参照型のオブジェクト~

全体のコード
test/list_test.dart
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:flutter_test/flutter_test.dart';

late StreamController<List<String>> _todoController;
late Stream<List<String>> todoStream;

void addTodoList(List<String> todoList) {
  _todoController.add(todoList);
}

void main() {
  final list1_1 = ['A', 'B'];
  final list2 = ['C', 'D'];
  final list1_2 = ['A', 'B'];

  tearDown(() async {
    await _todoController.close();
  });

  group('list distinct', () {
    setUp(() {
      _todoController = StreamController<List<String>>();
      todoStream = _todoController.stream.distinct();
    });

    test('list Stream 1', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_1); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
    test('list Stream 2', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_2); // listは参照型なので、内容が同じでも別のインスタンスとして扱われる

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4)); // 重複しているが、内容が異なるので受け取る想定
    });
  });
  group('list distinct with ListEquality', () {
    setUp(() {
      _todoController = StreamController<List<String>>();
      todoStream = _todoController.stream.distinct(
        (prev, next) => const ListEquality<String>().equals(prev, next),
      );
    });
    test('list Stream 3 ', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_2); // インスタンスは違うが内容が同じなので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });
}

次に List 型の場合を見ていきましょう。

late StreamController<List<String>> _todoController;
late Stream<List<String>> todoStream;

void addTodoList(List<String> todoList) {
  _todoController.add(todoList);
}

  final list1_1 = ['A', 'B'];
  final list2 = ['C', 'D'];
  final list1_2 = ['A', 'B'];

distinct を使っても結果が変わる場合

  group('list distinct', () {
    setUp(() {
      _todoController = StreamController<List<String>>();
      todoStream = _todoController.stream.distinct();
    });

    test('list Stream 1', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_1); // 重複しているので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
    test('list Stream 2', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_2); // listは参照型なので、内容が同じでも別のインスタンスとして扱われる

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4)); // 重複しているが、内容が異なるので受け取る想定
    });
  });

list Stream 1list Stream 2 で結果が異なります。
これも class の時と同様に list は参照型のため、インスタンスが違うと内容が同じでも別物と判定される ためです。

distinct に条件を追加する ~ListEquality の活用~

  group('list distinct with ListEquality', () {
    setUp(() {
      _todoController = StreamController<List<String>>();
      todoStream = _todoController.stream.distinct(
        (prev, next) => const ListEquality<String>().equals(prev, next),
      );
    });
    test('list Stream 3 ', () async {
      var count = 0;
      final subscriptionStream = todoStream.listen((_) {
        count++;
      });

      addTodoList(list1_1);
      addTodoList(list2);
      addTodoList(list1_1);
      addTodoList(list1_2); // インスタンスは違うが内容が同じなので受け取らない想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(3));
    });
  });

先ほどの Todo の時と同様に distinct に条件を追加していきます。
ただし List の場合はそのままでは単純な比較ができません。
そこでFlutter標準のパッケージで collection というものがあり、その中の ListEquality<T>.equals を活用すると同一性を検証できるようになります。

https://pub.dev/packages/collection

import 'package:collection/collection.dart';

todoStream = _todoController.stream.distinct(
  (prev, next) => const ListEquality<String>().equals(prev, next),
);

T の部分に List の中身の型を入れるのを忘れないようにしましょう。
今回の例で行けば <String> です。

Stream<User> を使った例 ~参照型のオブジェクに Equatable を継承させる~

全体のコード
test/user_test.dart
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:equatable/equatable.dart';
import 'package:flutter_test/flutter_test.dart';

class User extends Equatable {
  const User({
    required this.id,
    required this.name,
  });
  final String id;
  final String name;

  
  List<Object?> get props => [id, name];
}

late StreamController<User> _userController;
late Stream<User> userStream;

void addUser(User user) {
  _userController.add(user);
}

late StreamController<List<User>> _userListController;
late Stream<List<User>> userListStream;

void addUserList(List<User> userList) {
  _userListController.add(userList);
}

void main() {
  const user1_1 = User(
    id: '1',
    name: 'test',
  );

  const user2 = User(
    id: '2',
    name: 'test2',
  );

  const user1_2 = User(
    id: '1',
    name: 'test',
  );

  const user1_3 = User(
    id: '1',
    name: 'example',
  );

  final userList1_1 = [user1_1, user2];
  final userList2 = [user2];
  final userList1_2 = [user1_2, user2];
  final userList1_3 = [user1_3, user2];

  group('user distinct', () {
    setUp(() {
      _userController = StreamController<User>();
      userStream = _userController.stream.distinct();
    });

    tearDown(() async {
      await _userController.close();
    });

    test('user Stream 1', () async {
      var count = 0;
      final subscriptionStream = userStream.listen((_) {
        count++;
      });

      addUser(user1_1);
      addUser(user2);
      addUser(user1_1);
      addUser(user1_2); // 重複しているので受け取らない想定
      addUser(user1_3); // 重複しているが、name が異なるので受け取る想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });
  group('userList distinct', () {
    setUp(() {
      _userListController = StreamController<List<User>>();
      userListStream = _userListController.stream.distinct();
    });

    tearDown(() async {
      await _userListController.close();
    });
    test('userList Stream 1', () async {
      var count = 0;
      final subscriptionStream = userListStream.listen((_) {
        count++;
      });

      addUserList(userList1_1);
      addUserList(userList2);
      addUserList(userList1_1);
      addUserList(userList1_2); // 受け取る
      addUserList(userList1_3); // 受け取る

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(5));
    });
  });
  group('userList distinct with ListEquality', () {
    setUp(() {
      _userListController = StreamController<List<User>>();
      userListStream = _userListController.stream.distinct(
        (prev, next) => const ListEquality<User>().equals(prev, next),
      );
    });
    test('userList Stream 2 ', () async {
      var count = 0;
      final subscriptionStream = userListStream.listen((_) {
        count++;
      });

      addUserList(userList1_1);
      addUserList(userList2);
      addUserList(userList1_1);
      addUserList(userList1_2); // 受け取らない想定
      addUserList(userList1_3); // 受け取る想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });
}

最後に class で定義した User の場合です。

class User extends Equatable {
  const User({
    required this.id,
    required this.name,
  });
  final String id;
  final String name;

  
  List<Object?> get props => [id, name];
}

late StreamController<User> _userController;
late Stream<User> userStream;

void addUser(User user) {
  _userController.add(user);
}

前回の Todo と違うのは Equatable を継承しているところです。
これは参照型のオブジェクトを簡単に比較できるようにするパッケージです。

https://pub.dev/packages/equatable

使い方は対象のclassextendsEquatable を継承させます。
すると @override するように促されるので List<Object?> get props をオーバーライドしましょう。
この配列に比較する内容を書いていきます。
今回は idname という全てのパラメーターを比較対象にしています。(一部のパラメータでも可)

distinct で制御する

  const user1_1 = User(
    id: '1',
    name: 'test',
  );

  const user2 = User(
    id: '2',
    name: 'test2',
  );

  const user1_2 = User(
    id: '1',
    name: 'test',
  );

  const user1_3 = User(
    id: '1',
    name: 'example',
  );
  group('user distinct', () {
    setUp(() {
      _userController = StreamController<User>();
      userStream = _userController.stream.distinct();
    });

    tearDown(() async {
      await _userController.close();
    });

    test('user Stream 1', () async {
      var count = 0;
      final subscriptionStream = userStream.listen((_) {
        count++;
      });

      addUser(user1_1);
      addUser(user2);
      addUser(user1_1);
      addUser(user1_2); // 重複しているので受け取らない想定
      addUser(user1_3); // 重複しているが、name が異なるので受け取る想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });

先ほどの Todo の時は distinct で細かい条件を指定しなければいけませんでした。
今回は何も条件を追加しなくても Equatable のおかげでそのままでも同一性をチェックしてくれています。
上記は addUser を全部で5回行っています。
3回目と4回目では同じ内容なので値を流していません。
4回目と5回目に関しては id は一緒ですが name が異なるので値を流しています。

Stream<List<User>> の場合 ~デフォルトの distinct~

ちょっと複雑ですが、 UserList 型について見ていきます。

  const user1_1 = User(
    id: '1',
    name: 'test',
  );

  const user2 = User(
    id: '2',
    name: 'test2',
  );

  const user1_2 = User(
    id: '1',
    name: 'test',
  );

  const user1_3 = User(
    id: '1',
    name: 'example',
  );

  final userList1_1 = [user1_1, user2];
  final userList2 = [user2];
  final userList1_2 = [user1_2, user2];
  final userList1_3 = [user1_3, user2];
  group('userList distinct', () {
    setUp(() {
      _userListController = StreamController<List<User>>();
      userListStream = _userListController.stream.distinct();
    });

    tearDown(() async {
      await _userListController.close();
    });
    test('userList Stream 1', () async {
      var count = 0;
      final subscriptionStream = userListStream.listen((_) {
        count++;
      });

      addUserList(userList1_1);
      addUserList(userList2);
      addUserList(userList1_1);
      addUserList(userList1_2); // 受け取る
      addUserList(userList1_3); // 受け取る

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(5));
    });
  });

あくまで Equatable によって同一性が検証できるのは User オブジェクト単体同士の場合です。
よって既存の distinct のままでは List 同士の検証はインスタンスの参照を検証してしまい、値を流してしまいます。

Stream<List<User>> の場合 ~distinct + ListEquality~

  group('userList distinct with ListEquality', () {
    setUp(() {
      _userListController = StreamController<List<User>>();
      userListStream = _userListController.stream.distinct(
        (prev, next) => const ListEquality<User>().equals(prev, next),
      );
    });
    test('userList Stream 2 ', () async {
      var count = 0;
      final subscriptionStream = userListStream.listen((_) {
        count++;
      });

      addUserList(userList1_1);
      addUserList(userList2);
      addUserList(userList1_1);
      addUserList(userList1_2); // 受け取らない想定
      addUserList(userList1_3); // 受け取る想定

      await Future<void>.delayed(Duration.zero);

      await subscriptionStream.cancel();

      expect(count, equals(4));
    });
  });

List 型の時と同様に ListEquality を使うことで、List の中身を比較することができます。

終わりに

この記事では distinct を活用して Stream の発火を制御する方法について、値型 (bool, int, double, String) や参照型 (Todo, List<String>, User) の例を交えながら解説しました。

distinct前回の値と今回の値が同じ場合にイベントを流さない という便利な機能ですが、値型の場合はそのまま使える一方で、参照型の場合は適切な比較処理を追加する必要がある ことがわかりました。

まとめ

  • int, double, bool, String などの値型は distinct だけで問題なく制御できる。
  • Listカスタムクラス は参照型のため、中身が同じでも別のインスタンスとして扱われる ことに注意。
  • distinct((prev, next) => 条件) を使うことで、カスタム比較ルール を指定可能。
  • ListEqualityEquatable を活用することで、より適切に distinct の判定を行える

Stream を利用する際に、無駄なイベント発火を防ぎ、パフォーマンスを向上させる ためにも、distinct を適切に活用していきましょう。


ここまで読んでいただきありがとうございました!
この記事が Stream の制御について理解を深める手助けになれば幸いです 🙌

Discussion