Stream.fromFuture
📕Overview
Stream<T>.fromFuture(
Future<T> future
)
未来から新しい単一サブスクリプション・ストリームを作成します。
フューチャーが完了すると、ストリームはデータかエラーのいずれかのイベントを発生させ、done イベントで閉じます。
例:
Future<String> futureTask() async {
await Future.delayed(const Duration(seconds: 5));
return 'Future complete';
}
final stream = Stream<String>.fromFuture(futureTask());
stream.listen(print,
onDone: () => print('Done'), onError: print);
// Outputs:
// "Future complete" after 'futureTask' finished.
// "Done" when stream completed.
Implementation(実施)
factory Stream.fromFuture(Future<T> future) {
// Use the controller's buffering to fill in the value even before
// the stream has a listener. For a single value, it's not worth it
// to wait for a listener before doing the `then` on the future.
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
future.then((value) {
controller._add(value);
controller._closeUnchecked();
}, onError: (error, stackTrace) {
controller._addError(error, stackTrace);
controller._closeUnchecked();
});
return controller.stream;
}
🧷summary
APIから取得したデータがStreamじゃなくて、Futureだった!
でもStreamを使いたい。Stream.fromFuture
を使えばできるようだ。今回やりたいことは、FutureのデータをStreamに変えることですね。
内部実装を見てみた。
/// 未来から新しい単一加入ストリームを作成する。
///
/// 未来が完了すると、ストリームは1つのイベントを発生させます。
/// その後、done イベントで閉じます。
///
/// 例:
/// ``dart
/// Future<String> futureTask() async { /// Future.delayed(delayed)を待つ。
/// await Future.delayed(const Duration(seconds: 5));
/// return 'フューチャー完了';
///
/// final stream = Stream<String>.fromFuture(futureTask());
/// stream.listen(print、
/// onDone: () => print('Done'), onError: print);
/// // 出力:
/// // 'futureTask'が終了したら "Future complete"。
/// // ストリームが完了したら "Done".
/// ```
factory Stream.fromFuture(Future<T> future) {
// Use the controller's buffering to fill in the value even before
// the stream has a listener. For a single value, it's not worth it
// to wait for a listener before doing the `then` on the future.
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
future.then((value) {
controller._add(value);
controller._closeUnchecked();
}, onError: (error, stackTrace) {
controller._addError(error, stackTrace);
controller._closeUnchecked();
});
return controller.stream;
}
Supabaseを今回使用していて、データをリアルタイムで取得したいので、試しに作ったコードです。
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import '../../core/logger/logger.dart';
import '../../domain/entity/t_user.dart';
abstract interface class TaskRepository {
Future<void> getUserTasks();
Stream<List<TUserState>> watchUserTasks();
}
final taskRepositoryStreamProvider = StreamProvider<List<TUserState>>(
(ref) => ref.watch(taskRepositoryProvider).watchUserTasks(),
);
final taskRepositoryProvider = Provider<TaskRepository>((ref) {
return TaskRepositoryImpl();
});
class TaskRepositoryImpl implements TaskRepository {
final supabase = Supabase.instance;
// FIXME: このメソッドを実装してください。
Future<void> getUserTasks() async {
try {
final response =
await supabase.client.from('t_user').select('*, t_task(*)');
if (response.isEmpty) {
throw Exception('Failed to fetch user and tasks: ${response.hashCode}');
}
logger.d('User and tasks: $response');
} on Exception catch (e) {
logger.d('Error: $e');
rethrow;
}
}
// FIXME: Futureでデータが渡されてくる。そのデータをStreamに変換してください。
Stream<List<TUserState>> watchUserTasks() {
return Stream.fromFuture(
supabase.client
.from('t_user')
.select('*, t_task(*)')
.then((res) => res.map(TUserState.fromJson).toList()),
);
}
}
🧑🎓thoughts
使ってみた感想ですが、freezedでListでクラスをネストして、Supabaseのテーブルをjoinして、データを取得するロジックを考えてみたのですが、t_userの情報は取れるが、joinというか、List<TTask>のデータを取得することはできなかった。
違うロジックの方が良さそう。今回は、Futureでデータが返ってきていたら、Streamに変換するコードをたまたま知ったので記事にしてみました。多分このコードは使わないと思います😅
Discussion