Skip to content

ParJoinOps<O>

extension ParJoinOps<O> on Rill<Rill<O>>

Methods

parJoin() extension

Rill<O> parJoin(int maxOpen)

Runs maxOpen inner streams concurrently.

  • Waits for all inner streams to finish.
  • If any stream fails, the error is propagated and all other streams are cancelled.
  • If the output stream is interrupted, all running streams are cancelled.

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoin(int maxOpen) {
  assert(maxOpen > 0, 'maxOpen must be greater than 0, was: $maxOpen');

  if (maxOpen == 1) {
    return flatten();
  } else {
    final rillF = (
      SignallingRef.of(none<Object>()),
      Semaphore.permits(maxOpen),
      SignallingRef.of(1),
      Channel.unbounded<Unit>(),
      Channel.unbounded<Chunk<O>>(),
    ).mapN((done, available, running, outcomes, output) {
      IO<Unit> stop(Option<Object> rslt) {
        return done.update((current) {
          return current.fold(
            () => rslt,
            (prev) => rslt.fold(
              () => current, &#47;&#47; If we're trying to set None, keep existing result (if any)
              (err) => Some(
                current.fold(
                  () => Some(err),
                  (prevErr) => Some(CompositeFailure.of(prevErr, err)),
                ),
              ),
            ),
          );
        });
      }

      final incrementRunning = running.update((n) => n + 1);
      final decrementRunning = running
          .updateAndGet((n) => n - 1)
          .flatMap((now) => now == 0 ? outcomes.close().voided() : IO.unit);

      IO<Unit> onOutcome(
        Outcome<Unit> oc,
        Either<Object, Unit> cancelResult,
      ) {
        return oc.fold(
          () => cancelResult.fold((err) => stop(Some(err)), (_) => IO.unit),
          (err, st) => CompositeFailure.fromResults(
            Left(err),
            cancelResult,
          ).fold((err) => stop(Some(err)), (_) => IO.unit),
          (fu) => cancelResult.fold(
            (err) => stop(Some(err)),
            (_) => outcomes.send(fu).voided(),
          ),
        );
      }

      IO<Unit> runInner(Rill<O> inner, Scope outerScope) {
        return IO.uncancelable((_) {
          return outerScope.lease().flatMap((lease) {
            return available.acquire().productR(incrementRunning).flatMap((_) {
              return inner
                  .chunks()
                  .evalMap((chunk) => output.send(chunk).voided())
                  .interruptWhenSignaled(done.map((x) => x.nonEmpty))
                  .compile
                  .drain
                  .guaranteeCase((oc) {
                    return lease.cancel
                        .flatMap((cancelResult) => onOutcome(oc, cancelResult))
                        .guarantee(
                          available.release().productR(decrementRunning),
                        );
                  })
                  .voidError()
                  .start()
                  .voided();
            });
          });
        });
      }

      IO<Unit> runOuter() {
        return IO.uncancelable((_) {
          return flatMap(
                (inner) =>
                    Pull.getScope
                        .flatMap((outerScope) => Pull.eval(runInner(inner, outerScope)))
                        .rillNoScope,
              )
              .drain()
              .interruptWhenSignaled(done.map((x) => x.nonEmpty))
              .compile
              .drain
              .guaranteeCase(
                (oc) => onOutcome(oc, Right(Unit())).productR(decrementRunning),
              )
              .voidError();
        });
      }

      IO<Unit> outcomeJoiner() {
        return outcomes.rill.compile.drain.guaranteeCase((oc) {
          return oc.fold(
            () => stop(none()).productR(output.close().voided()),
            (err, st) => stop(Some(err)).productR(output.close().voided()),
            (fu) => stop(none()).productR(output.close().voided()),
          );
        }).voidError();
      }

      IO<Unit> signalResult(IOFiber<Unit> fiber) {
        return done.value().flatMap((opt) {
          return opt.fold(
            () => fiber.join().voided(),
            (err) => err is _Interruption ? fiber.join().voided() : IO.raiseError(err),
          );
        });
      }

      return Rill.bracket(
        runOuter().start().productR(outcomeJoiner().start()),
        (fiber) {
          return done
              .update((c) => c.fold(() => const Some(_Interruption()), (prev) => Some(prev)))
              .productR(
                &#47;&#47; in case of short-circuiting, the `fiberJoiner` would not have had a chance
                &#47;&#47; to wait until all fibers have been joined, so we need to do it manually
                &#47;&#47; by waiting on the counter
                running.waitUntil((n) => n == 0).productR(signalResult(fiber)),
              );
        },
      ).flatMap((_) => output.rill.flatMap((o) => Rill.chunk(o)));
    });

    return Rill.eval(rillF).flatten();
  }
}

parJoinUnbounded() extension

Rill<O> parJoinUnbounded()

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoinUnbounded() => parJoin(Integer.maxValue);