Closed11

DartのStreamについて調べる

薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Receiving stream events

ストリームのイベントを受信する

コード

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

ストリームからデータを受信するにはawait forを使う

ループの最後に行った時に次のイベントかストリームの終了を待機する

await forを使う関数はasyncである必要がある

試してみるには下記のコードを使用する

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

ストリームでデータを送信する方法については後から学ぶがポイントは下記の通り

  • 関数をawait*キーワードを使って修飾する
  • 送信にはyieldを使う
薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Error events

throwを使って例外を送信することもできる

受信側ではtrycatchを使って例外を受信する

コード

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}
薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Two kinds of streams

streamには下記の2種類がある

  • single subscription stream
  • broadcast stream

single subscription stream

通常のストリーム、1回しかリスンできない

試してみる

Stream<int> numbers() async* {
  for (var i = 1; i <= 10; i += 1) {
    yield i;
  }
}

Future<void> main() async {
  final stream = numbers();
  print(await stream.lastWhere((n) => n % 2 == 1));
  print(await stream.lastWhere((n) => n % 2 == 1));
}

実行結果

9
Uncaught Error: Bad state: Stream has already been listened to.

Broadcast streams

複数のリスナーが同時に購読可能なストリーム、マウスクリックなどのイベント処理に向いている

JavaScriptのEventListenerのような感じ

薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Methods that process a stream

Streamの便利メソッドはdrainとpipe以外はIterableとほぼ同じ

drainメソッド

ストリームの完了かエラーの発生まで待機する、それまでに得られたデータは全て廃棄する

戻り値はFuture<T>、オプショナル引数として完了時に返される値を指定できる

Future<void> main() async {
  final result = await Stream.fromIterable([1, 2, 3]).drain(100);
  print(result); // 100
}

例外が発生した場合は中断される

Stream<int> myStream() async* {
  throw new Exception("myStream");
}

Future<void> main() async {
  try {
    final result = await myStream().drain(100);
    print(result); // 表示されない
  } catch (err) {
    print(err); // Exception: myStream
  }
}

pipeメソッド

ストリームからデータを受信して消費する、正直使い所がよくわからない

Node.jsのStreamみたいなものだと考えて良いのだろうか

import 'dart:async';

class MyStreamConsumer<T> implements StreamConsumer<T> {
  
  Future addStream(Stream<T> stream) async {
    await for (final element in stream) {
      print(element);
    }
  }
  
  
  Future close() async {}
}

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 3]);
  final consumer = MyStreamConsumer<int>();
  print("start");
  await stream.pipe(consumer);
  print("end");
}

実行結果

start
1
2
3
end
薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Methods that modify a stream

ストリームに変更を加えるメソッド

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

試しにexpandを使ってみる

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 3])
    .expand((n) {
      return List<int>.generate(n, (i) => i + 1);
    });
  
  await for (final n in stream) {
    print(n);
  }
}

実行結果

1
1
2
1
2
3

asyncバージョンもある

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 3])
    .asyncExpand((n) async* {
      for (var i = 1; i <= n; i += 1) {
        await Future.delayed(Duration(seconds: 1));
        yield i;
      }
    });
  
  await for (final n in stream) {
    print(n);
  }
}

実行結果

(1秒待機)
1
(1秒待機)
1
(1秒待機)
2
(1秒待機)
1
(1秒待機)
2
(1秒待機)
3

distinctもIterableにはないらしい

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 2, 3, 3, 2, 2, 1])
    .distinct()
    .forEach((n) {
      print(n);
    });
}

実行結果、たしかに連続する重複が省かれている

1
2
3
2
1

handleErrorを使うとエラー処理ができるが途中でストリームが終わってしまう

Stream<int> myStream() async* {
  yield 1;
  throw Exception("myStream");
  yield 2;
}

Future<void> main() async {
  final stream = myStream().handleError((err) => print(err));
  
  print("start");
  await for (final n in stream) {
    print(n);
  }
  print("end");
}

実行結果、中断されたため2が表示されていないことがわかる

start
1
Exception: myStream
end

timeoutも面白い、一定時間以内に次のイベントが発生しなかったらonTimeoutが呼び出される

しかもストリームが中断しない

Stream<int> myStream() async* {
  for (var i = 1; i <= 5; i += 1) {
    await Future.delayed(Duration(seconds: i));
    yield i;
  }
}

Future<void> main() async {
  final stream = myStream().timeout(
    Duration(seconds: 3),
    onTimeout: (_) => print("timeout"),
  );
  
  print("start");
  await for (final n in stream) {
    print(n);
  }
  print("end");
}

実行結果

start
(1秒待機)
1
(2秒待機)
2
(3秒待機)
timeout
3
(3秒待機)
timeout
(1秒待機)
4
(3秒待機)
timeout
(2秒待機)
5
end
薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

The transform() function

transformメソッドはmapをより汎用的にしたバージョン

通常のmapメソッドは1回に1つのイベントを扱うがtransformは1回に複数のイベントを扱える

典型的な利用方法は文字コードの変換など

薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

Reading and decoding a file

ファイルを読み込んでデコードする、残念ながらDartPadでは実行できないが面白い

import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

bindはStreamTransformerのメソッド、transformはStreamのメソッド

サイズの大きなデータを扱う時にはStreamTransformerやStreamConsumerを使った方が良さそう

ところでFileってabstractクラスなのにどうしてインスタンス化できるのだろうか

調べたらfactoryというキーワードが関連しているようだ

薄田達哉 / tatsuyasusukida薄田達哉 / tatsuyasusukida

The listen() method

最後にlistenメソッド

Streamを作るにはStreamクラスを継承してlistenメソッドを実装すれば良い

基本的にはforEachと同じだが、キャンセルなどを扱うStreamSubscriptionが返却される点が異なる

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 3])
    .asyncMap((n) async {
      await Future.delayed(Duration(seconds: 1));
      return n;
    });

  final subscription = stream.listen((n) => print(n));
  
  await Future.delayed(Duration(seconds: 3));
  await subscription.cancel();
}
このスクラップは2023/01/10にクローズされました