🌟

Flutter の Stream から見る Reactive Programming の入り口(2/N)

2023/03/31に公開

1章: https://zenn.dev/toridori/articles/75bc9e96237c83
2章: この記事です
3章: ちょっと待ってね

はじめに

Flutterを開発していると、Streamというクラスによく出くわします。Firestore や FCM の SDK がインターフェイスとして公開していたり、BLoC パターンでも頻出するアイツです。

Stream の理解で躓く方、多いんじゃないかと思ってます。特徴を掴もうにも、「非同期なデータのシーケンスを扱える」だとか、「まるで川のように上流から下流に向けて値を流す」だとか、いや理解はできるんだけど腹落ちしないとうか。実装例を見たって「それ Stream 使わなくても良くね?」ってなる感じ。

先に言うと、Stream を完全理解するには Reactive Programming という世界を先に理解することが必要です。世界です世界。パラダイムとか新時代でもいいです。ただ、これの完全理解には時間がかかるし、完全理解したとしても Flutter 開発への寄与率は少なく、コスパが良くないです。多分。

本記事では、そんなあなたに腹落ちを与えるべく、Stream から見る Reactive Programming の入り口を解説します。本記事を読み終えても Stream マスターになれるわけではありませんが、ちょっとだけ楽しく Stream を扱えるようになっていて欲しいなと思っています。よろしくお願いします。

1. 背景から Reactive Programming を知る

第1章では、Reactive Programming を背景から見ていきました。なんか雰囲気は分かったけど雰囲気しか分かってない状態かと思います。本章では、データの性質からもう少し Reactive Programming を深堀ります。

https://zenn.dev/toridori/articles/75bc9e96237c83

2. データの性質から Reactive Programming を深ぼる

Reactive Programming では、シーケンス化されたデータを扱います。本章では、このシーケンス化されたデータ(以降ストリームと呼びます)と、普段の Flutter 開発[1]で使うデータが、いかに似ていて拡張概念であるかを説明します。

なお、本章の説明は ReactiveX のintroduction と重複しています。このページを読んで「あーなるほどね」を得られる方は飛ばしちゃっても良いと思います。

List も Future も Stream も相互に変換できる

最初に、我々が普段扱うデータに対しておさらいです。データにおける大きな特徴の一つとして、「単数か複数か」 があります。複数のデータは List クラスを使うでしょう。単数のデータなら、特に考えることもなくただの変数で扱うでしょう。

final scalar = 2; // 単数のデータ
final list = [1,2,3]; // 複数のデータ

他にも、「同期か非同期か」 という特徴もあります。非同期的に得られるデータには Future クラスを使うでしょう。

final scalar = 2; // 同期的に得られるデータ
final future = Future(() => 2); // 非同期的に得られるデータ

ここで、「同期的に得られる」という特徴と「単数である」という特徴は異なる側面から見ているだけで、同じデータを指すことができます。つまり表にまとめるとこんな感じです。

単数である 複数である
同期的に得られる ただの変数 List
非同期的に得られる Future ???

さて、??? の部分には何が入るでしょうか? はい。ストリームです。ストリームは「非同期的に得られる複数の値」を表現する概念です。 dart では Stream クラスが該当します。

単数である 複数である
同期的に得られる ただの変数 List
非同期的に得られる Future Stream!!!

ここで、一見冗長な次の変数を考えます。1つの値を表現するために List を使用している冗長なデータですが、伝えたいのは、単数のデータは複数のデータに変換できる という点です。

// 単数データ(2)を複数データ([2])に変換している
final list = [2];

同じような変換は Future と Stream に対しても行えます。現に、Stream にはfromFuturesという factory が用意されており、任意の数の Future から Stream を作成することができます

// 単数の非同期データを複数の非同期データに変換している
final stream = Stream.fromFutures([
  Future(() => 1),
]);

逆に、複数のデータを単数に変換する際はどうなるでしょうか。例えば List なら、「先頭要素を一つ取り出す」という方法が考えられます。

// 複数データを単数データに変換している
final scalar = [1,2,3].first; // 1が得られる

同じように、Stream にも「先頭要素を一つ取り出す」という操作があります。次元を落とす作業にも似ていますね。

// 複数の非同期データを単数の非同期データに変換している
final future = Stream.fromFutures([
  Future(() => 1),
  Future(() => 2),
  Future(() => 3),
]).first;

firstの戻り値は Future 型である点に注意して下さい。first はあくまで「複数を単数にする」ための操作であって、非同期を同期に変換する効果はありません。「複数を単数にしたいし非同期も同期にしたい!」って場合は await をつけるのが良いでしょう。

// 複数の非同期データを単数の非同期データに変換した後、同期データへの変換もしている
final scalar = await Stream.fromFutures([
  Future(() => 1),
  Future(() => 2),
  Future(() => 3),  
]).first;

まぁそんな使うものじゃないですが。あくまで理解を深めてほしいための説明でした。

このように、先に示した表の横方向に関して、相互に変換することが可能です。察しのいい方であれば気づいているかもしれませんが、縦方向に関しても相互に変換可能です。

// 同期的なデータを Future に変換する
final future = Future(() => 2); 

// Future を 同期的なデータに変換する
final scalar = await Future(() => 1);

// List を Stream に変換する
final stream = Stream.fromIterable([1,2,3]);

// Stream を List に変換する
final list = await Stream.toList();

つまりこう。

単数である 複数である
同期的に得られる ただの変数 ↔️ List
↕️ ↕️
非同期的に得られる Future ↔️ Stream

ででーん。

一応補足すると、Stream.toList()安易な利用は控えたほうが良いです。List と違い、Stream は終わりのないシーケンスを表現することもできるので、このような Stream に対して List への変換を実行すると世界が止まります。

以上、Stream が相互変換可能であることの説明でした。「よーわからんやつ」から 「List の拡張系であり、Future の拡張系でもあり、相互変換可能なやつ」になってくれているでしょうか。

Future とStream はエラー面でもちょっと似ている

ところで、Futureには、エラーの概念があります。一般的には、Future を try-catch で囲って例外をハンドリングします。

try { 
  await fetch();
} catch (e) {
  //エラーが発生したときの処理
}

await しない場合には Future.catchError でハンドリングできます。このエラーは、Stream においてはどのように扱われるのでしょうか。

試しに、1秒毎に hoge, 1, 2, hoge, 4, 5, hoge,...を出力する Stream を考えます。

final stream = Stream.periodic(const Duration(seconds: 1), (count) {
  if (count % 3 == 0) {
    return "hoge";
  }
  return count.toString();
});

stream.listen((x) => print(x));

これを実行すると、まずは hogeが、1秒後に1が、もう1秒後に2が、もう一秒後にhogeが...という出力が始まります。これは、出力を止める方法が存在しない、プロセスが終了するまで無限に続く処理です。

listenについては、「Future でいう await みたいなもの」と考えて下さい。非同期的な単一の値を利用するには await が必要でした。非同期的な複数の値を1つずつ利用するにはlistenを使用します。

さて、上記のStreamにエラーを仕込みましょう。次を実行するとどうなるでしょうか。

final stream = Stream.periodic(const Duration(seconds: 1), (count) {
  if (count % 3 == 0) {
    throw Exception("致命的なエラーが発生しました");
  }
  return count.toString();
});

stream.listen((x) => print(x));

dartpadで実行すると、次のような出力が得られます。

Uncaught Error: Exception: 致命的なエラーが発生しました
1
2
Uncaught Error: Exception: 致命的なエラーが発生しました
4
5
Uncaught Error: Exception: 致命的なエラーが発生しました
7
...

3回に1回例外が発生しているものの、処理が止まることは有りません。もし例外を補足したい場合は、handleErrorというオペレータが利用できます。

final stream = Stream.periodic(const Duration(seconds: 1), (count) {
  if (count % 3 == 0) {
    throw Exception("致命的なエラーが発生しました");
  }
  return count.toString();
});

stream
  .handleError((st, e) => print("致命的なエラーが起きたけどなんとかしたよ!"))
  .listen((x) => print(x));

出力はこう。エラー発生時の数字(3の倍数)は引き続き出力できませんが、代わりの処理を挟むことができます。BugsnagやCrashlyticsに報告したい時はここですね。

致命的なエラーが起きたけどなんとかしたよ!
1
2
致命的なエラーが起きたけどなんとかしたよ!
4
5
致命的なエラーが起きたけどなんとかしたよ!
7
...

エラーに関してのまとめです。Stream の輪郭が見えてきましたか?

  1. Future 同様、Stream にもエラーの概念がある
  2. Future とは違い、エラーが起きても後続の処理は止まらない[2]
  3. Future でいう try-catch(catchError) は、Stream でいう handleError である

List と Stream は操作面でよく似ている

みなさんはList操作のメソッド使ってますか? よく使うのはこれらでしょうか。

メソッド名 説明
where 条件に一致する要素を取り除く
map 要素を変換する
take n番目以降の要素を取り除く
skip n番目までの要素を取り除く
firstWhere 条件に一致する最初の要素を取得する(単数への変換)
reduce 要素を圧縮する(単数への変換)

実はこれ、「シーケンスを宣言的に操作する」という点において、Reactive Programming とよく似ています。

Listの場合 Streamの場合 説明
where where 条件に一致する要素を取り除く
map map 要素を変換する
take take n番目以降の要素を取り除く
skip skip n番目までの要素を取り除く
firstWhere firstWhere 条件に一致する最初の要素を取得する(単数への変換)
reduce reduce 要素を圧縮する(単数への変換)

勢いで書いてるんですが列挙した全てで名称が一致していてびびりました。

おさらいになりますが、Stream も List もシーケンスです。同期か非同期かという違いはあるものの、これが影響しない範囲においては、操作方法がほとんど一緒です。

もちろん、Stream にしかない操作もあります。その1つとして、asyncMapを紹介します。

asyncMapは、名前の通り要素を変換する操作ですが、変換後の値が Future ならその完了を待ってから次に進むことができます。

つまり、例えば「各種RepositoryのIssuesを取得して一つの配列にまとめる」という狂気の処理があったとき、

    final allIssues = await Stream.fromIterable([
      "niwatly", 
      "flutter", 
      "rxdart"
    ])
      .asyncMap((x) => fetch(x)) // fetchはFuture型を返す非同期データ取得処理
      .reduce((a, b) => [...a, ...b]);

という書き方ができます。はたしてこれはスマートですか?

以上、Stream は List と同じ感覚で操作できるし、Stream ならではのものもあるよ、という話でした。

ここでは asyncMap しか紹介しませんでしたが、RxDart にまで手を伸ばすと Stream ならではの操作がもっとあるし、Stream の合成に手を付けてからが Reactive Programming の本番と言えます。List の合成ですら若干面倒なのに、Stream は List に時間軸を付け加えたものと言えるので、アプリリリース時の後方互換性考慮なんてちっぽけに見えるくらい Stream の合成はMPを消費します。数日経つと自分でも読めなくなります。

(余談)StreamController っていつ使うの?

これまでさんざん Stream のことを説明してきましたが、Flutter開発では StreamController というアイツもたまに見ると思います。StreamControllerの存在意義にかんたんに触れておきます。

簡潔に言うと、StreamControllerの出番はより複雑なStreamを設計したくなった時です。「1秒毎に〇〇する」だとか、「固定数のFutureを展開する」だとか、一言で説明できるような非同期処理ではあまり出番がないのですが、例えば、ユーザの操作を Stream で受け取るときなんかに有用です。

ユーザの操作が「非同期的に値の決まるシーケンスデータである」点にピンときますか? 次のようなユーザによる複数の操作があったとき、

  1. 00:00 プログラム初期化完了
  2. 00:00 ユーザはボタンAを押した
  3. 00:05 ユーザはもう一度ボタンをA押した
  4. 00:06 ユーザはもう一度ボタンをAを押した
  5. ...

これを初期完了時点から見ると、

  1. 直後に値の決まるクリックイベント
  2. 5秒後に値の決まるクリックイベント
  3. 6秒後に値の決まるクリックイベント

というイベントのシーケンスとみなすことができます。これらのイベント1つ1つを StreamController.sinkに流してあげることで、StreamController.streamから受け取ることができ、クリックに応じた追加の処理を行うことができます。

「onPressedに処理書いて終わりじゃん何がしたいの」と思われるかもしれませんが、その通りです。間違ってません正しい感性です。上記の仕様に有用性を感じるためには、

  1. Reactive Programming の世界に思想できている
  2. プロジェクト内の全ては Stream になっている

という前提が必要です。世知辛いですね。

StreamControllerの意義は他にもありますが、ここでは箇条書きでとどめます。「いくつかあるよ」という点だけ覚えておいてほしいです。

  • 複数の購読者を想定する時 (broadcast)
  • 明示的にStreamを終了させたい時

以上、データの性質から見る Reactive Programming でした。前章と比べると細かい話が増えましたが、List との差異、あるいは Future との差異から Streamの輪郭をお伝えしたかった次第です。

3. Flutter開発の日常から Reactive Programming に馴染む

第2章では、Reactive Programming をデータの性質から深堀りました。なんとなく形はわかってきたけど、まだ、普段のFlutter開発とは遠いなぁって印象かと思います。次章では、Flutter開発の日常風景を Reactive Programming という視点から覗いて、より実感をわかせていきたいと思います。

第3章は作成中です。公開したらここにURLを貼ります。


トリドリのプロダクト開発部では通年採用を実施しています!

ユーザの幸せについて真剣に議論したり、「やってみたい」で新しい技術に挑戦をしてみたり、気づいたら30分雑談していたり、ゆるく真面目に開発しています。もし興味を持ってもらえたら、こちらを読んでみてください!

https://toridoridevs.page.link/89eQ

脚注
  1. プログラミング全般の話ではありますが、本記事の想定読者はFlutter開発経験者なのでFlutterに焦点を当てて説明します。 ↩︎

  2. dart Tutorialいわく、エラーが発生した時点で停止するStreamの方が多いようです。実際、async*を使ったStreamの内部で例外を投げるとStreamが止まりますが、これ以外に例を見つけられていません... ↩︎

トリドリバコ

Discussion