🌊

Stream のエラーハンドリングについてのまとめ

2020/09/21に公開

Stream のエラーハンドリングについてまとめます。

やりがちな間違い

Stream のエラーは、通常は try - catch ブロックでは catch できません。(ただし、後述の await for ならばできます。)

final controller = StreamController<int>();

try {
  controller.stream.listen(print);
  controller.addError(Exception('exception!'));
} catch (e) {
  print('This will not catch an error on stream: $e');
}

onError callback によるエラーハンドリング

Stream のエラーをキャッチする一つの方法として、listen メソッドの、onError callback があります。

final controller = StreamController<int>();

controller.stream.listen(print, onError: (e) {
  print('Catch an error on stream: $e');
});

controller.addError(Exception('exception!'));

data 処理と error 処理

Stream のある method で Exception か Error が throw されると、それ以降の data を扱う method chain は反応しません。
この挙動は、try {} catch(e) {} の場合と同様です。

final callbackThatCanThrow = (int i) {
  if (i >= 2) {
    throw Exception('exception!');
  }
  return i;
};

final controller = StreamController<int>();

controller.stream.map(callbackThatCanThrow).map((e) {
  print('stream transforming by map: $e');
  return e;
}).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

controller.add(1);
controller.add(2);

// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!

3 つの "lane" (通り道)

Dart 1 時代の古い資料ですが、Stream (と、Future) についての Dart 言語ライブラリデザイナーによる 良解説です。3 つの "lane" という 概念で Stream の data と error と done を区別しています。Stream の API documentation には出てこないのですが、この概念を頭に入れると、data と error と done の関係を理解しやすいと思います。

https://www.yumpu.com/en/document/read/51523123/asynchronous-programming-in-dart-streams-are-the-future

handleError

handleError method で、Stream の途中でエラーを傍受してエラーハンドリングできます。特定の種類のエラーだけを傍受できます。必要ならばそのなかで throw すれば、try-catch エラーハンドリングでいう rethrow の動作になります。

final callbackThatCanThrow = (int i) {
  if (i >= 2) {
    throw Exception('exception!');
  }
  return i;
};

final controller = StreamController<int>();
controller.stream.map(callbackThatCanThrow).handleError((e) {
  print('Catch and handle an error on handleError: $e');
  throw e; // rethrow if necessary.
}, test: (error) {
  return error is Exception;
}).map((e) {
  print('stream transforming by map: $e');
  return e;
}).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));

// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch and handle an error on handleError: Exception: exception!
// Catch an error on stream subscription: Exception: exception!
// Catch an error on stream subscription: Bad state: state error!

Stream method の callback の中でエラーハンドリングし、error レーンに流さない

Stream method の callback の中で、 try-catch でエラーハンドリングします。

final callbackThatCanThrow = (int i) {
  if (i >= 2) {
    throw Exception('exception!');
  }
  return i;
};

final controller = StreamController<int>();
controller.stream.map((e) {
  try {
    callbackThatCanThrow(e);
  } on Exception catch (e) {
    print('Catch on try-catch block: $e');
  } finally {
    return e;
  }
}).map((e) {
  print('stream transforming by map: $e');
  return e;
}).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));

StreamTransformer でエラーハンドリングして、 data レーンに流しなおす

handleError の documentation で説明されている通り、transform でエラーハンドリングして data レーンになんらかの data を流し直すことができます。

final callbackThatCanThrow = (int i) {
  if (i >= 2) {
    throw Exception('exception!');
  }
  return i;
};

final controller = StreamController<int>();
controller.stream.map(callbackThatCanThrow).transform(
    StreamTransformer.fromHandlers(
        handleError: (error, stackTrace, sink) {
  if (error is Exception) {
    print('Catch and handle an error on StreamTransformer: $error');
    sink.add(0);
  } else {
    throw error; // rethrow if it is necessary.
  }
})).map((e) {
  print('stream transforming by map: $e');
  return e;
}).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));

// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch and handle an error on StreamTransformer: Exception: exception!
// stream transforming by map: 0
// A data is delivered: 0
// Catch an error on stream subscription: Bad state: state error!

StreamTransformer.fromHandlers の arguments はすべて optional named argument です。指定しなかった named argument の処理については単に、 data については sink.add(data), error については sink.addError(error), done については sink.close();されます。その API documentation には書かれていませんが、実装はそうなっています。

cancelOnError

listencancelOnError に true を指定すると、onError callback が実行されるとその subscription を cancel します。デフォルトは false です。

final controller = StreamController<int>();
controller.stream.listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
}, cancelOnError: true);
print('controller.hasListener: ${controller.hasListener}');
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
return Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// Prints.
// controller.hasListener: true
// A data is delivered: 1
// A data is delivered: 2
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false

await for のエラーハンドリング

await for の場合は、try-catch ブロックで囲むと error を catch できます。

ただし、以下のいずれの場合も、StreamSubscription が自動的に cancel されることに注意してください。

  • await forに最初の error が流れてきた
  • await for を try-catch で囲む囲まないに関わらず、await forから Error または Exception が throw された

それぞれ、コードで確認してみましょう。

await forに最初の error が流れてきた場合

以下は、await forに最初の error が流れてきた場合です。

final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
  await for (final e in stream) {
    yield e;
  }
};

awaitFor(controller.stream).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
controller.add(1);
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// controller.hasListener: true
// A data is delivered: 1
// A data is delivered: 2
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false

await forを try-catch ブロックで囲み、await forから Error または Exception が throw された場合

以下は、await forを try-catch ブロックで囲み、await forから Exception が throw された場合です。

final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
  try {
    await for (final e in stream) {
      if (e >= 2) {
        throw Exception('exception!');
      }

      yield e;
    }
  } catch (e) {
    print('Catch an error on try - catch block: $e');
    yield 0;
  }
};

awaitFor(controller.stream).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// controller.hasListener: true
// A data is delivered: 1
// Catch an error on try - catch block: Exception: exception!
// A data is delivered: 0
// controller.hasListener: false

await forを try-catch ブロックで囲まず、await forから Error または Exception が throw された場合

以下は、await forを try-catch ブロックで囲まず、await forから Exception が throw された場合です。

final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
  await for (final e in stream) {
    if (e >= 2) {
      throw Exception('exception!');
    }

    yield e;
  }
};

awaitFor(controller.stream).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// controller.hasListener: true
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!
// controller.hasListener: false

await forの内側で throw された場合

以下は、await forの内側で throw された場合です。

final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
  await for (final e in stream) {
    if (e >= 2) {
      throw Exception('exception!');
    }

    yield e;
  }
};

awaitFor(controller.stream).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// Prints.
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!
// controller.hasListener: false

await for の内側でのエラーハンドリング

await forの内側で try-catch ブロックを書けば、StreamSubscription が自動的に cancel されることはありません。

"Stream method の callback の中でエラーハンドリングし、error レーンに流さない" と同じです。

final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
  await for (final e in stream) {
    try {
      if (e >= 2) {
        throw Exception('exception!');
      }

      yield e;
    } catch (e) {
      print('Catch an error on try - catch block inside "await for": $e');
      yield 0;
    }
  }
};

awaitFor(controller.stream).listen((e) {
  print('A data is delivered: $e');
}, onError: (e) {
  print('Catch an error on stream subscription: $e');
});

await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});
controller.addError(StateError('state error!'));
await Future(() {
  print('controller.hasListener: ${controller.hasListener}');
});

// Prints.
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on try - catch block inside "await for": Exception: exception!
// A data is delivered: 0
// controller.hasListener: true
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false

まとめ

  • Stream のエラーハンドリングは、通常は、try-catch でなく Stream の仕組みの中で行う。
  • data レーン、error レーン、done レーンという概念が理解の助けになる。
  • async / await の導入によって、try-catch で Stream のエラーをハンドリングできるようになったが、従来の Stream のエラーハンドリングとの挙動の違いを理解しなければならなくなった。

もし見落としていることがあれば教えて下さい。書き足していきます。

私は、Stream のハンドリングについては await for を好みません。Dart は本質的には C スタイルの構文の命令型の言語なので、Future と Stream を async / await によって 命令型で書けるようになったのは進歩と言えますが、Stream に関しては Stream の method を chain していくほうが挙動がわかりやすいのではと感じます。

私は、Future についても、async / await をあまり好まなくなりましたが、これはかなりの少数派だと思います。Future の場合は async / await がはっきりと推奨されていると言えるので、積極的に使うべきでしょう。

Discussion