ToDartStreamOps<O>

extension ToDartStreamOps<O> on Rill<O>

Interop extension for converting a Rill to a Dart Stream.

Methods

toDartStream() extension

Stream<O> toDartStream()

Converts this Rill to a single-subscription Dart Stream.

The Rill runs when the stream is listened to and is canceled when the subscription is canceled. Errors from the Rill are forwarded to the stream's error channel.

Available on Rill<O>, provided by the ToDartStreamOps<O> extension
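The lifecycle described above (nothing runs until the stream is listened to, canceling the subscription stops the producer, and only one listener is allowed) can be illustrated with a minimal sketch that uses only `dart:async`. The `lazyCounter` function and its `log` parameter are hypothetical stand-ins for a stream returned by `toDartStream()`; they are not part of the library, and a `Timer` plays the role of the running Rill.

```dart
import 'dart:async';

/// Hypothetical stand-in for a stream returned by `toDartStream()`:
/// the producer starts in onListen and is stopped in onCancel.
Stream<int> lazyCounter(List<String> log) {
  late StreamController<int> controller;
  Timer? timer;
  var next = 0;

  controller = StreamController<int>(
    onListen: () {
      log.add('started');
      // Emit an increasing counter until the subscription is canceled.
      timer = Timer.periodic(const Duration(milliseconds: 10), (_) {
        if (!controller.isClosed) controller.add(next++);
      });
    },
    onCancel: () {
      log.add('canceled');
      timer?.cancel();
    },
  );

  return controller.stream;
}

Future<void> main() async {
  final log = <String>[];
  final stream = lazyCounter(log);

  // Creating the stream does not start the producer.
  print(log); // []

  // take(3) cancels the underlying subscription after three events.
  final values = await stream.take(3).toList();
  print(values); // [0, 1, 2]
  print(log); // [started, canceled]

  // A single-subscription stream rejects a second listener.
  try {
    stream.listen((_) {});
  } on StateError catch (e) {
    print('second listen rejected: ${e.message}');
  }
}
```

With a stream obtained from `toDartStream()`, the same consumer-side pattern should apply: pass an `onError` callback to `listen` (or use `try`/`catch` around `await for`) to receive errors that the Rill forwards to the stream's error channel.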

Implementation
dart
Stream<O> toDartStream() {
  late StreamController<O> controller;
  IOFiber<Unit>? activeFiber;

  controller = StreamController(
    onListen: () {
      final program = evalMap(
            (o) => IO.exec(() {
              if (!controller.isClosed) controller.add(o);
            }),
          )
          .handleErrorWith((err) {
            return Rill.eval(
              IO.exec(() {
                if (!controller.isClosed) controller.addError(err);
              }),
            );
          })
          .compile
          .drain
          .guarantee(
            IO.exec(() {
              if (!controller.isClosed) controller.close();
            }),
          );

      final startIO = program.start();

      startIO.unsafeRunAsync((outcome) {
        outcome.fold(
          () {
            if (!controller.isClosed) controller.close();
          },
          (err, stackTrace) {
            if (!controller.isClosed) controller.addError(err, stackTrace);
          },
          (fiber) => activeFiber = fiber,
        );
      });
    },
    onCancel: () {
      if (activeFiber != null) {
        final completer = Completer<void>();
        activeFiber!.cancel().unsafeRunAsync((_) => completer.complete());
        return completer.future;
      }
    },
  );

  return controller.stream;
}