Dartで非同期処理をやってみる②
以前書いていたQiitaの記事を使っております🙇♂️
Streamとは?
公式のドキュメントのリンク
非同期プログラミング:ストリームについて
シンク< T >クラスについて リッスンメソッドについて ストリームコントローラーについて英単語自体を直訳すると、流れ となります。訳通り、dartのstreamはある場所からある場所へ値を流す機能です。
Javaの本では、川の流れだとか例えていた。
こんな例えもあった。
Streamとは、「ある値を入れて」、「ある値を出す」仕組みです。
この例えの方が、プログラミングでは、分かりやすいと思う。
StreamControllerにデータを追加して、listenして使います。
- 作業用のフォルダを作成
- ターミナルで、dart create dart-プロジェクト名を入力する。
※DartPadでもいいですよ。僕は、動作が遅いのとコードの保管機能が弱いので、VScodeを使います😅
dart create dart-practice
- Visual Studio Codeでプロジェクトを開く。
- こちらで、コーディングを行なっていきます。
import 'dart:async';
main() {
final controller = StreamController();
controller.sink.add(1);
controller.sink.add(2);
controller.stream.listen((value){
print(value);
});
}
実行するには、Run|Debugのどちらかを押すか、右上の虫のボタンを押す。ファイルを右クリックして、Start Debugginを押しても実行できます。
Flutterでも同じことができる。
実行結果
1
2
Exited
sinkとは?
公式を翻訳すると
データの一般的な宛先。
複数のデータ値をシンクに入れることができ、
利用できるデータがなくなったら、シンクを閉じる必要があります。
これは、他のデータ受信者が実装できる汎用インターフェースです。
sink.の後にメソッドで、追加をするadd(T data)と閉じるclose()がある。
今使ってみたのは、追加をするaddですね。
add(値)こんな仕組みになっているようですね。
listenとは?
listen関数を利用することで、Streamからデータを都度受け取れる
公式を翻訳すると分かりずらい?
forEachのような繰り返し処理をしている。
int型だけではなく、あらゆる値を出し入れすることができます。
import 'dart:async';
main() {
final controller = StreamController();
controller.sink.add(1);
controller.sink.add(2.0);
controller.sink.add('Hello');
controller.sink.add([10, 20.0, 'World']);
controller.sink.add(null);
controller.stream.listen((value){
print(value);
});
}
実行結果
1
2.0
Hello
[10, 20.0, World]
null
自分でプログラムを考えて実験してみた!
import 'dart:async';
main() {
// controllerという定数を定義、finalなので代入はできる。変数といった方がいいか...
final controller = StreamController();
// controllerに値を追加していく、List型出ないと複数の型を扱えない、エラーが出る
controller.sink.add("こんにちは!");
// 計算ができた!、足し算をする
controller.sink.add(1 + 1 + 1);
// 掛け算してみる
controller.sink.add(2 * 2);
// 掛け算して、余の数を出す。2×2は4で、4÷3は1になる。
controller.sink.add(2 * 2 % 3);
// 配列を使ってみる。Dクラスの平均点を配列で作ってみる。
controller.sink.add(["Dクラス", 68, true]);
// String、int、bool型が入っている。実際は成績が悪いので、新しく作る。
controller.sink.add(["Dクラス", 48, false]);
// forEachのように、controllerという変数ですね。これを繰り返し処理している。
controller.stream.listen((value) {
print(value);
});
}
実行結果
こんにちは!
3
4
1
[Dクラス, 68, true]
[Dクラス, 48, false]
Streamを返す関数を定義する
forEachを使った方法があるそうですが、async関数を使うのが一般的な方法のようです。
streamとasync
async関数の中では「await for」を使ってStreamの値を取り出すことができます。
また、「async*」関数の中で「yield」キーワードを使うことで、返り値のStreamに値を流すことができます。
async*をつけると、関数の戻り値がStreamになります。似たものにasyncがありますが、asyncは戻り値をFutureにするものです。この2つは似て非なるものなので、しっかり区別しましょう。と参考にした昔の記事には書いてありました。
import 'dart:async';
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (var value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
実行結果
55
型に関係なく、以下のように使うこともできます。
import 'dart:async';
Future<List> displayStream(Stream<String> stream) async {
List<String> arr = [];
await for (var value in stream) {
arr.add(value + '-inside-stream');
}
return arr;
}
Stream<String> passStream(List to) async* {
for(var v in to){
yield v;
}
}
main() async {
var stream = passStream(['a', 'b', 'c', 'd']);
var str_1 = await displayStream(stream);
print(str_1);
}
でも、参考にしたこのソースコードだとエラーが出たので、修正しないと使えませんでした🤔
import 'dart:async';
// dynamic型にしないとエラー出る!
Future<List> displayStream(Stream<dynamic> stream) async {
List<String> arr = [];
await for (var value in stream) {
arr.add("配列の中身を表示: " + value);
}
return arr;
}
// dynamic型にしないとエラー出る!
Stream<dynamic> passStream(List to) async* {
for(var v in to){
yield v;
}
}
main() async {
var stream = passStream(['a', 'b', 'c', 'd']);
var str_1 = await displayStream(stream);
print(str_1);
}
実行結果
[配列の中身を表示: a, 配列の中身を表示: b, 配列の中身を表示: c, 配列の中身を表示: d]
もっと簡単な書き方があったでもこの書き方古いみたいですね?
import 'dart:async';
main() {
final members = ["Kboyさん", "Aoiさん", "kosukeさん"];
toSquared(members).listen((val) {
print(val);
});
}
Stream<String> toSquared(List<String> members) async* {
for (String n in members) {
yield n;
}
}
実行結果
Kboyさん
Aoiさん
kosukeさん
現在はこの書き方が推奨されている?、Null safety対応されているみたいです。
公式ドキュメント
コードを修正
import 'dart:async';
main() {
final members = ["Kboyさん", "Aoiさん", "kosukeさん"];
// コードを修正: toSquared -> sqrt
sqrt(members).listen((val) {
print("sqrtで表示: " + val);
});
}
// コードを修正: toSquared -> sqrt
Stream<String> sqrt(List<String> members) async* {
for (String n in members) {
yield n;
}
}
実行結果
sqrtで表示: Kboyさん
sqrtで表示: Aoiさん
sqrtで表示: kosukeさん
例外処理を行うこともできます。
例外は、「onError」でcatchします。
import 'dart:async';
// 例外処理を行うこともできます。
void addLessThanFive(StreamController controller, int value) {
// 5未満までデータを追加する
if (value < 5) {
controller.sink.add(value);
} else {
controller.sink.addError(StateError('$value is not less than 5'));
}
}
main() {
final controller = StreamController();
addLessThanFive(controller, 1);
addLessThanFive(controller, 2);
addLessThanFive(controller, 3);
addLessThanFive(controller, 4);
addLessThanFive(controller, 5);
controller.stream.listen((value) {
print(value);
}, onError: (error) {
print(error);
print("例外処理が発生しました!");
});
}
実行結果
1
2
3
4
Bad state: 5 is not less than 5
例外処理が発生しました!
Streamをcloseして、その時に行う処理を書くこともできます。
「onDone」という項目で、closeした時の処理を行います。
import 'dart:async';
void addLessThanFive(StreamController controller, int value) {
// 5未満まで値を追加して、5を超えると例外処理が発生して、closeする
if (value < 5) {
controller.sink.add(value);
} else {
controller.sink.addError(StateError('$value is not less than 5'));
}
}
// Streamをcloseします。
main() {
final controller = StreamController();
addLessThanFive(controller, 1);
addLessThanFive(controller, 2);
addLessThanFive(controller, 3);
addLessThanFive(controller, 4);
addLessThanFive(controller, 5);
controller.close();
controller.stream.listen((value) {
print(value);
}, onError: (error) {
print(error);
}, onDone: () {
print('controller was closed');
print('Streamをcloseしました');
});
}
実行結果
1
2
3
4
Bad state: 5 is not less than 5
controller was closed
Streamをcloseしました
やってみた感想
自分でプログラムを動かしてみないと、サンプル通りに動かなかったりするので、調べながらやってみました。ストリームコントローラーはこんなものかなとなんとなく理解できました。
Discussion