DartのStreamについて調べる

はじめに
Dart公式ドキュメントのAsynchronous programming: Streamsのページを読んでいく
動作確認にはDartPadを使用する
DartのストリームはRxJSかNode.jsのストリームのようなものだと予想するが果たしてどうやら

Receiving stream events
ストリームのイベントを受信する
コード
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
ストリームからデータを受信するにはawait for
を使う
ループの最後に行った時に次のイベントかストリームの終了を待機する
await for
を使う関数はasync
である必要がある
試してみるには下記のコードを使用する
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
ストリームでデータを送信する方法については後から学ぶがポイントは下記の通り
- 関数を
await*
キーワードを使って修飾する - 送信には
yield
を使う

Error events
throw
を使って例外を送信することもできる
受信側ではtry
とcatch
を使って例外を受信する
コード
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
try {
await for (final value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw Exception('Intentional exception');
} else {
yield i;
}
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}

Working with streams
streamはiterableのように便利なメソッドをたくさん持っている
例
Stream<int> numbers() async* {
for (var i = 1; i <= 10; i += 1) {
yield i;
}
}
Future<void> main() async {
final lastOdd = await numbers().lastWhere((n) => n % 2 == 1);
print(lastOdd); // 9
}

Two kinds of streams
streamには下記の2種類がある
- single subscription stream
- broadcast stream
single subscription stream
通常のストリーム、1回しかリスンできない
試してみる
Stream<int> numbers() async* {
for (var i = 1; i <= 10; i += 1) {
yield i;
}
}
Future<void> main() async {
final stream = numbers();
print(await stream.lastWhere((n) => n % 2 == 1));
print(await stream.lastWhere((n) => n % 2 == 1));
}
実行結果
9
Uncaught Error: Bad state: Stream has already been listened to.
Broadcast streams
複数のリスナーが同時に購読可能なストリーム、マウスクリックなどのイベント処理に向いている
JavaScriptのEventListenerのような感じ

Methods that process a stream
Streamの便利メソッドはdrainとpipe以外はIterableとほぼ同じ
drainメソッド
ストリームの完了かエラーの発生まで待機する、それまでに得られたデータは全て廃棄する
戻り値はFuture<T>、オプショナル引数として完了時に返される値を指定できる
Future<void> main() async {
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // 100
}
例外が発生した場合は中断される
Stream<int> myStream() async* {
throw new Exception("myStream");
}
Future<void> main() async {
try {
final result = await myStream().drain(100);
print(result); // 表示されない
} catch (err) {
print(err); // Exception: myStream
}
}
pipeメソッド
ストリームからデータを受信して消費する、正直使い所がよくわからない
Node.jsのStreamみたいなものだと考えて良いのだろうか
import 'dart:async';
class MyStreamConsumer<T> implements StreamConsumer<T> {
Future addStream(Stream<T> stream) async {
await for (final element in stream) {
print(element);
}
}
Future close() async {}
}
Future<void> main() async {
final stream = Stream.fromIterable([1, 2, 3]);
final consumer = MyStreamConsumer<int>();
print("start");
await stream.pipe(consumer);
print("end");
}
実行結果
start
1
2
3
end

Methods that modify a stream
ストリームに変更を加えるメソッド
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);
試しにexpandを使ってみる
Future<void> main() async {
final stream = Stream.fromIterable([1, 2, 3])
.expand((n) {
return List<int>.generate(n, (i) => i + 1);
});
await for (final n in stream) {
print(n);
}
}
実行結果
1
1
2
1
2
3
asyncバージョンもある
Future<void> main() async {
final stream = Stream.fromIterable([1, 2, 3])
.asyncExpand((n) async* {
for (var i = 1; i <= n; i += 1) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
});
await for (final n in stream) {
print(n);
}
}
実行結果
(1秒待機)
1
(1秒待機)
1
(1秒待機)
2
(1秒待機)
1
(1秒待機)
2
(1秒待機)
3
distinctもIterableにはないらしい
Future<void> main() async {
final stream = Stream.fromIterable([1, 2, 2, 3, 3, 2, 2, 1])
.distinct()
.forEach((n) {
print(n);
});
}
実行結果、たしかに連続する重複が省かれている
1
2
3
2
1
handleErrorを使うとエラー処理ができるが途中でストリームが終わってしまう
Stream<int> myStream() async* {
yield 1;
throw Exception("myStream");
yield 2;
}
Future<void> main() async {
final stream = myStream().handleError((err) => print(err));
print("start");
await for (final n in stream) {
print(n);
}
print("end");
}
実行結果、中断されたため2が表示されていないことがわかる
start
1
Exception: myStream
end
timeoutも面白い、一定時間以内に次のイベントが発生しなかったらonTimeoutが呼び出される
しかもストリームが中断しない
Stream<int> myStream() async* {
for (var i = 1; i <= 5; i += 1) {
await Future.delayed(Duration(seconds: i));
yield i;
}
}
Future<void> main() async {
final stream = myStream().timeout(
Duration(seconds: 3),
onTimeout: (_) => print("timeout"),
);
print("start");
await for (final n in stream) {
print(n);
}
print("end");
}
実行結果
start
(1秒待機)
1
(2秒待機)
2
(3秒待機)
timeout
3
(3秒待機)
timeout
(1秒待機)
4
(3秒待機)
timeout
(2秒待機)
5
end

The transform() function
transformメソッドはmapをより汎用的にしたバージョン
通常のmapメソッドは1回に1つのイベントを扱うがtransformは1回に複数のイベントを扱える
典型的な利用方法は文字コードの変換など

Reading and decoding a file
ファイルを読み込んでデコードする、残念ながらDartPadでは実行できないが面白い
import 'dart:convert';
import 'dart:io';
void main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(const LineSplitter());
await for (final line in lines) {
if (!line.startsWith('#')) print(line);
}
}
bindはStreamTransformerのメソッド、transformはStreamのメソッド
サイズの大きなデータを扱う時にはStreamTransformerやStreamConsumerを使った方が良さそう
ところでFileってabstractクラスなのにどうしてインスタンス化できるのだろうか
調べたらfactory
というキーワードが関連しているようだ

The listen() method
最後にlistenメソッド
Streamを作るにはStreamクラスを継承してlistenメソッドを実装すれば良い
基本的にはforEachと同じだが、キャンセルなどを扱うStreamSubscriptionが返却される点が異なる
Future<void> main() async {
final stream = Stream.fromIterable([1, 2, 3])
.asyncMap((n) async {
await Future.delayed(Duration(seconds: 1));
return n;
});
final subscription = stream.listen((n) => print(n));
await Future.delayed(Duration(seconds: 3));
await subscription.cancel();
}

以上で一旦クローズ、次はMaterial Widgetを一通り使ってみる