📡

Dartで非同期処理をやってみる②

2022/04/26に公開約7,000字

以前書いていたQiitaの記事を使っております🙇‍♂️

Streamとは?

公式のドキュメントのリンク

非同期プログラミング:ストリームについて

https://dart.dev/tutorials/language/streams
シンク< T >クラスについて
https://api.flutter.dev/flutter/dart-core/Sink-class.html
リッスンメソッドについて
https://api.flutter.dev/flutter/dart-async/Stream/listen.html
ストリームコントローラーについて
https://api.dart.dev/stable/2.16.2/dart-async/StreamController-class.html

英単語自体を直訳すると、流れ となります。訳通り、dartのstreamはある場所からある場所へ値を流す機能です。
Javaの本では、川の流れだとか例えていた。

こんな例えもあった。
Streamとは、「ある値を入れて」、「ある値を出す」仕組みです。
この例えの方が、プログラミングでは、分かりやすいと思う。

StreamControllerにデータを追加して、listenして使います。

  1. 作業用のフォルダを作成
  2. ターミナルで、dart create dart-プロジェクト名を入力する。
    ※DartPadでもいいですよ。僕は、動作が遅いのとコードの保管機能が弱いので、VScodeを使います😅
dart create dart-practice
  1. Visual Studio Codeでプロジェクトを開く。
  2. こちらで、コーディングを行なっていきます。
import 'dart:async';

main() {
  final controller = StreamController();
  controller.sink.add(1);
  controller.sink.add(2);
    
  controller.stream.listen((value){
    print(value);
  });
}

実行するには、Run|Debugのどちらかを押すか、右上の虫のボタンを押す。ファイルを右クリックして、Start Debugginを押しても実行できます。
Flutterでも同じことができる。

スクリーンショット 2022-04-17 9.08.47.png

実行結果

1
2
Exited

sinkとは?

公式を翻訳すると
データの一般的な宛先。

複数のデータ値をシンクに入れることができ、
利用できるデータがなくなったら、シンクを閉じる必要があります。

これは、他のデータ受信者が実装できる汎用インターフェースです。

sink.の後にメソッドで、追加をするadd(T data)と閉じるclose()がある。
今使ってみたのは、追加をするaddですね。

add(値)こんな仕組みになっているようですね。

listenとは?

listen関数を利用することで、Streamからデータを都度受け取れる
公式を翻訳すると分かりずらい?
forEachのような繰り返し処理をしている。

int型だけではなく、あらゆる値を出し入れすることができます。

import 'dart:async';

main() {
  final controller = StreamController();
  controller.sink.add(1);
  controller.sink.add(2.0);
  controller.sink.add('Hello');
  controller.sink.add([10, 20.0, 'World']);
  controller.sink.add(null);
    
  controller.stream.listen((value){
    print(value);
  });
}

実行結果

1
2.0
Hello
[10, 20.0, World]
null

自分でプログラムを考えて実験してみた!

import 'dart:async';

main() {
  // controllerという定数を定義、finalなので代入はできる。変数といった方がいいか...
  final controller = StreamController();
  // controllerに値を追加していく、List型出ないと複数の型を扱えない、エラーが出る
  controller.sink.add("こんにちは!");
  // 計算ができた!、足し算をする
  controller.sink.add(1 + 1 + 1);
  // 掛け算してみる
  controller.sink.add(2 * 2);
  // 掛け算して、余の数を出す。2×2は4で、4÷3は1になる。
  controller.sink.add(2 * 2 % 3);
  // 配列を使ってみる。Dクラスの平均点を配列で作ってみる。
  controller.sink.add(["Dクラス", 68, true]);
  // String、int、bool型が入っている。実際は成績が悪いので、新しく作る。
  controller.sink.add(["Dクラス", 48, false]);
  // forEachのように、controllerという変数ですね。これを繰り返し処理している。
  controller.stream.listen((value) {
    print(value);
  });
}

実行結果

こんにちは!
3
4
1
[Dクラス, 68, true]
[Dクラス, 48, false]

Streamを返す関数を定義する

forEachを使った方法があるそうですが、async関数を使うのが一般的な方法のようです。

streamとasync

async関数の中では「await for」を使ってStreamの値を取り出すことができます。

また、「async*」関数の中で「yield」キーワードを使うことで、返り値のStreamに値を流すことができます。

async*をつけると、関数の戻り値がStreamになります。似たものにasyncがありますが、asyncは戻り値をFutureにするものです。この2つは似て非なるものなので、しっかり区別しましょう。と参考にした昔の記事には書いてありました。

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

実行結果

55

型に関係なく、以下のように使うこともできます。

import 'dart:async';

Future<List> displayStream(Stream<String> stream) async {
  List<String> arr = [];
  await for (var value in stream) {
    arr.add(value + '-inside-stream');
  }
  
  return arr;
}

Stream<String> passStream(List to) async* {
  for(var v in to){
    yield v;
  }
}

main() async {
  var stream = passStream(['a', 'b', 'c', 'd']);
  var str_1 = await displayStream(stream);
  print(str_1);
}

でも、参考にしたこのソースコードだとエラーが出たので、修正しないと使えませんでした🤔

import 'dart:async';
// dynamic型にしないとエラー出る!
Future<List> displayStream(Stream<dynamic> stream) async {
  List<String> arr = [];
  await for (var value in stream) {
    arr.add("配列の中身を表示: " + value);
  }

  return arr;
}
// dynamic型にしないとエラー出る!
Stream<dynamic> passStream(List to) async* {
  for(var v in to){
    yield v;
  }
}

main() async {
  var stream = passStream(['a', 'b', 'c', 'd']);
  var str_1 = await displayStream(stream);
  print(str_1);
}

実行結果

[配列の中身を表示: a, 配列の中身を表示: b, 配列の中身を表示: c, 配列の中身を表示: d]

もっと簡単な書き方があったでもこの書き方古いみたいですね?

import 'dart:async';

main() {
  final members = ["Kboyさん", "Aoiさん", "kosukeさん"];

  toSquared(members).listen((val) {
    print(val);
  });
}

Stream<String> toSquared(List<String> members) async* {
  for (String n in members) {
    yield n;
  }
}

実行結果

Kboyさん
Aoiさん
kosukeさん

現在はこの書き方が推奨されている?、Null safety対応されているみたいです。
公式ドキュメント

https://api.flutter.dev/flutter/dart-math/sqrt.html

コードを修正

import 'dart:async';

main() {
  final members = ["Kboyさん", "Aoiさん", "kosukeさん"];
  // コードを修正: toSquared -> sqrt
  sqrt(members).listen((val) {
    print("sqrtで表示: " + val);
  });
}

// コードを修正: toSquared -> sqrt
Stream<String> sqrt(List<String> members) async* {
  for (String n in members) {
    yield n;
  }
}

実行結果

sqrtで表示: Kboyさん
sqrtで表示: Aoiさん
sqrtで表示: kosukeさん

例外処理を行うこともできます。
例外は、「onError」でcatchします。

import 'dart:async';

// 例外処理を行うこともできます。
void addLessThanFive(StreamController controller, int value) {
  // 5未満までデータを追加する
  if (value < 5) {
    controller.sink.add(value);
  } else {
    controller.sink.addError(StateError('$value is not less than 5'));
  }
}

main() {
  final controller = StreamController();
  addLessThanFive(controller, 1);
  addLessThanFive(controller, 2);
  addLessThanFive(controller, 3);
  addLessThanFive(controller, 4);
  addLessThanFive(controller, 5);

  controller.stream.listen((value) {
    print(value);
  }, onError: (error) {
    print(error);
    print("例外処理が発生しました!");
  });
}

実行結果

1
2
3
4
Bad state: 5 is not less than 5
例外処理が発生しました!

Streamをcloseして、その時に行う処理を書くこともできます。
「onDone」という項目で、closeした時の処理を行います。

import 'dart:async';

void addLessThanFive(StreamController controller, int value) {
  // 5未満まで値を追加して、5を超えると例外処理が発生して、closeする
  if (value < 5) {
    controller.sink.add(value);
  } else {
    controller.sink.addError(StateError('$value is not less than 5'));
  }
}

// Streamをcloseします。
main() {
  final controller = StreamController();
  addLessThanFive(controller, 1);
  addLessThanFive(controller, 2);
  addLessThanFive(controller, 3);
  addLessThanFive(controller, 4);
  addLessThanFive(controller, 5);
  controller.close();

  controller.stream.listen((value) {
    print(value);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('controller was closed');
    print('Streamをcloseしました');
  });
}

実行結果

1
2
3
4
Bad state: 5 is not less than 5
controller was closed
Streamをcloseしました

やってみた感想

自分でプログラムを動かしてみないと、サンプル通りに動かなかったりするので、調べながらやってみた、ストリームコントローラーはこんなものかとなんとなく理解できた。
これ理解しないと、Provider、riverpod使っあFirebase使ったアプリ作るときに、人が書いたソースコード読めないのが理解できた。

Discussion

ログインするとコメントできます