🎯

rxdart SwitchMapStreamTransformer

2025/01/14に公開

SwitchMapStreamTransformer in Dart

rxdartが持っているSwitchMapStreamTransformerについて解説します。
https://pub.dev/documentation/rxdart/latest/rx/SwitchMapStreamTransformer-class.html

概要

SwitchMapStreamTransformerは、ストリームの各要素を新しいストリームに変換し、最新のストリームからの出力のみを提供するストリーム変換器です。

特徴

  • 各要素を新しいストリームに変換
  • 新しいストリームが作成されると、以前のストリームは自動的にキャンセル
  • 最新のストリームからの出力のみを提供
  • 非同期APIからの最新の状態取得に最適

構文

Stream<T> transform(
  SwitchMapStreamTransformer<S, T>((S value) => Stream<T>)
)

基本的な使用例

1. 遅延処理の例

Stream.fromIterable([4, 3, 2, 1])
  .transform(SwitchMapStreamTransformer((i) =>
    Stream.fromFuture(
      Future.delayed(Duration(minutes: i), () => i)
    )))
  .listen(print); // 1のみ出力

2. APIリクエストの例

Stream<String> searchQuery;
searchQuery
  .transform(SwitchMapStreamTransformer((query) =>
    fetchSearchResults(query))) // 前のリクエストは自動的にキャンセル
  .listen((results) => print('最新の検索結果: $results'));

ユースケース

1. 検索機能の実装

final searchController = StreamController<String>();

searchController.stream
  .transform(SwitchMapStreamTransformer((String query) =>
    // 前の検索はキャンセルされ、最新の検索のみが実行される
    searchApi(query)
  ))
  .listen((results) => updateUI(results));

2. 位置情報の更新

locationUpdates
  .transform(SwitchMapStreamTransformer((location) =>
    // 前の天気情報リクエストはキャンセルされ、最新の位置の天気のみを取得
    fetchWeatherInfo(location)
  ))
  .listen((weather) => updateWeatherUI(weather));

flatMapとの違い

flatMap(すべての結果を出力)

Stream.fromIterable([3, 2, 1])
  .flatMap((i) => 
    Stream.fromFuture(
      Future.delayed(Duration(seconds: i), () => i)
    ))
  .listen(print); // 3, 2, 1 の順で出力

switchMap(最新の結果のみ出力)

Stream.fromIterable([3, 2, 1])
  .transform(SwitchMapStreamTransformer((i) =>
    Stream.fromFuture(
      Future.delayed(Duration(seconds: i), () => i)
    )))
  .listen(print); // 1 のみ出力

実践的な例

import 'dart:async';

import 'package:rxdart/rxdart.dart';

// 基本的な使用例
void basicExample() {
  print('基本的な使用例:');
  Stream.fromIterable([3, 2, 1])
      .transform(SwitchMapStreamTransformer((i) =>
          Stream.fromFuture(Future.delayed(Duration(seconds: i), () => i))))
      .listen(print); // 1のみ出力
}

// APIリクエストのシミュレーション
Future<List<String>> mockSearchApi(String query) async {
  await Future.delayed(Duration(seconds: 1));
  return ['$query の結果1', '$query の結果2'];
}

// 検索機能の実装例
void searchExample() {
  print('\n検索機能の例:');
  final searchController = StreamController<String>();

  searchController.stream
      .transform(SwitchMapStreamTransformer(
          (String query) => Stream.fromFuture(mockSearchApi(query))))
      .listen((results) => print('検索結果: $results'));

  // 検索クエリの送信
  searchController.add('検索1');
  searchController.add('検索2'); // 検索1はキャンセルされる

  // 3秒後にControllerを閉じる
  Future.delayed(Duration(seconds: 3), () {
    searchController.close();
  });
}

// flatMapとの比較例
void comparisonExample() {
  print('\nflatMapとの比較:');

  print('flatMap:');
  Stream.fromIterable([3, 2, 1])
      .asyncExpand((i) =>
          Stream.fromFuture(Future.delayed(Duration(seconds: i), () => i)))
      .listen(print); // 3, 2, 1 の順で出力

  print('switchMap:');
  Stream.fromIterable([3, 2, 1])
      .transform(SwitchMapStreamTransformer((i) =>
          Stream.fromFuture(Future.delayed(Duration(seconds: i), () => i))))
      .listen(print); // 1 のみ出力
}

// エラーハンドリングの例
void errorHandlingExample() {
  print('\nエラーハンドリングの例:');
  Stream.fromIterable([2, 1, 0])
      .transform(SwitchMapStreamTransformer(
          (i) => Stream.fromFuture(Future.delayed(Duration(seconds: 1), () {
                if (i == 0) throw Exception('ゼロ除算エラー');
                return 10 ~/ i;
              }))))
      .listen(
        print,
        onError: (error) => print('エラーが発生: $error'),
      );
}

void main() {
  basicExample();
  searchExample();
  comparisonExample();
  errorHandlingExample();

  // メインの実行を維持するための遅延
  Future.delayed(Duration(seconds: 10), () {
    print('\n全ての例が完了しました。');
  });
}

ベストプラクティス

  1. エラー処理の実装

    stream
      .transform(SwitchMapStreamTransformer((value) =>
        fetchData(value)
          .handleError((error) => handleError(error))
      ))
      .listen(
        (data) => processData(data),
        onError: (error) => handleError(error),
      );
    
  2. タイムアウトの設定

    stream
      .transform(SwitchMapStreamTransformer((value) =>
        fetchData(value)
          .timeout(
            Duration(seconds: 5),
            onTimeout: () => defaultValue,
          )
      ))
      .listen(print);
    

注意点

  1. メモリリーク防止

    • StreamControllerを使用する場合は、必ずdisposeメソッドで解放
    • 不要になったストリームのサブスクリプションはキャンセル
  2. エラー処理

    • 変換後のストリームでのエラー処理を忘れずに実装
    • タイムアウト処理の追加を検討
  3. パフォーマンス

    • 不必要な変換を避ける
    • 適切なバッファリングの実装

Discussion