Stream のエラーハンドリングについてのまとめ
Stream のエラーハンドリングについてまとめます。
やりがちな間違い
Stream のエラーは、通常は try - catch ブロックでは catch できません。(ただし、後述の await for
ならばできます。)
final controller = StreamController<int>();
try {
controller.stream.listen(print);
controller.addError(Exception('exception!'));
} catch (e) {
print('This will not catch an error on stream: $e');
}
onError
callback によるエラーハンドリング
Stream のエラーをキャッチする一つの方法として、listen
メソッドの、onError
callback があります。
final controller = StreamController<int>();
controller.stream.listen(print, onError: (e) {
print('Catch an error on stream: $e');
});
controller.addError(Exception('exception!'));
data 処理と error 処理
Stream のある method で Exception か Error が throw されると、それ以降の data を扱う method chain は反応しません。
この挙動は、try {} catch(e) {}
の場合と同様です。
final callbackThatCanThrow = (int i) {
if (i >= 2) {
throw Exception('exception!');
}
return i;
};
final controller = StreamController<int>();
controller.stream.map(callbackThatCanThrow).map((e) {
print('stream transforming by map: $e');
return e;
}).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!
3 つの "lane" (通り道)
Dart 1 時代の古い資料ですが、Stream (と、Future) についての Dart 言語ライブラリデザイナーによる 良解説です。3 つの "lane" という 概念で Stream の data と error と done を区別しています。Stream の API documentation には出てこないのですが、この概念を頭に入れると、data と error と done の関係を理解しやすいと思います。
handleError
handleError
method で、Stream の途中でエラーを傍受してエラーハンドリングできます。特定の種類のエラーだけを傍受できます。必要ならばそのなかで throw
すれば、try-catch エラーハンドリングでいう rethrow
の動作になります。
final callbackThatCanThrow = (int i) {
if (i >= 2) {
throw Exception('exception!');
}
return i;
};
final controller = StreamController<int>();
controller.stream.map(callbackThatCanThrow).handleError((e) {
print('Catch and handle an error on handleError: $e');
throw e; // rethrow if necessary.
}, test: (error) {
return error is Exception;
}).map((e) {
print('stream transforming by map: $e');
return e;
}).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch and handle an error on handleError: Exception: exception!
// Catch an error on stream subscription: Exception: exception!
// Catch an error on stream subscription: Bad state: state error!
Stream method の callback の中でエラーハンドリングし、error レーンに流さない
Stream method の callback の中で、 try-catch でエラーハンドリングします。
final callbackThatCanThrow = (int i) {
if (i >= 2) {
throw Exception('exception!');
}
return i;
};
final controller = StreamController<int>();
controller.stream.map((e) {
try {
callbackThatCanThrow(e);
} on Exception catch (e) {
print('Catch on try-catch block: $e');
} finally {
return e;
}
}).map((e) {
print('stream transforming by map: $e');
return e;
}).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
StreamTransformer でエラーハンドリングして、 data レーンに流しなおす
handleError
の documentation で説明されている通り、transform
でエラーハンドリングして data レーンになんらかの data を流し直すことができます。
final callbackThatCanThrow = (int i) {
if (i >= 2) {
throw Exception('exception!');
}
return i;
};
final controller = StreamController<int>();
controller.stream.map(callbackThatCanThrow).transform(
StreamTransformer.fromHandlers(
handleError: (error, stackTrace, sink) {
if (error is Exception) {
print('Catch and handle an error on StreamTransformer: $error');
sink.add(0);
} else {
throw error; // rethrow if it is necessary.
}
})).map((e) {
print('stream transforming by map: $e');
return e;
}).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
// Prints.
// stream transforming by map: 1
// A data is delivered: 1
// Catch and handle an error on StreamTransformer: Exception: exception!
// stream transforming by map: 0
// A data is delivered: 0
// Catch an error on stream subscription: Bad state: state error!
StreamTransformer.fromHandlers
の arguments はすべて optional named argument です。指定しなかった named argument の処理については単に、 data については sink.add(data)
, error については sink.addError(error)
, done については sink.close();
されます。その API documentation には書かれていませんが、実装はそうなっています。
cancelOnError
listen
の cancelOnError
に true を指定すると、onError
callback が実行されるとその subscription を cancel します。デフォルトは false です。
final controller = StreamController<int>();
controller.stream.listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
}, cancelOnError: true);
print('controller.hasListener: ${controller.hasListener}');
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
return Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// Prints.
// controller.hasListener: true
// A data is delivered: 1
// A data is delivered: 2
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false
await for
のエラーハンドリング
await for
の場合は、try-catch ブロックで囲むと error を catch できます。
ただし、以下のいずれの場合も、StreamSubscription が自動的に cancel されることに注意してください。
-
await for
に最初の error が流れてきた -
await for
を try-catch で囲む囲まないに関わらず、await for
から Error または Exception が throw された
それぞれ、コードで確認してみましょう。
await for
に最初の error が流れてきた場合
以下は、await for
に最初の error が流れてきた場合です。
final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
await for (final e in stream) {
yield e;
}
};
awaitFor(controller.stream).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.addError(StateError('state error!'));
controller.add(1);
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// controller.hasListener: true
// A data is delivered: 1
// A data is delivered: 2
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false
await for
を try-catch ブロックで囲み、await for
から Error または Exception が throw された場合
以下は、await for
を try-catch ブロックで囲み、await for
から Exception が throw された場合です。
final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
try {
await for (final e in stream) {
if (e >= 2) {
throw Exception('exception!');
}
yield e;
}
} catch (e) {
print('Catch an error on try - catch block: $e');
yield 0;
}
};
awaitFor(controller.stream).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on try - catch block: Exception: exception!
// A data is delivered: 0
// controller.hasListener: false
await for
を try-catch ブロックで囲まず、await for
から Error または Exception が throw された場合
以下は、await for
を try-catch ブロックで囲まず、await for
から Exception が throw された場合です。
final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
await for (final e in stream) {
if (e >= 2) {
throw Exception('exception!');
}
yield e;
}
};
awaitFor(controller.stream).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!
// controller.hasListener: false
await for
の内側で throw された場合
以下は、await for
の内側で throw された場合です。
final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
await for (final e in stream) {
if (e >= 2) {
throw Exception('exception!');
}
yield e;
}
};
awaitFor(controller.stream).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
controller.add(1);
controller.addError(StateError('state error!'));
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// Prints.
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on stream subscription: Exception: exception!
// controller.hasListener: false
await for
の内側でのエラーハンドリング
await for
の内側で try-catch ブロックを書けば、StreamSubscription が自動的に cancel されることはありません。
"Stream method の callback の中でエラーハンドリングし、error レーンに流さない" と同じです。
final controller = StreamController<int>();
final awaitFor = (Stream<int> stream) async* {
await for (final e in stream) {
try {
if (e >= 2) {
throw Exception('exception!');
}
yield e;
} catch (e) {
print('Catch an error on try - catch block inside "await for": $e');
yield 0;
}
}
};
awaitFor(controller.stream).listen((e) {
print('A data is delivered: $e');
}, onError: (e) {
print('Catch an error on stream subscription: $e');
});
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.add(1);
controller.add(2);
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
controller.addError(StateError('state error!'));
await Future(() {
print('controller.hasListener: ${controller.hasListener}');
});
// Prints.
// controller.hasListener: true
// A data is delivered: 1
// Catch an error on try - catch block inside "await for": Exception: exception!
// A data is delivered: 0
// controller.hasListener: true
// Catch an error on stream subscription: Bad state: state error!
// controller.hasListener: false
まとめ
- Stream のエラーハンドリングは、通常は、try-catch でなく Stream の仕組みの中で行う。
- data レーン、error レーン、done レーンという概念が理解の助けになる。
-
async / await
の導入によって、try-catch で Stream のエラーをハンドリングできるようになったが、従来の Stream のエラーハンドリングとの挙動の違いを理解しなければならなくなった。
もし見落としていることがあれば教えて下さい。書き足していきます。
私は、Stream のハンドリングについては await for
を好みません。Dart は本質的には C スタイルの構文の命令型の言語なので、Future と Stream を async / await
によって 命令型で書けるようになったのは進歩と言えますが、Stream に関しては Stream の method を chain していくほうが挙動がわかりやすいのではと感じます。
私は、Future についても、async / await
をあまり好まなくなりましたが、これはかなりの少数派だと思います。Future の場合は async / await
がはっきりと推奨されていると言えるので、積極的に使うべきでしょう。
Discussion