ToDartStreamOps<O>
extensionToDartStreamOps<O>onRill<O>
Interop extension for converting a Rill to a Dart Stream.
Methods
toDartStream() extension
Stream<O>toDartStream()
Converts this Rill to a single-subscription Dart Stream.
The Rill runs when the stream is listened to and is canceled when the subscription is canceled. Errors from the Rill are forwarded to the stream's error channel.
Available on Rill<O>, provided by the ToDartStreamOps<O> extension
Implementation
dart
Stream<O> toDartStream() {
late StreamController<O> controller;
IOFiber<Unit>? activeFiber;
controller = StreamController(
onListen: () {
final program = evalMap(
(o) => IO.exec(() {
if (!controller.isClosed) controller.add(o);
}),
)
.handleErrorWith((err) {
return Rill.eval(
IO.exec(() {
if (!controller.isClosed) controller.addError(err);
}),
);
})
.compile
.drain
.guarantee(
IO.exec(() {
if (!controller.isClosed) controller.close();
}),
);
final startIO = program.start();
startIO.unsafeRunAsync((outcome) {
outcome.fold(
() {
if (!controller.isClosed) controller.close();
},
(err, stackTrace) {
if (!controller.isClosed) controller.addError(err, stackTrace);
},
(fiber) => activeFiber = fiber,
);
});
},
onCancel: () {
if (activeFiber != null) {
final completer = Completer<void>();
activeFiber!.cancel().unsafeRunAsync((_) => completer.complete());
return completer.future;
}
},
);
return controller.stream;
}