🎛️

【Flutter】Completer を使って処理を管理する

2024/12/07に公開

初めに

今回は Completer を用いて処理を外部から操作する実装を行いたいと思います。
Completer を使えば既に実行されている処理に対して別のアクションを実行することができます。
具体的には処理のキャンセルなどができるため、ユーザーが誤って処理を行なってしまった際などにキャンセルをすることで余分な時間を取らせることなく処理の中断ができます。

記事の対象者

  • Flutter 学習者
  • Completer を使って処理を外部から管理する実装を行いたい方
  • 処理のキャンセルを実装したい方

目的

今回の目的は先述の通り、Completer の使用方法を把握することです。
サンプルとして非同期処理や Stream の処理をキャンセルしたり、APIのデータの取得をキャンセルしたりできるようにしていきます。

実装

実装は以下の手順で進めていきます。

  1. Future の処理のキャンセル
  2. Stream の処理のキャンセル
  3. API のデータ取得のキャンセル

1. Future の処理のキャンセル

まずは Future の処理のキャンセルを実装していきます。
コードは以下の通りです。

import 'dart:async';
import 'package:flutter/material.dart';
import 'package:flutter_hooks/flutter_hooks.dart';

/// カスタム例外クラス
class FetchCancelledException implements Exception {
  final String message;
  FetchCancelledException(this.message);

  
  String toString() => message;
}

class FutureDownloadService {
  Completer<void>? _downloadCompleter;

  /// ダウンロードを開始し、ダウンロード完了またはキャンセルを待機する
  Future<void> startDownload() {
    if (_downloadCompleter != null && !_downloadCompleter!.isCompleted) {
      // 既にダウンロードが実行中の場合は既存の Future を返す
      return _downloadCompleter!.future;
    }

    _downloadCompleter = Completer<void>();

    // ダウンロードプロセスをシミュレート
    Future.delayed(const Duration(seconds: 5)).then((_) {
      if (!_downloadCompleter!.isCompleted) {
        _downloadCompleter!.complete();
      }
    });

    return _downloadCompleter!.future;
  }

  /// ダウンロードをキャンセルする
  void cancelDownload() {
    if (_downloadCompleter != null && !_downloadCompleter!.isCompleted) {
      _downloadCompleter!.completeError(FetchCancelledException('ダウンロードがキャンセルされました。'));
    }
  }
}

class CancelableFutureScreen extends HookWidget {
  const CancelableFutureScreen({Key? key}) : super(key: key);

  
  Widget build(BuildContext context) {
    // DownloadService のインスタンスを作成
    final downloadService = useMemoized(() => FutureDownloadService());

    // その他のフック
    final isRunning = useState<bool>(false);
    final messages = useState('');
    final elapsedSeconds = useState<int>(0);

    // クリーンアップ処理
    useEffect(() {
      return () {
        downloadService.cancelDownload(); // ウィジェットが破棄される際にキャンセル信号を送る
      };
    }, [downloadService]);

    // タスクを開始する関数
    Future<void> startTask() async {
      isRunning.value = true;
      messages.value = '';
      elapsedSeconds.value = 0;

      // タイマーで経過時間をカウント
      final timer = Timer.periodic(const Duration(seconds: 1), (timer) {
        elapsedSeconds.value += 1;
      });

      try {
        // DownloadService のタスクを開始
        await downloadService.startDownload();

        // タスクが正常に完了した場合のメッセージ
        messages.value = 'ダウンロードが正常に完了しました。';
      } on FetchCancelledException catch (e) {
        // キャンセルされた場合にスナックバーを表示
        if (context.mounted) {
          ScaffoldMessenger.of(context).showSnackBar(
            SnackBar(content: Text(e.toString())),
          );
        }
      } catch (e) {
        if (context.mounted) {
          ScaffoldMessenger.of(context).showSnackBar(
            SnackBar(content: Text('予期しないエラーが発生しました: $e')),
          );
        }
      } finally {
        timer.cancel(); // タイマーを停止
        isRunning.value = false;
      }
    }

    // タスクをキャンセルする関数
    void cancelTask() {
      downloadService.cancelDownload(); // DownloadService を通じてタスクをキャンセル
    }

    // ボタンのラベルと動作を決定
    String buttonLabel = isRunning.value ? 'キャンセル' : '開始';
    VoidCallback? buttonAction = isRunning.value ? cancelTask : startTask;

    return Scaffold(
      appBar: AppBar(
        title: const Text('Cancel Future with DownloadService'),
      ),
      body: Padding(
        padding: const EdgeInsets.all(16.0),
        child: Center(
          child: Column(
            children: [
              ElevatedButton(
                onPressed: buttonAction,
                child: Text(buttonLabel),
              ),
              const SizedBox(height: 20),
              if (isRunning.value) ...[
                const CircularProgressIndicator(),
                const SizedBox(height: 8),
                Text(
                  '進行中: ${elapsedSeconds.value}秒',
                  style: const TextStyle(fontSize: 16),
                ),
              ],
              const SizedBox(height: 8),
              Text(messages.value),
            ],
          ),
        ),
      ),
    );
  }
}

これで実行すると以下の動画のように、タスクを途中で中断できるようになっています。

それぞれのコードについて詳しく見ていきます。

以下では Exception を継承しているカスタム型の例外である FetchCancelledException を作成しています。ユーザーが処理をキャンセルした際にこの例外を返すようにします。

class FetchCancelledException implements Exception {
  final String message;
  FetchCancelledException(this.message);

  
  String toString() => message;
}

以下ではダウンロードの処理をダミーで行う FutureDownloadService を実装しています。
startDownload の中で _downloadCompleter = Completer<void>(); とすることで Completer をインスタンス化しています。そして、5秒後に _downloadCompleter!.complete() とすることで時間のかかる処理の終了後に complete が呼び出されるようになっています。
また、 startDownload 自体は Completer が保持している値を _downloadCompleter!.future で返すようにしています。

class FutureDownloadService {
  Completer<void>? _downloadCompleter;

  /// ダウンロードを開始し、ダウンロード完了またはキャンセルを待機する
  Future<void> startDownload() {
    if (_downloadCompleter != null && !_downloadCompleter!.isCompleted) {
      // 既にダウンロードが実行中の場合は既存の Future を返す
      return _downloadCompleter!.future;
    }

    _downloadCompleter = Completer<void>();

    // ダウンロードプロセスをシミュレート
    Future.delayed(const Duration(seconds: 5)).then((_) {
      if (!_downloadCompleter!.isCompleted) {
        _downloadCompleter!.complete();
      }
    });

    return _downloadCompleter!.future;
  }

以下では処理のキャンセルを行う cancelDownload を実装しています。
先程の startDownload と同じ Completer のインスタンスである _downloadCompletercompleteError メソッドを実行し、その中に FetchCancelledException を入れることで、ユーザーがキャンセル処理を行なった際に Completer にエラーとして伝わるようになります。

Completer は complete, completeError のいずれかを実行することで処理の完了またはエラーの発生を伝えることができます。ユーザーがキャンセル処理を行わなかった場合は complete、キャンセル処理を行なった場合は completeError を実行することで場合分けを行なっています。

このエラーを後述の処理でキャッチすることで、UI側でもエラーを確認できるようにしています。

/// ダウンロードをキャンセルする
void cancelDownload() {
  if (_downloadCompleter != null && !_downloadCompleter!.isCompleted) {
    _downloadCompleter!.completeError(FetchCancelledException('ダウンロードがキャンセルされました。'));
  }
}

以下では、UIの実装であり、 downloadService として先程定義した FutureDownloadService を読み取っています。また、そのほかにもダウンロード処理が実行中かどうか、表示させるメッセージ、処理の実行時間を保持するための変数を定義しています。

class CancelableFutureScreen extends HookWidget {
  const CancelableFutureScreen({Key? key}) : super(key: key);

  
  Widget build(BuildContext context) {
    // DownloadService のインスタンスを作成
    final downloadService = useMemoized(() => FutureDownloadService());

    // その他のフック
    final isRunning = useState<bool>(false);
    final messages = useState('');
    final elapsedSeconds = useState<int>(0);

以下では startTask として startDownload を実行しています。
キャンセルされることなく実行されると messages.value が更新されます。
FetchCancelledException をキャッチすると、その内容をスナックバーで表示します。

先程実装した cancelDownload では FetchCancelledException をスローするようにしていたため、キャンセルされた段階でここで FetchCancelledException がキャッチされるようになっています。

Future<void> startTask() async {
  // 省略 ...
  try {
    // DownloadService のタスクを開始
    await downloadService.startDownload();

    // タスクが正常に完了した場合のメッセージ
    messages.value = 'ダウンロードが正常に完了しました。';
  } on FetchCancelledException catch (e) {
    // キャンセルされた場合にスナックバーを表示
    if (context.mounted) {
      ScaffoldMessenger.of(context).showSnackBar(
        SnackBar(content: Text(e.toString())),
      );
    }
  } catch (e) {
    // 省略 ...
  } finally {
    // 省略 ...
  }
}

以下では処理をキャンセルするための cancelTask を実装しています。
内容としては、downloadService.cancelDownload() を実行しているだけで、これにより FetchCancelledException がスローされ、その内容がスナックバーとして表示されるようになります。

// タスクをキャンセルする関数
void cancelTask() {
  downloadService.cancelDownload(); // DownloadService を通じてタスクをキャンセル
}

Future の処理を外部から管理するためには、 Complete を用いて以下のような手順で実装を進めれば良いことがわかります。

  1. Completer をインスタンス化
  2. 処理が問題なく成功した場合は completer.complete()
  3. 処理でエラーまたはキャンセルが発生した場合は complete.completeError()
  4. エラーがある場合はエラーハンドリング

2. Stream の処理のキャンセル

次に Stream の処理のキャンセルについて見ていきます。
コードは以下の通りです。

import 'dart:async';
import 'package:flutter/material.dart';
import 'package:flutter_hooks/flutter_hooks.dart';

/// カスタム例外クラス
class FetchCancelledException implements Exception {
  final String message;
  FetchCancelledException(this.message);

  
  String toString() => message;
}

class StreamDownloadService {
  Completer<void>? _streamCompleter;
  StreamController<String>? _controller;

  /// ストリームを開始し、データの送信またはキャンセルを待機する
  Stream<String> startStream() {
    if (_streamCompleter != null && !_streamCompleter!.isCompleted) {
      // 既にストリームが実行中の場合は既存のストリームを返す
      return _controller!.stream;
    }

    _streamCompleter = Completer<void>();
    _controller = StreamController<String>();

    Timer? timer;
    timer = Timer.periodic(const Duration(seconds: 1), (t) {
      if (_streamCompleter!.isCompleted) {
        t.cancel();
      } else {
        _controller!.add('Message at ${DateTime.now()}');
      }
    });

    // キャンセルまたは完了時の処理
    _streamCompleter!.future.then((_) {
      _controller!.addError(FetchCancelledException('ストリームがキャンセルされました。'));
      _controller!.close();
      timer?.cancel();
    }).catchError((e) {
      if (!_streamCompleter!.isCompleted) {
        _controller!.addError('Stream failed: $e');
        _controller!.close();
        timer?.cancel();
      }
    });

    // ストリームがキャンセルされたときの処理
    _controller!.onCancel = () {
      if (!_streamCompleter!.isCompleted) {
        _streamCompleter!.complete();
      }
    };

    return _controller!.stream;
  }

  /// ストリームをキャンセルする
  void cancelStream() {
    if (_streamCompleter != null && !_streamCompleter!.isCompleted) {
      _streamCompleter!.complete();
    }
  }
}

class CancelablestreamScreen extends HookWidget {
  const CancelablestreamScreen({Key? key}) : super(key: key);

  
  Widget build(BuildContext context) {
    // Completer と StreamSubscription を管理するためのフック
    final completer = useMemoized(() => Completer<void>());
    final subscription = useRef<StreamSubscription<String>?>(null);
    final messages = useState<List<String>>([]);
    final isRunning = useState<bool>(false);
    final downloadService = useMemoized(() => StreamDownloadService());

    // メッセージの取得を開始する関数
    void startTask() {
      isRunning.value = true;
      messages.value = [];

      // キャンセル可能なストリームを取得
      Stream<String> messageStream = downloadService.startStream();

      subscription.value = messageStream.listen(
        (message) {
          messages.value = [...messages.value, message];
        },
        onError: (error) {
          if (context.mounted) {
            ScaffoldMessenger.of(context).showSnackBar(
              SnackBar(
                content: Text(
                  error.toString(),
                ),
              ),
            );
          }
          isRunning.value = false;
        },
        onDone: () {
          isRunning.value = false;
        },
      );
    }

    // メッセージの取得をキャンセルする関数
    void cancelTask() {
      downloadService.cancelStream();
    }

    // クリーンアップ処理
    useEffect(() {
      return () {
        subscription.value?.cancel();
        if (!completer.isCompleted) {
          completer.complete();
        }
      };
    }, []);

    String buttonLabel = isRunning.value ? 'キャンセル' : '開始';
    VoidCallback? buttonAction = isRunning.value ? cancelTask : startTask;

    return Scaffold(
      appBar: AppBar(
        title: const Text('Cancel Stream with Completer'),
      ),
      body: Padding(
        padding: const EdgeInsets.all(16.0),
        child: Column(
          children: [
            ElevatedButton(
              onPressed: buttonAction,
              child: Text(buttonLabel),
            ),
            const SizedBox(height: 10),
            Expanded(
              child: ListView.builder(
                itemCount: messages.value.length,
                itemBuilder: (context, index) {
                  return ListTile(
                    title: Text(messages.value[index]),
                  );
                },
              ),
            ),
          ],
        ),
      ),
    );
  }
}

それぞれコードを見ていきます。

以下は先ほどと同様の実装で、ユーザーが処理をキャンセルした際にこの例外を返すようにします。

class FetchCancelledException implements Exception {
  final String message;
  FetchCancelledException(this.message);

  
  String toString() => message;
}

以下では StreamDownloadService を作成し、 CompleterStreamController の定義を行なっています。StreamController を用いて制御する点が Future と異なる点です。

class StreamDownloadService {
  Completer<void>? _streamCompleter;
  StreamController<String>? _controller;

以下ではコメントの通り、 Stream を開始しています。 _streamCompleter, _controller をそれぞれ初期化し、その後に1秒ごとで Stream にテキストを追加するようにしています。

  /// ストリームを開始し、データの送信またはキャンセルを待機する
  Stream<String> startStream() {
    if (_streamCompleter != null && !_streamCompleter!.isCompleted) {
      // 既にストリームが実行中の場合は既存のストリームを返す
      return _controller!.stream;
    }

    _streamCompleter = Completer<void>();
    _controller = StreamController<String>();

    Timer? timer;
    timer = Timer.periodic(const Duration(seconds: 1), (t) {
      if (_streamCompleter!.isCompleted) {
        t.cancel();
      } else {
        _controller!.add('Message at ${DateTime.now()}');
      }
    });

以下では処理がキャンセルまたは完了した時の処理を実装しています。
Stream がキャンセルまたは完了した場合、エラーを Stream に追加し、StreamController を閉じるようにしています。

// キャンセルまたは完了時の処理
_streamCompleter!.future.then((_) {
  _controller!.addError(FetchCancelledException('ストリームがキャンセルされました。'));
  _controller!.close();
  timer?.cancel();
}).catchError((e) {
  if (!_streamCompleter!.isCompleted) {
    _controller!.addError('Stream failed: $e');
    _controller!.close();
    timer?.cancel();
  }
});

StreamControlleronCancel が呼ばれた段階で Completer の complete を実行することで Completer を完了させて、処理を中断しています。
Future の場合は Completer で直接処理を管理していましたが、 Stream では StreamController に紐付けて管理することも可能かと思います。

// ストリームがキャンセルされたときの処理
_controller!.onCancel = () {
  if (!_streamCompleter!.isCompleted) {
    _streamCompleter!.complete();
  }
};

以下では cancelStream として、 Completer の complete を呼び出して処理を中断するようにしています。

/// ストリームをキャンセルする
void cancelStream() {
  if (_streamCompleter != null && !_streamCompleter!.isCompleted) {
    _streamCompleter!.complete();
  }
}

CancelablestreamScreen の中の startTask では先ほど実装した startStream で Stream を開始し、それをリッスンすることでデータの追加等を監視しています。
onError では StreamController にエラーが追加された際にスナックバーを表示するようにしています。

// メッセージの取得を開始する関数
void startTask() {
  isRunning.value = true;
  messages.value = [];

  // キャンセル可能なストリームを取得
  Stream<String> messageStream = downloadService.startStream();

  subscription.value = messageStream.listen(
    (message) {
      messages.value = [...messages.value, message];
    },
    onError: (error) {
      if (context.mounted) {
        ScaffoldMessenger.of(context).showSnackBar(
          SnackBar(
            content: Text(
              error.toString(),
            ),
          ),
        );
      }
      isRunning.value = false;
    },
    onDone: () {
      isRunning.value = false;
    },
  );
}

Stream の処理を外部から管理するためには、 Complete を用いて以下のような手順で実装を進めれば良いことがわかります。

  1. Completer と StreamController をインスタンス化
  2. キャンセルがない場合は completer.complete() を実行して、 StreamController も close する
  3. キャンセルがある場合は completer.complete() で処理を完了させ、 StreamControlleraddError でエラーを追加
  4. エラーがある場合は Stream を listen している箇所の onError で捕捉して対処

3. API のデータ取得のキャンセル

次に実際のユースケースを想定して、APIのデータ取得をキャンセルできるようにしていきます。
今回は Json Placeholder API から連続的にデータを取得し、それを途中で中断できるような仕組みを作りたいと思います。

実装は以下の手順で進めていきます。

  1. model の実装
  2. repository 層の実装
  3. data_source 層の実装
  4. exceptions の実装
  5. service 層の実装
  6. screen 層の実装

1. model の実装

まずは model を実装していきます。
今回は photo を取得するために Photo のモデルを作成します。
コードは以下の通りです。

これでビルドランナーを実行してモデルの作成は完了です。

import 'package:freezed_annotation/freezed_annotation.dart';

part 'photo.freezed.dart';
part 'photo.g.dart';


class Photo with _$Photo {
  const factory Photo({
    required int albumId,
    required int id,
    required String title,
    required String url,
    required String thumbnailUrl,
  }) = _Photo;

  factory Photo.fromJson(Map<String, dynamic> json) =>
      _$PhotoFromJson(json);
}

2. repository 層の実装

次に repository の実装を行います。
コードは以下の通りです。

PhotoRepository は抽象クラスとして定義していますが、複数のメソッドがない場合は PhotoRepository に直接処理を記述しても問題ないかなと思います。

photo_repository.dart
import 'package:sample_flutter/cancel/models/photo.dart';

abstract class PhotoRepository {
  Stream<Photo> getPhotoById(int id);
}

PhotoRepositoryImpl ではこれから実装する RemotePhotoDataSourcegetPhotoByIdStream を実行して、 Stream 型の Photo を返す getPhotoById メソッドを実装しています。

photo_repository_impl.dart
import 'package:sample_flutter/cancel/data_sources/remote_photo_data_source.dart';
import 'package:sample_flutter/cancel/exceptions/fetch_cancelled_exception.dart';
import 'package:sample_flutter/cancel/exceptions/network_exception.dart';
import 'package:sample_flutter/cancel/models/photo.dart';
import 'photo_repository.dart';

class PhotoRepositoryImpl implements PhotoRepository {
  final RemotePhotoDataSource remoteDataSource;

  PhotoRepositoryImpl({required this.remoteDataSource});

  
  Stream<Photo> getPhotoById(int id) async* {
    try {
      yield* remoteDataSource.getPhotoByIdStream(id);
    } catch (e) {
      if (e is FetchCancelledException) {
        rethrow;
      } else if (e is NetworkException) {
        rethrow;
      } else {
        throw Exception('Failed to fetch photo with id $id.');
      }
    }
  }
}

これで Repository の実装は完了です。

3. data_source 層の実装

次に data_source の実装を行います。
コードは以下の通りです。

fetchPhotoById メソッドでは ID に応じて Json Placeholder API を叩いて結果を受け取り、それを Photo に変換して返すようにしています。

getPhotoByIdStream では fetchPhotoById を実行して非同期で Photo のデータを取得しつつ、それぞれの処理の後に1秒間のインターバルを取るようにしています。

import 'dart:async';
import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:http/http.dart' as http;
import 'package:sample_flutter/cancel/models/photo.dart';
import 'package:sample_flutter/cancel/exceptions/fetch_cancelled_exception.dart';

class RemotePhotoDataSource {
  final http.Client client;

  RemotePhotoDataSource({required this.client});

  Future<Photo> fetchPhotoById(int id) async {
    final response = await client
        .get(Uri.parse('https://jsonplaceholder.typicode.com/photos/$id'));

    if (response.statusCode == 200) {
      return Photo.fromJson(jsonDecode(response.body));
    } else {
      throw Exception('Failed to load photo');
    }
  }

  Stream<Photo> getPhotoByIdStream(int id) async* {
    try {
      final photo = await fetchPhotoById(id);
      await Future.delayed(const Duration(seconds: 1));
      yield photo;
    } catch (e) {
      if (e is FetchCancelledException) {
        debugPrint(e.toString());
        rethrow;
      } else {
        debugPrint('Error fetching photo ID $id: $e');
        rethrow;
      }
    }
  }
}

これで Json Placeholder API からデータを取得できるようになりました。
getPhotoByIdStream では Stream 型で Photo を取得しているため、この Stream の StreamController に対してエラーを投げることで外部から処理を止めることができます。

4. exceptions の実装

次に exception を定義していきます。
コードは以下の通りです。

FetchCancelledException は処理がキャンセルされた際の例外、 NetworkException はネットワークに問題があった時の例外として定義しています。

class FetchCancelledException implements Exception {
  final String message;
  FetchCancelledException([this.message = 'Fetching was canceled.']);

  
  String toString() => 'FetchCancelledException: $message';
}
class NetworkException implements Exception {
  final String message;
  NetworkException([this.message = 'Network error occurred.']);

  
  String toString() => 'NetworkException: $message';
}

5. service 層の実装

次に service の実装を進めていきます。
コードは以下の通りです。

import 'dart:async';
import 'package:flutter/material.dart';
import 'package:sample_flutter/cancel/exceptions/fetch_cancelled_exception.dart';
import 'package:sample_flutter/cancel/exceptions/network_exception.dart';
import 'package:sample_flutter/cancel/models/photo.dart';
import 'package:sample_flutter/cancel/repositories/photo_repository.dart';

class PhotoCompleterService {
  final PhotoRepository _photoRepository;

  PhotoCompleterService({required PhotoRepository photoRepository})
      : _photoRepository = photoRepository;

  Stream<Photo> fetchPhotosAsStreamWithCompleter(List<int> ids, Completer<void> completer) {
    final controller = StreamController<Photo>();
    StreamSubscription<Photo>? subscription;

    void startFetching() async {
      try {
        for (var id in ids) {
          if (completer.isCompleted) {
            controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
            break;
          }

          try {
            debugPrint('Fetching photo with id: $id');
            final photoStream = _photoRepository.getPhotoById(id);
            subscription = photoStream.listen((photo) {
              if (completer.isCompleted) {
                controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
                return;
              }
              debugPrint('Fetched photo: ${photo.title}');
              controller.add(photo);
            }, onError: (error) {
              if (error is FetchCancelledException || error is NetworkException) {
                controller.addError(error);
              } else {
                controller.addError(Exception('An unexpected error occurred while fetching photo with id $id.'));
              }
            });

            await subscription!.asFuture();
          } catch (e) {
            if (e is FetchCancelledException || e is NetworkException) {
              controller.addError(e);
              break;
            } else {
              controller.addError(Exception('An unexpected error occurred while fetching photo with id $id.'));
              break;
            }
          }
        }
        await controller.close();
      } catch (e) {
        controller.addError(e);
      }
    }

    startFetching();

    completer.future.then((_) {
      controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
      controller.close();
      subscription?.cancel();
    });

    return controller.stream;
  }

  void cancelFetchingWithCompleter(Completer<void> completer) {
    completer.complete();
  }
}

以下では fetchPhotosAsStreamWithCompleter を定義しています。
この実装では Completer を外部から受け取り、StreamController<Photo>controller としています。

class PhotoCompleterService {
  final PhotoRepository _photoRepository;

  PhotoCompleterService({required PhotoRepository photoRepository})
      : _photoRepository = photoRepository;

  Stream<Photo> fetchPhotosAsStreamWithCompleter(List<int> ids, Completer<void> completer) {
    final controller = StreamController<Photo>();
    StreamSubscription<Photo>? subscription;

以下では startFetching として、受け取ったIDの回数だけ getPhotoById を実行し、それを photoStream として定義しています。
この photoStream をリッスンして監視し、エラーがある時には StreamController に addError でエラーを追加しています。

void startFetching() async {
  try {
    for (var id in ids) {
      if (completer.isCompleted) {
        controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
        break;
      }

      try {
        debugPrint('Fetching photo with id: $id');
        final photoStream = _photoRepository.getPhotoById(id);
        subscription = photoStream.listen((photo) {
          if (completer.isCompleted) {
            controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
            return;
          }
          debugPrint('Fetched photo: ${photo.title}');
          controller.add(photo);
        }, onError: (error) {
          if (error is FetchCancelledException || error is NetworkException) {
            controller.addError(error);
          } else {
            controller.addError(Exception('An unexpected error occurred while fetching photo with id $id.'));
          }
        });

以下では completer.future で Completer の結果を監視して、必要に応じてエラーの追加、 StreamController の close などを行なっています。

  completer.future.then((_) {
    controller.addError(FetchCancelledException('Fetching photos was cancelled.'));
    controller.close();
    subscription?.cancel();
  });

  return controller.stream;
}

以下では処理のキャンセルを実装しています。
complete を実行することで Completer の処理を中断することができます。

void cancelFetchingWithCompleter(Completer<void> completer) {
  completer.complete();
}

6. screen 層の実装

最後に screen の実装です。
コードは以下の通りです。

import 'dart:async';
import 'package:flutter/material.dart';
import 'package:flutter_hooks/flutter_hooks.dart';
import 'package:sample_flutter/cancel/exceptions/fetch_cancelled_exception.dart';
import 'package:sample_flutter/cancel/exceptions/network_exception.dart';
import 'package:sample_flutter/cancel/models/photo.dart';
import 'package:sample_flutter/cancel/services/photo_completer_service.dart';

class PhotoCompleterScreen extends HookWidget {
  const PhotoCompleterScreen({
    super.key,
    required this.photoService,
  });

  final PhotoCompleterService photoService;

  
  Widget build(BuildContext context) {
    final isFetching = useState<bool>(false);
    final photos = useState<List<Photo>>([]);
    final subscription = useRef<StreamSubscription<Photo>?>(null);
    final completer = useRef<Completer<void>?>(null);

    void startFetchingPhotos() {
      isFetching.value = true;
      photos.value = [];

      List<int> photoIds = List.generate(100, (index) => index + 1);
      completer.value = Completer<void>();

      Stream<Photo> photoStream = photoService.fetchPhotosAsStreamWithCompleter(
          photoIds, completer.value!);

      subscription.value = photoStream.listen(
        (photo) {
          photos.value = [...photos.value, photo];
        },
        onError: (e) {
          if (e is FetchCancelledException) {
            if (context.mounted) {
              ScaffoldMessenger.of(context).showSnackBar(
                const SnackBar(
                  content: Text('写真取得がキャンセルされました。'),
                ),
              );
            }
          } else if (e is NetworkException) {
            if (context.mounted) {
              ScaffoldMessenger.of(context).showSnackBar(
                const SnackBar(
                  content: Text('ネットワークエラーが発生しました。'),
                ),
              );
            }
          } else {
            debugPrint('An error occurred: $e');
            if (context.mounted) {
              ScaffoldMessenger.of(context).showSnackBar(
                const SnackBar(
                  content: Text('予期せぬエラーが発生しました。'),
                ),
              );
            }
          }
          isFetching.value = false;
        },
        onDone: () {
          isFetching.value = false;
        },
      );
    }

    void cancelFetching() {
      if (isFetching.value) {
        photoService.cancelFetchingWithCompleter(completer.value!);
        completer.value = null;
        isFetching.value = false;
      }
    }

    useEffect(() {
      return () {
        cancelFetching();

        subscription.value?.cancel();
        subscription.value = null;
      };
    }, []);

    return Scaffold(
      appBar: AppBar(
        title: const Text('画像取得'),
      ),
      body: Column(
        children: [
          isFetching.value
              ? ElevatedButton(
                  onPressed: cancelFetching,
                  child: const Text('キャンセル'),
                )
              : ElevatedButton(
                  onPressed: startFetchingPhotos,
                  child: const Text('写真を取得'),
                ),
          Expanded(
            child: ListView.builder(
              itemCount: photos.value.length,
              itemBuilder: (context, index) {
                final photo = photos.value[index];
                return ListTile(
                  leading: Image.network(
                    photo.thumbnailUrl,
                    height: 100,
                    width: 100,
                    loadingBuilder: (context, child, loadingProgress) {
                      if (loadingProgress == null) return child;
                      return const CircularProgressIndicator();
                    },
                    errorBuilder: (context, error, stackTrace) {
                      return const SizedBox(
                        height: 100,
                        width: 100,
                        child: Icon(
                          Icons.error,
                          color: Colors.yellow,
                        ),
                      );
                    },
                  ),
                  title: Text(photo.title),
                  contentPadding: const EdgeInsets.all(8),
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}

以下ではそれぞれ必要な変数を定義しています。

  • isFetching : 処理の途中かどうかを示すフラグ
  • photos : 取得できた Photo のリスト
  • subscription : Photo の Stream の購読
  • completer : 処理の中断を外部で行えるようにする Completer
final isFetching = useState<bool>(false);
final photos = useState<List<Photo>>([]);
final subscription = useRef<StreamSubscription<Photo>?>(null);
final completer = useRef<Completer<void>?>(null);

以下では Json Placeholder API のリクエストに使うIDの生成と photoStream の定義を行なっています。
Json Placeholder API では /photos/1 のように各カテゴリの後に数字をつけてリクエストを送ることでそれぞれのデータを取得することができます。この例では photoIds として 0 から 99 までのIDを生成しています。したがって、自動的に 100個のデータが取得されるようになっています。

photoStream は先ほど定義した fetchPhotosAsStreamWithCompleter の返り値を指定しており、1秒おきに Stream で Photo のデータが流れ込むようになっています。

List<int> photoIds = List.generate(100, (index) => index + 1);
completer.value = Completer<void>();

Stream<Photo> photoStream = photoService.fetchPhotosAsStreamWithCompleter(
    photoIds, completer.value!);

以下では phtoStreamlisten して、エラーがある際には例外の型に応じてスナックバーに表示させる文言を変更しています。

subscription.value = photoStream.listen(
  (photo) {
    photos.value = [...photos.value, photo];
  },
  onError: (e) {
    if (e is FetchCancelledException) {
      if (context.mounted) {
        ScaffoldMessenger.of(context).showSnackBar(
          const SnackBar(
            content: Text('写真取得がキャンセルされました。'),
          ),
        );
      }
    } 

以下では処理がキャンセルされた際の実装を行なっています。
cancelFetchingWithCompleter を実行しており、これによって Completer の complete が実行されて処理が中断されます。

void cancelFetching() {
  if (isFetching.value) {
    photoService.cancelFetchingWithCompleter(completer.value!);
    isFetching.value = false;
  }
}

以下では処理が実行中かどうかによって実行する処理と表示するテキストを切り替えています。

isFetching.value
    ? ElevatedButton(
        onPressed: cancelFetching,
        child: const Text('キャンセル'),
      )
    : ElevatedButton(
        onPressed: startFetchingPhotos,
        child: const Text('写真を取得'),
      ),

以上です。

まとめ

最後まで読んでいただいてありがとうございました。

今回は Completer を使って処理を制御する方法についてみてきました。
冒頭で述べた通り、 Completer で外部から処理を管理することで、処理の中断などが可能になり、ユーザーの時間を奪わなくて済む場合があるため、導入場所を考えつつ実装できればとても良いなと感じました。

誤っている点やもっと良い書き方があればご指摘いただければ幸いです。

参考

https://2024.flutterkaigi.jp/session/a1ba9bfd-87c8-47b2-b5af-a50e0c64c300

https://zenn.dev/mukkun69n/articles/3b369c6844be9f

https://api.flutter.dev/flutter/dart-async/Completer-class.html

Discussion