🎲

【Dart】Streamを簡単に学ぶ

2020/12/11に公開

全てをカバーしているわけではありませんが、基本的な部分を簡単順に記載します。

【Dart】Streamの簡単な使い方

参考

はじめに

  • Streamとは

    • ストリーム - Wikipedia
      • ストリーム(Stream)は原語の川、流れといった意味の派生的用法で用いられる。
  • Dartでもデータを流す川のイメージ

    • 新しいデータが流れた際に受け取りたい人に自動で教えてくれる
      • 例:チャットの文字情報
        • A君のメッセージがStreamに流れ、B君側ではそれをすぐに見れる
  • 実行環境

    • DartPadやAndroid Studio等で実行
      • Dart SDK 2.10.4
  • Streamの説明の流れ

  1. 同期実行(Iterable<T>)と非同期実行(Stream<T>)の基礎
  2. 非同期実行(Stream<T>)のちょっとした応用
  3. Stream<T>の便利なコンストラクタ
  4. Streamを実際に扱うのに便利なコントローラ
  • 説明として、チンチロリンを想定し、サイコロ3つを一つずつ投げるイメージで記載
    • 同期
      • 自分で片手で一つ一つ投げ続ける
    • 非同期
      • 他人と交互に投げるかのイメージ

同期(Iterable<T>)の基礎

  • 目的

    • 全て逐次的な処理の場合
  • ポイント

    • Streamに変化を加える関数(chinchiroSync
      • Iterable<int>を返す
      • sync*をつける
  • 備考

    • yieldは式を計算し、結果の値を渡す(複数回実行できるreturnのようなもの)

    • DartPadでの実行では、import 'dart:io';を利用したsleep関数に対応していないため、以下のエラーとなる。

      Uncaught Error: Unsupported operation: ProcessUtils._sleep

import 'dart:io';
import 'dart:math';

Iterable<int> chinchiroSync() sync* {
  final random = Random();

  for (var i = 0; i < 3; ++i) {
    sleep(Duration(seconds: 1));
    yield random.nextInt(6) + 1;
  }
}

void main() {
  final stream = chinchiroSync();
  for (int i in stream) {
    print(i);
  }
}

実行結果

4
6
2

☝1秒経過毎に1行出力される

  • 上記のmain関数内のfor文をforEach文にした際も同様
import 'dart:io';
import 'dart:math';

Iterable<int> chinchiroSync() sync* {
  final random = Random();

  for (var i = 0; i < 3; ++i) {
    sleep(Duration(seconds: 1));
    yield random.nextInt(6) + 1;
  }
}

void main() {
  final stream = chinchiroSync();
  stream.forEach((int i) => print(i));
}

実行結果

5
4
6

非同期(Stream<T>)の基礎

  • 目的

    • 何かデータを外部から取得する等の処理をしたい場合
  • ポイント

    • Streamに変化を加える関数(chinchiroAsync
      • Stream<int>を返す
      • async*をつける
    • main関数
      • main関数自体にasyncをつけ、値を待つためにawaitを利用する
        • awaitが無い場合は、for(int i in stream)のstream変数に以下エラー
          • The type 'Stream<int>' used in the 'for' loop must implement Iterable
import 'dart:math';

Stream<int> chinchiroAsync() async* {
  final random = Random();

  for (var i = 0; i < 3; ++i) {
    await Future.delayed(Duration(seconds: 1));
    yield random.nextInt(6) + 1;
  }
}

void main() async {
  final stream = chinchiroAsync();
  await for(int i in stream){
    print(i);
  }
}

実行結果

3
3
5
  • 上記のmain関数内のfor文をforEach文にした際は、awaitが無くても問題なく動作
    • 以下どちらでも結果は同様※何か違いがあるかもしれない
      • await stream.forEach((int i) => print(i));
      • stream.forEach((int i) => print(i));
import 'dart:math';

Stream<int> chinchiroAsync() async* {
  final random = Random();

  for (var i = 0; i < 3; ++i) {
    await Future.delayed(Duration(seconds: 1));
    yield random.nextInt(6) + 1;
  }
}

void main() async {
  final stream = chinchiroAsync();
  stream.forEach((int i) => print(i));
}

実行結果

3
3
3

非同期(Stream<T>)の応用

  • 複数回連続でダイスロールを行う場合
    • 「0」でダイスロールし、「-1」で終了の意味とする
import 'dart:math';

Stream<int> chinchiroAsync([int times = 3]) async* {
  final random = Random();
  int count = 0;

  while (true) {
    if (count == times) {
      yield -1; // = 'Finish!'
      break;
    }
    yield 0; // = 'Roll'
    for (var i = 0; i < 3; ++i) {
      await Future.delayed(Duration(seconds: 1));
      yield random.nextInt(6) + 1;
    }
    ++count;
  }
}

void main() async {
  final stream = chinchiroAsync();
  await for (int i in stream) {
    print(i);
  }
}

実行結果

0
5
5
1
0
2
3
6
0
1
3
4
-1
  • Streamの関数を複数に分ける場合(他のStreamを呼び出す場合)
    • yield*を用いて、Streamへの値渡しを一時停止することが可能
    • また、分割することで、可読性向上
import 'dart:math';

Stream<int> chinchiroAsync([int times = 3]) async* {
  int count = 0;

  while (true) {
    if (count == times) {
      yield -1; // = 'Finish!'
      break;
    }
    yield 0; // = 'Roll'
    yield* diceRoll();
    ++count;
  }
}

Stream<int> diceRoll() async* {
  final random = Random();
  for (var i = 0; i < 3; ++i) {
    await Future.delayed(Duration(seconds: 1));
    yield random.nextInt(6) + 1;
  }
}

void main() async {
  final stream = chinchiroAsync();
  await for (int i in stream) {
    print(i);
  }
}

実行結果

0
1
6
5
0
4
2
5
0
1
1
3
-1

非同期(Stream<T>)の便利なコンストラクタ

Stream<T>.value

  • 引数とした一つの情報('Chinchiro')を渡す(yieldする)Streamを作成
void main() async {
  final stream = Stream<String>.value('Chinchiro');
  print(await stream.first);
}

実行結果

Chinchiro
  • Stream<String>.value('Chinchiro')を引数としたFuture関数を自作し、受け取ったデータを処理
Future<void> printThings(Stream<String> data) async {
  await for (var x in data) {
    print(x);
  }
}

void main() async {
  await printThings(Stream<String>.value('Chinchiro'));
  print('TEST');
}

実行結果

Chinchiro
TEST
  • awaitをprintThings関数呼び出し時に利用しないと、先に次のコードが実行される
Future<void> printThings(Stream<String> data) async {
  await for (var x in data) {
    print(x);
  }
}

void main() async {
  printThings(Stream<String>.value('Chinchiro'));
  print('TEST');
}

実行結果

TEST
Chinchiro

Stream<T>.periodic

  • 1秒ごとにStreamに0から始まる値(period)を渡す
void main() async {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (int period) => period,
  );
  await stream.forEach((int i) => print(i));
}

実行結果

0
1
2
3
4
5
6
7
8
9
10
・
・
・
【省略】
  • 1秒ごとにStreamに1から6までのランダム値を渡す
    • ※この場合、period変数は特に利用していない
import 'dart:math';

void main() async {
  final random = Random();
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (period) => random.nextInt(6) + 1,
  );
  await stream.forEach((int i) => print(i));
}

実行結果

1
4
4
1
2
1
6
・
・
・
【省略】

Stream<T>.fromIterable

  • リスト内の数値をStreamに渡す
import 'dart:math';

void main() async {
  int random() => Random().nextInt(6) + 1;
  final stream = Stream<int>.fromIterable(
    <int>[
      random(),
      random(),
      random(),
    ],
  );
  await stream.forEach((int i) => print(i));
}

実行結果

5
3
6

Stream<T>.fromFuture

  • FutureオブジェクトをStreamに渡す
import 'dart:math';

void main() async {
  String random() => (Random().nextInt(6) + 1).toString();
  final stream = Stream<String>.fromFuture(
      Future<String>.value(random() + random() + random()));
  print(await stream.first);
}

実行結果

541

以下、おまけ程度

Streamを扱うのに便利なStreamController<T>

  • 実際にStreamを上手くに利用するためには、StreamControllerが便利

    • final stream = chinchiroAsync();のようにStreamを手動で開始するより便利なのが、StreamController<T>でStreamを制御する方法
      • Streamをlistenさせることで、データを簡単に受け取れる
      • StreamControllerによって、必要な種類と数のStreamを分かりやすく扱える
  • サンプル①

    • StreamController
      • プレイヤーが二人と想定し、二つ用意
    • Input
      • サイコロの結果をaddにより、Streamにデータを渡す
    • Output
      • listen関数を利用することで、Streamからデータを都度受け取れる
import 'dart:async';
import 'dart:math';

void main() {
  String random() => (Random().nextInt(6) + 1).toString();
  String diceData() => random() + random() + random();

  // StreamController
  final controllerPlayer1 = StreamController<String>();
  final controllerPlayer2 = StreamController<String>();

  // Input(add or sink.add)
  controllerPlayer1.add(diceData());
  controllerPlayer1.add(diceData());
  controllerPlayer2.add(diceData());
  controllerPlayer2.add(diceData());

  // Output(listen)
  controllerPlayer1.stream.listen((data) {
    print('Player1:Roll');
    print(data);
  });
  controllerPlayer2.stream.listen((data) {
    print('Player2:Roll');
    print(data);
  });
}

実行結果

Player1:Roll
264
Player2:Roll
511
Player1:Roll
456
Player2:Roll
216
  • サンプル②
    • StreamController
      • StreamController<T>プロパティのstreamをreturnする関数を利用する場合
      • 【備考】StreamControllerのコンストラクタ
        • StreamController({void onListen(), void onPause(), void onResume(), FutureOr<void> onCancel(), bool sync: false})
    • Input
      • サイコロの結果をaddにより、Streamにデータを渡す
        • streamをreturnする関数に与えた秒間隔毎に最大回数までadd
    • Output
      • listen関数を利用することで、Streamからデータを都度受け取れる
import 'dart:async';
import 'dart:math';

Stream<String> chinchiro(Duration interval, [int times]) {
  String random() => (Random().nextInt(6) + 1).toString();
  String diceData() => random() + random() + random();

  // StreamController
  StreamController<String> controller;
  Timer timer;
  int count = 0;

  void tick(Timer timer) {
    count++;
    // Input(add or sink.add)
    controller.add(diceData());
    if (count == times) {
      timer.cancel();
      controller.close();
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<String>(
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

void main() {
  final stream = chinchiro(const Duration(seconds: 1), 3);
  // Output(listen)
  stream.listen(print);
}

実行結果

243
211
145
  • サンプル③
    • StreamController
      • StreamController<T>を持つ、クラスを利用する場合
      • controller.streamを渡せるようにgetter追加
    • Input
      • サイコロの結果をaddにより、Streamにデータを渡す
        • 1秒間隔毎に最大回数までadd
    • Output
      • listen関数を利用することで、Streamからデータを都度受け取れる
import 'dart:async';
import 'dart:math';

class Chinchiro {
  final int times;
  String random() => (Random().nextInt(6) + 1).toString();
  String diceData() => random() + random() + random();

  Timer timer;
  int count;
  // StreamController
  StreamController<String> controller;

  Chinchiro({this.times = 3}) {
    count = 0;
    controller = StreamController<String>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer,
    );
  }

  Stream<String> get stream => controller.stream;

  void tick(Timer timer) {
    count++;
    // Input(add or sink.add)
    controller.add(diceData());
    if (count == times) {
      timer.cancel();
      controller.close();
    }
  }

  void startTimer() {
    timer = Timer.periodic(const Duration(seconds: 1), tick);
    count = 0;
  }

  void stopTimer() {
    timer?.cancel();
    controller.close();
  }
}

void main() {
  final stream = Chinchiro().stream;
  // Output(listen)
  stream.listen(print);
}

実行結果

631
523
132

Discussion