🌐

Stream.fromFuture

2024/02/20に公開

📕Overview

https://api.flutter.dev/flutter/dart-async/Stream/Stream.fromFuture.html

Stream<T>.fromFuture(
Future<T> future
)

未来から新しい単一サブスクリプション・ストリームを作成します。

フューチャーが完了すると、ストリームはデータかエラーのいずれかのイベントを発生させ、done イベントで閉じます。

例:

Future<String> futureTask() async {
  await Future.delayed(const Duration(seconds: 5));
  return 'Future complete';
}

final stream = Stream<String>.fromFuture(futureTask());
stream.listen(print,
    onDone: () => print('Done'), onError: print);

// Outputs:
// "Future complete" after 'futureTask' finished.
// "Done" when stream completed.

Implementation(実施)

factory Stream.fromFuture(Future<T> future) {
  // Use the controller's buffering to fill in the value even before
  // the stream has a listener. For a single value, it's not worth it
  // to wait for a listener before doing the `then` on the future.
  _StreamController<T> controller =
      new _SyncStreamController<T>(null, null, null, null);
  future.then((value) {
    controller._add(value);
    controller._closeUnchecked();
  }, onError: (error, stackTrace) {
    controller._addError(error, stackTrace);
    controller._closeUnchecked();
  });
  return controller.stream;
}

🧷summary

APIから取得したデータがStreamじゃなくて、Futureだった!
でもStreamを使いたい。Stream.fromFutureを使えばできるようだ。今回やりたいことは、FutureのデータをStreamに変えることですね。

内部実装を見てみた。

/// 未来から新しい単一加入ストリームを作成する。
///
/// 未来が完了すると、ストリームは1つのイベントを発生させます。
/// その後、done イベントで閉じます。
///
/// 例:
/// ``dart
/// Future<String> futureTask() async { /// Future.delayed(delayed)を待つ。
/// await Future.delayed(const Duration(seconds: 5));
/// return 'フューチャー完了';

///
/// final stream = Stream<String>.fromFuture(futureTask());
/// stream.listen(print、
/// onDone: () => print('Done'), onError: print);

/// // 出力:
/// // 'futureTask'が終了したら "Future complete"。
/// // ストリームが完了したら "Done".
/// ```

factory Stream.fromFuture(Future<T> future) {
  // Use the controller's buffering to fill in the value even before
  // the stream has a listener. For a single value, it's not worth it
  // to wait for a listener before doing the `then` on the future.
  _StreamController<T> controller =
      new _SyncStreamController<T>(null, null, null, null);
  future.then((value) {
    controller._add(value);
    controller._closeUnchecked();
  }, onError: (error, stackTrace) {
    controller._addError(error, stackTrace);
    controller._closeUnchecked();
  });
  return controller.stream;
}

Supabaseを今回使用していて、データをリアルタイムで取得したいので、試しに作ったコードです。

import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

import '../../core/logger/logger.dart';
import '../../domain/entity/t_user.dart';

abstract interface class TaskRepository {
Future<void> getUserTasks();
Stream<List<TUserState>> watchUserTasks();
}

final taskRepositoryStreamProvider = StreamProvider<List<TUserState>>(
(ref) => ref.watch(taskRepositoryProvider).watchUserTasks(),
);

final taskRepositoryProvider = Provider<TaskRepository>((ref) {
return TaskRepositoryImpl();
});

class TaskRepositoryImpl implements TaskRepository {
final supabase = Supabase.instance;
// FIXME: このメソッドを実装してください。

Future<void> getUserTasks() async {
  try {
    final response =
        await supabase.client.from('t_user').select('*, t_task(*)');
    if (response.isEmpty) {
      throw Exception('Failed to fetch user and tasks: ${response.hashCode}');
    }
    logger.d('User and tasks: $response');
  } on Exception catch (e) {
    logger.d('Error: $e');
    rethrow;
  }
}

// FIXME: Futureでデータが渡されてくる。そのデータをStreamに変換してください。

Stream<List<TUserState>> watchUserTasks() {
  return Stream.fromFuture(
    supabase.client
        .from('t_user')
        .select('*, t_task(*)')
        .then((res) => res.map(TUserState.fromJson).toList()),
  );
}
}

🧑‍🎓thoughts

使ってみた感想ですが、freezedでListでクラスをネストして、Supabaseのテーブルをjoinして、データを取得するロジックを考えてみたのですが、t_userの情報は取れるが、joinというか、List<TTask>のデータを取得することはできなかった。

違うロジックの方が良さそう。今回は、Futureでデータが返ってきていたら、Streamに変換するコードをたまたま知ったので記事にしてみました。多分このコードは使わないと思います😅

Discussion