Skip to content

ToDartStreamOps<O>

extension ToDartStreamOps<O> on Rill<O>

Methods

toDartStream() extension

Stream<O> toDartStream()

Available on Rill<O>, provided by the ToDartStreamOps<O> extension

Implementation
dart
/// Converts this [Rill] into a single-subscription Dart [Stream].
///
/// Nothing is evaluated until the returned stream is listened to. On listen,
/// the rill is compiled and started on a fiber; every emitted element is
/// forwarded with [StreamController.add], an error raised by the rill is
/// forwarded with [StreamController.addError], and the controller is closed
/// once the program finishes, whether it succeeds, fails or is cancelled.
///
/// Cancelling the subscription cancels the underlying fiber. The future
/// handed back to the stream machinery completes once that cancellation has
/// finished. If the subscription is cancelled before the fiber handle has
/// been delivered, the cancellation is remembered and applied as soon as the
/// fiber becomes available, so the program is never left running without a
/// consumer.
Stream<O> toDartStream() {
  late StreamController<O> controller;
  IOFiber<Unit>? activeFiber;

  // Non-null when the subscriber cancelled before the fiber handle arrived;
  // completed once the deferred cancellation has actually been carried out.
  Completer<void>? pendingCancel;

  // True once `start()` reported an outcome that carries no fiber (cancelled
  // or failed), meaning there will never be anything to cancel.
  var startedWithoutFiber = false;

  // Cancels [fiber] and completes [done] when the cancellation has finished.
  void cancelFiber(IOFiber<Unit> fiber, Completer<void> done) {
    fiber.cancel().unsafeRunAsync((_) {
      if (!done.isCompleted) done.complete();
    });
  }

  // Records that no fiber will ever arrive and releases a waiting canceller,
  // so the future returned from `onCancel` cannot hang forever.
  void settleWithoutFiber() {
    startedWithoutFiber = true;
    final pending = pendingCancel;
    if (pending != null && !pending.isCompleted) pending.complete();
  }

  controller = StreamController(
    onListen: () {
      final program = evalMap(
            (o) => IO.exec(() {
              if (!controller.isClosed) controller.add(o);
            }),
          )
          .handleErrorWith((err) {
            return Rill.eval(
              IO.exec(() {
                if (!controller.isClosed) controller.addError(err);
              }),
            );
          })
          .compile
          .drain
          .guarantee(
            // Always close the stream, whatever way the program ended.
            IO.exec(() {
              if (!controller.isClosed) controller.close();
            }),
          );

      program.start().unsafeRunAsync((outcome) {
        outcome.fold(
          () {
            if (!controller.isClosed) controller.close();
            settleWithoutFiber();
          },
          (err, stackTrace) {
            if (!controller.isClosed) controller.addError(err, stackTrace);
            settleWithoutFiber();
          },
          (fiber) {
            activeFiber = fiber;
            // The subscriber may already have cancelled while the fiber was
            // still being started; honour that request now.
            final pending = pendingCancel;
            if (pending != null) cancelFiber(fiber, pending);
          },
        );
      });
    },
    onCancel: () {
      // Copy to a local so the null check promotes without needing `!`.
      final fiber = activeFiber;
      if (fiber != null) {
        final done = Completer<void>();
        cancelFiber(fiber, done);
        return done.future;
      }
      if (!startedWithoutFiber) {
        // The fiber has not been delivered yet: defer the cancellation until
        // it is, and let the caller wait for it.
        final done = Completer<void>();
        pendingCancel = done;
        return done.future;
      }
    },
  );

  return controller.stream;
}