Rill<O>

class Rill<O>

A lazy, effectful, chunked stream of values of type O.

Every Rill is backed by a Pull<O, Unit>. Operations are lazy — nothing executes until a compile terminal (e.g. compile.drain) is called. Elements are delivered in Chunks for efficiency; most operators are chunk-transparent.

Creating a Rill

dart
Rill.emit(42)                  // single element
Rill.emits([1, 2, 3])          // multiple elements
Rill.eval(someIO)              // effectful single element
Rill.repeatEval(IO.now)        // infinite effectful stream
Channel.bounded<int>(16).map((ch) => ch.rill)  // from a Channel

Running a Rill

dart
await rill.compile.drain.run();
await rill.compile.toIList.run();

Properties

compile no setter

RillCompile<O> get compile

Access the compile API to run this stream and aggregate its output.

Implementation
dart
RillCompile<O> get compile => RillCompile(underlying);

distinct no setter

Rill<O> get distinct

WARN: For long streams and/or large elements, this can be a memory hog, because every element seen so far is retained in a set. Use with caution.

Implementation
dart
Rill<O> get distinct {
  return scanChunksOpt(ISet.empty<O>(), (seen) {
    return Some((chunk) {
      return (seen.concat(chunk), chunk);
    });
  });
}

dropLast no setter

Rill<O> get dropLast

Removes the last element from the stream.

Implementation
dart
Rill<O> get dropLast => dropLastIf((_) => true);

hashCode no setter inherited

int get hashCode

The hash code for this object.

A hash code is a single integer which represents the state of the object that affects operator == comparisons.

All objects have hash codes. The default hash code implemented by Object represents only the identity of the object, the same way as the default operator == implementation only considers objects equal if they are identical (see identityHashCode).

If operator == is overridden to use the object state instead, the hash code must also be changed to represent that state, otherwise the object cannot be used in hash based data structures like the default Set and Map implementations.

Hash codes must be the same for objects that are equal to each other according to operator ==. The hash code of an object should only change if the object changes in a way that affects equality. There are no further requirements for the hash codes. They need not be consistent between executions of the same program and there are no distribution guarantees.

Objects that are not equal are allowed to have the same hash code. It is even technically allowed that all instances have the same hash code, but if clashes happen too often, it may reduce the efficiency of hash-based data structures like HashSet or HashMap.

If a subclass overrides hashCode, it should override the operator == operator as well to maintain consistency.

Inherited from Object.

Implementation
dart
external int get hashCode;

head no setter

Rill<O> get head

Emits at most the first element, then terminates.

Implementation
dart
Rill<O> get head => take(1);

last no setter

Rill<Option<O>> get last

Emits the last element wrapped in Some, or none if the stream is empty.

Implementation
dart
Rill<Option<O>> get last => pull.last.flatMap(Pull.output1).rillNoScope;

mask no setter

Rill<O> get mask

Suppresses all errors, turning a failed stream into an empty one.

Implementation
dart
Rill<O> get mask => handleErrorWith((_) => Rill.empty());

pull no setter

ToPull<O> get pull

Access the Pull API for this Rill.

Implementation
dart
ToPull<O> get pull => ToPull(this);

runtimeType no setter inherited

Type get runtimeType

A representation of the runtime type of the object.

Inherited from Object.

Implementation
dart
external Type get runtimeType;

scope no setter

Rill<O> get scope

Wraps this stream in a new resource Scope.

Resources acquired inside the stream are released when the scope closes.

Implementation
dart
Rill<O> get scope => Pull.scope(underlying).rillNoScope;

tail no setter

Rill<O> get tail

Drops the first element and emits the rest.

Implementation
dart
Rill<O> get tail => drop(1);

underlying final

final Pull<O, Unit> underlying

The Pull<O, Unit> that backs this Rill.

Implementation
dart
final Pull<O, Unit> underlying;

Extension Properties

rethrowError extension no setter

Rill<A> get rethrowError

Unwraps Right values and raises the first Left value as an error.

Elements before the first Left are emitted normally; everything after is discarded once the error is raised.

Available on Rill<O>, provided by the RethrowOps<A> extension

Implementation
dart
Rill<A> get rethrowError {
  return chunks().flatMap((c) {
    Option<Object> errOpt = none();
    final size = c.size;
    var i = 0;

    final bldr = <A>[];

    while (i < size && errOpt.isEmpty) {
      c[i].fold(
        (ex) => errOpt = Some(ex),
        (o) {
          bldr.add(o);
          i++;
        },
      );
    }

    final chunk = Chunk.fromList(bldr);

    return Rill.chunk(chunk).append(
      () => errOpt.fold(
        () => Rill.empty(),
        (err) => Rill.raiseError(err),
      ),
    );
  });
}
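To make the cut-off behavior concrete, here is a minimal plain-Dart sketch that models the same semantics on an ordinary Iterable. It does not use the library: `Result`, `Ok`, `Err`, and `rethrowFirstError` are hypothetical stand-ins for Either, Right, Left, and the extension getter.

```dart
// Plain-Dart model of rethrowError; none of these names come from the library.
sealed class Result<A> {}

class Ok<A> extends Result<A> {
  final A value;
  Ok(this.value);
}

class Err<A> extends Result<A> {
  final Object error;
  Err(this.error);
}

// Yields unwrapped Ok values until the first Err, then throws that error.
// Anything after the first Err is never inspected.
Iterable<A> rethrowFirstError<A>(Iterable<Result<A>> items) sync* {
  for (final item in items) {
    switch (item) {
      case Ok<A>(:final value):
        yield value;
      case Err<A>(:final error):
        throw error;
    }
  }
}

void main() {
  final input = <Result<int>>[Ok(1), Ok(2), Err('boom'), Ok(3)];
  final seen = <int>[];
  try {
    for (final v in rethrowFirstError(input)) {
      seen.add(v);
    }
  } catch (e) {
    print('failed with: $e'); // failed with: boom
  }
  print(seen); // [1, 2]
}
```

The trailing `Ok(3)` never reaches the consumer, which mirrors how elements after the first Left are discarded.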

unchunks extension no setter

Rill<A> get unchunks

Flattens a stream of chunks into a stream of individual elements.

Available on Rill<O>, provided by the RillChunkOps<A> extension

Implementation
dart
Rill<A> get unchunks => underlying.flatMapOutput((c) => Pull.output(c)).rillNoScope;

unNone extension no setter

Rill<A> get unNone

Filters out None elements and unwraps all Some values.

Available on Rill<O>, provided by the RillOptionOps<A> extension

Implementation
dart
Rill<A> get unNone => collect(identity);

unNoneTerminate extension no setter

Rill<A> get unNoneTerminate

Unwraps Some values and terminates the stream at the first None.

Available on Rill<O>, provided by the RillOptionOps<A> extension

Implementation
dart
Rill<A> get unNoneTerminate {
  Pull<A, Unit> loop(Pull<Option<A>, Unit> p) {
    return p.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => hd
            .indexWhere((opt) => opt.isEmpty)
            .fold(
              () => Pull.output(hd.map((opt) => opt.get)).append(() => loop(tl)),
              (idx) => idx == 0 ? Pull.done : Pull.output(hd.take(idx).map((opt) => opt.get)),
            ),
      );
    });
  }

  return loop(underlying).rillNoScope;
}
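The difference between unNone and unNoneTerminate is easy to miss, so the following plain-Dart sketch contrasts them on an ordinary Iterable. It is only a model: `null` stands in for None, and `dropNulls` and `untilFirstNull` are hypothetical names that do not exist in the library.

```dart
// Plain-Dart model: null stands in for None, a non-null value for Some.

// Models unNone: skips nulls and keeps going.
Iterable<A> dropNulls<A extends Object>(Iterable<A?> items) sync* {
  for (final item in items) {
    if (item != null) yield item;
  }
}

// Models unNoneTerminate: stops at the first null.
Iterable<A> untilFirstNull<A extends Object>(Iterable<A?> items) sync* {
  for (final item in items) {
    if (item == null) return;
    yield item;
  }
}

void main() {
  final input = <int?>[1, 2, null, 3];
  print(dropNulls(input).toList()); // [1, 2, 3]
  print(untilFirstNull(input).toList()); // [1, 2]
}
```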

Methods

andWait()

Rill<O> andWait(Duration duration)

Appends a sleep_ of duration after this stream ends, then terminates.

Implementation
dart
Rill<O> andWait(Duration duration) => append(() => Rill.sleep_<O>(duration));

append()

Rill<O> append(Rill<O> Function() s2)

Runs this stream to completion, then runs s2.

Implementation
dart
Rill<O> append(Function0<Rill<O>> s2) => underlying.append(() => s2().underlying).rillNoScope;

as()

Rill<O2> as<O2>(O2 o2)

Replaces every element with o2.

Implementation
dart
Rill<O2> as<O2>(O2 o2) => map((_) => o2);

attempt()

Rill<Either<Object, O>> attempt()

Wraps each element in Right and errors in Left, so errors become values and the stream never fails.

Implementation
dart
Rill<Either<Object, O>> attempt() =>
    map((o) => o.asRight<Object>()).handleErrorWith((err) => Rill.emit(Left<Object, O>(err)));

attempts()

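Rill<Either<Object, O>> attempts(Rill<Duration> delays)

Like attempt, but after the first run it runs this stream again once for each delay emitted by delays, sleeping for that delay before each re-run. Each run's elements are emitted as Right and its error, if any, as Left.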

Implementation
dart
Rill<Either<Object, O>> attempts(Rill<Duration> delays) => attempt().append(
  () => delays.flatMap((delay) => Rill.sleep(delay).flatMap((_) => attempt())),
);

broadcastThrough()

Rill<O2> broadcastThrough<O2>(IList<Rill<O2> Function(Rill<O>)> pipes)

Fans this stream out to all pipes simultaneously and merges their outputs.

Each pipe receives every element. The stream is buffered through a Topic so all subscribers start at the same element.

Implementation
dart
Rill<O2> broadcastThrough<O2>(IList<Pipe<O, O2>> pipes) {
  final rillF = (
    Topic.create<Chunk<O>>(),
    CountDownLatch.create(pipes.size),
  ).mapN((topic, allReady) {
    final checkIn = allReady.release().productR(allReady.await());

    Rill<O2> dump(Pipe<O, O2> pipe) {
      return Rill.resource(topic.subscribeAwait(1)).flatMap((sub) {
        return Rill.exec<O2>(checkIn).append(() => pipe(sub.unchunks));
      });
    }

    final dumpAll = Rill.emits(pipes.toList()).map(dump).parJoinUnbounded();

    final pump = Rill.exec<Never>(allReady.await()).append(() => topic.publish(chunks()));

    return dumpAll.concurrently(pump);
  });

  return Rill.force(rillF);
}

buffer()

Rill<O> buffer(int n)

Re-chunks the stream into chunks of n elements; the final chunk may be smaller. If n is zero or negative, the stream is returned unchanged.

Implementation
dart
Rill<O> buffer(int n) {
  if (n <= 0) {
    return this;
  } else {
    return repeatPull((tp) {
      return tp.unconsN(n, allowFewer: true).flatMap((hdtl) {
        return hdtl.foldN(
          () => Pull.pure(none()),
          (hd, tl) => Pull.output(hd).as(Some(tl)),
        );
      });
    });
  }
}

changes()

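Rill<O> changes({bool Function(O, O)? eq})

Emits only elements that differ from the previously emitted element, compared with != by default. In the implementation shown here, a custom eq is passed directly to filterWithPrevious, so an element is emitted when eq(previous, current) returns true.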

Implementation
dart
Rill<O> changes({Function2<O, O, bool>? eq}) => filterWithPrevious(eq ?? (a, b) => a != b);

changesBy()

Rill<O> changesBy<O2>(O2 Function(O) f)

Like changes but compares by a key extracted with f.

Implementation
dart
Rill<O> changesBy<O2>(Function1<O, O2> f) => filterWithPrevious((a, b) => f(a) != f(b));

changesWith()

Rill<O> changesWith(bool Function(O, O) f)

Like changes but uses a custom two-argument predicate f to decide whether consecutive elements differ.

Implementation
dart
Rill<O> changesWith(Function2<O, O, bool> f) => filterWithPrevious((a, b) => f(a, b));

chunkAll()

Rill<Chunk<O>> chunkAll()

Collects the entire stream into a single emitted Chunk.

Implementation
dart
Rill<Chunk<O>> chunkAll() {
  Pull<Chunk<O>, Unit> loop(Rill<O> s, Chunk<O> acc) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1(acc),
        (hd, tl) => loop(tl, acc.concat(hd)),
      );
    });
  }

  return loop(this, Chunk.empty()).rillNoScope;
}

chunkLimit()

Rill<Chunk<O>> chunkLimit(int n)

Splits any chunk larger than n elements into chunks of at most n.

Implementation
dart
Rill<Chunk<O>> chunkLimit(int n) {
  Pull<Chunk<O>, Unit> breakup(Chunk<O> ch) {
    if (ch.size <= n) {
      return Pull.output1(ch);
    } else {
      final (pre, rest) = ch.splitAt(n);
      return Pull.output1(pre).append(() => breakup(rest));
    }
  }

  return underlying.unconsFlatMap(breakup).rillNoScope;
}
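As a rough illustration of the splitting rule, the sketch below models chunkLimit on plain lists. `List<T>` stands in for Chunk, `chunkLimitModel` is a hypothetical name, and the guard against a non-positive n is an addition of the model rather than something the library is known to do.

```dart
// Plain-Dart model of chunkLimit; List<T> stands in for Chunk<O>.
// chunkLimitModel is a hypothetical name, not part of the library.
List<List<T>> chunkLimitModel<T>(List<List<T>> chunks, int n) {
  // Guard added for the model only: a non-positive n could never make progress.
  if (n <= 0) throw ArgumentError.value(n, 'n', 'must be positive');

  final out = <List<T>>[];
  for (final chunk in chunks) {
    if (chunk.length <= n) {
      // Chunks that already fit are passed through unchanged.
      out.add(chunk);
    } else {
      // Oversized chunks are cut into slices of n; the last slice may be shorter.
      for (var start = 0; start < chunk.length; start += n) {
        final end = start + n < chunk.length ? start + n : chunk.length;
        out.add(chunk.sublist(start, end));
      }
    }
  }
  return out;
}

void main() {
  final chunks = [
    [1, 2, 3, 4, 5],
    [6],
    [7, 8, 9],
  ];
  print(chunkLimitModel(chunks, 2)); // [[1, 2], [3, 4], [5], [6], [7, 8], [9]]
}
```

Small chunks are never merged; only chunks larger than n are affected.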

chunkMin()

Rill<Chunk<O>> chunkMin(int n, {bool allowFewerTotal = true})

Merges consecutive chunks until the accumulated size is at least n.

If the stream ends before n elements are available and allowFewerTotal is true, the partial chunk is emitted.

Implementation
dart
Rill<Chunk<O>> chunkMin(int n, {bool allowFewerTotal = true}) => repeatPull((p) {
  return p.unconsMin(n, allowFewerTotal: allowFewerTotal).flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.pure(none()),
      (hd, tl) => Pull.output1(hd).as(Some(tl)),
    );
  });
});
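The merging rule can be sketched on plain lists as follows. This is a model under the assumption that unconsMin behaves as the description above says (accumulate whole chunks until at least n elements are held); `chunkMinModel` is a hypothetical name and `List<T>` stands in for Chunk.

```dart
// Plain-Dart model of chunkMin; List<T> stands in for Chunk<O>.
// chunkMinModel is a hypothetical name, not part of the library.
List<List<T>> chunkMinModel<T>(
  List<List<T>> chunks,
  int n, {
  bool allowFewerTotal = true,
}) {
  final out = <List<T>>[];
  var pending = <T>[];
  for (final chunk in chunks) {
    // Whole chunks are merged, so an emitted chunk may hold more than n elements.
    pending = [...pending, ...chunk];
    if (pending.length >= n) {
      out.add(pending);
      pending = <T>[];
    }
  }
  // Leftovers at the end are emitted only when allowFewerTotal is true.
  if (pending.isNotEmpty && allowFewerTotal) out.add(pending);
  return out;
}

void main() {
  final chunks = [
    [1],
    [2, 3],
    [4],
    [5, 6, 7],
    [8],
  ];
  print(chunkMinModel(chunks, 3)); // [[1, 2, 3], [4, 5, 6, 7], [8]]
  print(chunkMinModel(chunks, 3, allowFewerTotal: false)); // [[1, 2, 3], [4, 5, 6, 7]]
}
```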

chunkN()

Rill<Chunk<O>> chunkN(int n, {bool allowFewer = true})

Groups elements into chunks of exactly n elements.

If allowFewer is true (default), the last chunk may be smaller when the stream ends.

Implementation
dart
Rill<Chunk<O>> chunkN(int n, {bool allowFewer = true}) {
  assert(n > 0, 'n must be positive');
  return repeatPull((tp) {
    return tp.unconsN(n, allowFewer: allowFewer).flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.pure(none()),
        (hd, tl) => Pull.output1(hd).as(Some(tl)),
      );
    });
  });
}
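For comparison with chunkMin and chunkLimit, here is a plain-Dart sketch of fixed-size grouping. It assumes that, when allowFewer is false, a short final group is dropped rather than emitted; `chunkNModel` is a hypothetical name and the input is a flat Iterable because the original chunk boundaries do not matter for this operator.

```dart
// Plain-Dart model of chunkN; chunkNModel is a hypothetical name.
List<List<T>> chunkNModel<T>(
  Iterable<T> items,
  int n, {
  bool allowFewer = true,
}) {
  if (n <= 0) throw ArgumentError.value(n, 'n', 'must be positive');

  final out = <List<T>>[];
  var current = <T>[];
  for (final item in items) {
    current.add(item);
    if (current.length == n) {
      out.add(current);
      current = <T>[];
    }
  }
  // Assumption: a short final group is kept only when allowFewer is true.
  if (current.isNotEmpty && allowFewer) out.add(current);
  return out;
}

void main() {
  final items = [1, 2, 3, 4, 5, 6, 7];
  print(chunkNModel(items, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
  print(chunkNModel(items, 3, allowFewer: false)); // [[1, 2, 3], [4, 5, 6]]
}
```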

chunks()

Rill<Chunk<O>> chunks()

Re-emits this stream as a stream of its underlying Chunks.

Implementation
dart
Rill<Chunk<O>> chunks() => underlying.unconsFlatMap(Pull.output1).rillNoScope;

collect()

Rill<O2> collect<O2>(Option<O2> Function(O) f)

Applies f to each element, emitting only the Some results.

Implementation
dart
Rill<O2> collect<O2>(Function1<O, Option<O2>> f) => mapChunks((c) => c.collect(f));

collectFirst()

Rill<O2> collectFirst<O2>(Option<O2> Function(O) f)

Emits only the first Some result of f.

Implementation
dart
Rill<O2> collectFirst<O2>(Function1<O, Option<O2>> f) => collect(f).take(1);

collectWhile()

Rill<O2> collectWhile<O2>(Option<O2> Function(O) f)

Emits Some results of f until f returns none, then terminates.

Implementation
dart
Rill<O2> collectWhile<O2>(Function1<O, Option<O2>> f) =>
    map(f).takeWhile((b) => b.isDefined).unNone;

concurrently()

Rill<O> concurrently<O2>(Rill<O2> that)

Runs that concurrently as a background stream.

Elements of that are discarded; it is run only for its side effects. When this foreground stream ends, that is canceled. If that fails, the foreground stream is interrupted with the error.

Implementation
dart
Rill<O> concurrently<O2>(Rill<O2> that) {
  return _concurrentlyAux(that).flatMapN((startBack, fore) => startBack.flatMap((_) => fore));
}

cons()

Rill<O> cons(Chunk<O> c)

Prepends c to this stream.

Implementation
dart
Rill<O> cons(Chunk<O> c) => c.isEmpty ? this : Rill.chunk(c).append(() => this);

cons1()

Rill<O> cons1(O o)

Prepends a single element o to this stream.

Implementation
dart
Rill<O> cons1(O o) => Rill.emit(o).append(() => this);

debounce()

Rill<O> debounce(Duration d)

Emits only the latest element after the stream has been quiet for d.

Intermediate elements that arrive within d of each other are dropped.

Implementation
dart
Rill<O> debounce(Duration d) {
  final rillF = Channel.bounded<O>(1).flatMap((chan) {
    return IO.ref(none<O>()).map((ref) {
      final sendLatest = ref.getAndSet(const None()).flatMap((opt) => opt.traverseIO_(chan.send));

      IO<Unit> sendItem(O o) => ref.getAndSet(Some(o)).flatMap((prev) {
        if (prev.isEmpty) {
          return IO.sleep(d).productR(sendLatest).start().voided();
        } else {
          return IO.unit;
        }
      });

      Pull<Never, Unit> go(Pull<O, Unit> pull) {
        return pull.uncons.flatMap((hdtl) {
          return hdtl.foldN(
            () => Pull.eval(sendLatest.productR(chan.close()).voided()),
            (hd, tl) => Pull.eval(sendItem(hd.last)).append(() => go(tl)),
          );
        });
      }

      final debouncedSend = go(underlying).rillNoScope;

      return chan.rill.concurrently(debouncedSend);
    });
  });

  return Rill.force(rillF);
}

delayBy()

Rill<O> delayBy(Duration duration)

Delays the start of this stream by duration.

Implementation
dart
Rill<O> delayBy(Duration duration) => Rill.sleep_<O>(duration).append(() => this);

delete()

Rill<O> delete(bool Function(O) p)

Removes the first element for which p returns true.

Implementation
dart
Rill<O> delete(Function1<O, bool> p) =>
    pull
        .takeWhile((o) => !p(o))
        .flatMap((r) => r.fold(() => Pull.done, (s) => s.drop(1).pull.echo))
        .rillNoScope;
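Unlike filterNot, delete removes only one element. The plain-Dart sketch below models that on an Iterable; `deleteFirst` is a hypothetical name used for illustration only.

```dart
// Plain-Dart model of delete; deleteFirst is a hypothetical name.
Iterable<T> deleteFirst<T>(Iterable<T> items, bool Function(T) p) sync* {
  var deleted = false;
  for (final item in items) {
    if (!deleted && p(item)) {
      // Skip the first match only; later matches are emitted as usual.
      deleted = true;
      continue;
    }
    yield item;
  }
}

void main() {
  print(deleteFirst([1, 2, 3, 2, 4], (int x) => x == 2).toList()); // [1, 3, 2, 4]
  print(deleteFirst([1, 3], (int x) => x == 2).toList()); // [1, 3]
}
```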

drain()

Rill<Never> drain()

Consumes all elements for their side effects and emits nothing.

Implementation
dart
Rill<Never> drain() => underlying.unconsFlatMap((_) => Pull.done).rillNoScope;

drop()

Rill<O> drop(int n)

Skips the first n elements and emits the rest.

Implementation
dart
Rill<O> drop(int n) =>
    pull
        .drop(n)
        .flatMap((opt) => opt.fold(() => Pull.done, (rest) => rest.pull.echo))
        .rillNoScope;

dropLastIf()

Rill<O> dropLastIf(bool Function(O) p)

Removes the last element if p returns true for it; otherwise the stream is unchanged.

Implementation
dart
Rill<O> dropLastIf(Function1<O, bool> p) {
  Pull<O, Unit> go(Chunk<O> last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () {
          final o = last[last.size - 1];

          if (p(o)) {
            final (prefix, _) = last.splitAt(last.size - 1);
            return Pull.output(prefix);
          } else {
            return Pull.output(last);
          }
        },
        (hd, tl) => Pull.output(last).append(() => go(hd, tl)),
      );
    });
  }

  return pull.uncons.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => go(hd, tl),
    );
  }).rillNoScope;
}

dropRight()

Rill<O> dropRight(int n)

Removes the last n elements from the stream, buffering as needed.

Implementation
dart
Rill<O> dropRight(int n) {
  if (n <= 0) {
    return this;
  } else {
    Pull<O, Unit> go(Chunk<O> acc, Rill<O> s) {
      return s.pull.uncons.flatMap((hdtl) {
        return hdtl.foldN(
          () => Pull.done,
          (hd, tl) {
            final all = acc.concat(hd);
            return Pull.output(all.dropRight(n)).append(() => go(all.takeRight(n), tl));
          },
        );
      });
    }

    return go(Chunk.empty(), this).rillNoScope;
  }
}
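The phrase "buffering as needed" means that the last n elements seen so far are always held back, because any of them might turn out to be among the final n. The plain-Dart sketch below models that on lists of chunks; `dropRightModel` is a hypothetical name, `List<T>` stands in for Chunk, and empty outputs are simply skipped in the model.

```dart
// Plain-Dart model of dropRight; List<T> stands in for Chunk<O>.
// dropRightModel is a hypothetical name, not part of the library.
List<List<T>> dropRightModel<T>(List<List<T>> chunks, int n) {
  if (n <= 0) return chunks;

  final out = <List<T>>[];
  var held = <T>[]; // at most the last n elements seen so far
  for (final chunk in chunks) {
    final all = [...held, ...chunk];
    // Everything except the newest n elements is now safe to emit.
    final cut = all.length > n ? all.length - n : 0;
    final ready = all.sublist(0, cut);
    if (ready.isNotEmpty) out.add(ready);
    held = all.sublist(cut);
  }
  // When the input ends, whatever is still held is discarded.
  return out;
}

void main() {
  final chunks = [
    [1, 2],
    [3],
    [4, 5, 6],
  ];
  print(dropRightModel(chunks, 2)); // [[1], [2, 3, 4]]
}
```

Note that output lags the input by n elements, so memory use is bounded by n plus the size of one chunk.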

dropThrough()

Rill<O> dropThrough(bool Function(O) p)

Drops elements while p is true and also drops the first element for which p is false.

Implementation
dart
Rill<O> dropThrough(Function1<O, bool> p) =>
    pull
        .dropThrough(p)
        .flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
        .rillNoScope;

dropWhile()

Rill<O> dropWhile(bool Function(O) p)

Drops elements while p is true, then emits the rest.

Implementation
dart
Rill<O> dropWhile(Function1<O, bool> p) =>
    pull
        .dropWhile(p)
        .flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
        .rillNoScope;
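dropWhile and dropThrough differ by exactly one element. A minimal plain-Dart sketch using the standard `Iterable.skipWhile` and `Iterable.skip` shows the contrast; `dropWhileModel` and `dropThroughModel` are hypothetical names, not library functions.

```dart
// Plain-Dart models built on dart:core; the function names are hypothetical.

// Models dropWhile: skip the leading run where p holds.
Iterable<T> dropWhileModel<T>(Iterable<T> items, bool Function(T) p) =>
    items.skipWhile(p);

// Models dropThrough: additionally skip the first element where p fails.
Iterable<T> dropThroughModel<T>(Iterable<T> items, bool Function(T) p) =>
    items.skipWhile(p).skip(1);

void main() {
  final items = [1, 2, 5, 1, 6];
  bool small(int x) => x < 3;

  print(dropWhileModel(items, small).toList()); // [5, 1, 6]
  print(dropThroughModel(items, small).toList()); // [1, 6]
}
```

In both cases the predicate stops being consulted after the first failure, which is why the later `1` survives.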

either()

Rill<Either<O, O2>> either<O2>(Rill<O2> that)

Merges this stream and that concurrently, wrapping elements in Left and Right respectively.

Implementation
dart
Rill<Either<O, O2>> either<O2>(Rill<O2> that) =>
    map((o) => o.asLeft<O2>()).merge(that.map((o2) => o2.asRight<O>()));

evalFilter()

Rill<O> evalFilter(IO<bool> Function(O) p)

Keeps only elements for which the effectful predicate p returns true.

Implementation
dart
Rill<O> evalFilter(Function1<O, IO<bool>> p) =>
    underlying
        .flatMapOutput(
          (o) => Pull.eval(p(o)).flatMap((pass) => pass ? Pull.output1(o) : Pull.done),
        )
        .rillNoScope;

evalFilterNot()

Rill<O> evalFilterNot(IO<bool> Function(O) p)

Removes elements for which the effectful predicate p returns true.

Implementation
dart
Rill<O> evalFilterNot(Function1<O, IO<bool>> p) =>
    flatMap((o) => Rill.eval(p(o)).ifM(() => Rill.empty(), () => Rill.emit(o)));

evalFold()

Rill<O2> evalFold<O2>(O2 z, IO<O2> Function(O2, O) f)

Effectful fold: runs f for each element and emits the final accumulator.

Implementation
dart
Rill<O2> evalFold<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
  Pull<O2, Unit> go(O2 z, Rill<O> r) {
    return r.pull.uncons1.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1(z),
        (hd, tl) => Pull.eval(f(z, hd)).flatMap((ns) => go(ns, tl)),
      );
    });
  }

  return go(z, this).rillNoScope;
}

evalMap()

Rill<O2> evalMap<O2>(IO<O2> Function(O) f)

Transforms each element by evaluating the effectful function f.

Implementation
dart
Rill<O2> evalMap<O2>(Function1<O, IO<O2>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap(Pull.output1)).rillNoScope;

evalMapFilter()

Rill<O> evalMapFilter(IO<Option<O>> Function(O) f)

Effectful filter-map: evaluates f and emits Some results, dropping none.

Implementation
dart
Rill<O> evalMapFilter(Function1<O, IO<Option<O>>> f) =>
    underlying
        .flatMapOutput((o) => Pull.eval(f(o)).flatMap((opt) => Pull.outputOption1(opt)))
        .rillNoScope;

evalScan()

Rill<O2> evalScan<O2>(O2 z, IO<O2> Function(O2, O) f)

Like scan but the accumulation function f returns an IO.

Implementation
dart
Rill<O2> evalScan<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
  Pull<O2, Unit> go(O2 z, Rill<O> s) {
    return s.pull.uncons1.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => Pull.eval(f(z, hd)).flatMap((o) => Pull.output1(o).append(() => go(o, tl))),
      );
    });
  }

  return Pull.output1(z).append(() => go(z, this)).rillNoScope;
}
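The practical difference between evalFold and evalScan is what gets emitted: one final value versus the seed followed by every intermediate accumulator. The plain-Dart sketch below models that difference synchronously, leaving out the IO effect; `foldModel` and `scanModel` are hypothetical names.

```dart
// Synchronous plain-Dart models; effects (IO) are deliberately left out.
// foldModel and scanModel are hypothetical names, not part of the library.

// Models evalFold: only the final accumulator is produced.
S foldModel<S, T>(Iterable<T> items, S z, S Function(S, T) f) {
  var acc = z;
  for (final item in items) {
    acc = f(acc, item);
  }
  return acc;
}

// Models evalScan: the seed comes first, then every intermediate accumulator.
List<S> scanModel<S, T>(Iterable<T> items, S z, S Function(S, T) f) {
  final out = <S>[z];
  var acc = z;
  for (final item in items) {
    acc = f(acc, item);
    out.add(acc);
  }
  return out;
}

void main() {
  int add(int acc, int x) => acc + x;

  print(foldModel([1, 2, 3], 0, add)); // 6
  print(scanModel([1, 2, 3], 0, add)); // [0, 1, 3, 6]
  print(scanModel(<int>[], 0, add)); // [0]
}
```

As in the implementations above, an empty input still yields the seed in both cases.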

evalTap()

Rill<O> evalTap<O2>(IO<O2> Function(O) f)

Evaluates f for its side effect on each element, re-emitting the original element unchanged.

Implementation
dart
Rill<O> evalTap<O2>(Function1<O, IO<O2>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap((_) => Pull.output1(o))).rillNoScope;

exists()

Rill<bool> exists(bool Function(O) p)

Emits true if any element satisfies p, otherwise emits false.

Implementation
dart
Rill<bool> exists(Function1<O, bool> p) =>
    pull.forall((o) => !p(o)).flatMap((r) => Pull.output1(!r)).rillNoScope;

filter()

Rill<O> filter(bool Function(O) p)

Emits only elements for which p returns true.

Implementation
dart
Rill<O> filter(Function1<O, bool> p) => mapChunks((c) => c.filter(p));

filterNot()

Rill<O> filterNot(bool Function(O) p)

Emits only elements for which p returns false.

Implementation
dart
Rill<O> filterNot(Function1<O, bool> p) => mapChunks((c) => c.filterNot(p));

filterWithPrevious()

Rill<O> filterWithPrevious(bool Function(O, O) p)

Emits an element only if p returns true for the previous and current element. The very first element is always emitted.

Implementation
dart
Rill<O> filterWithPrevious(Function2<O, O, bool> p) {
  Pull<O, Unit> go(O last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) {
          // can it be emitted unmodified?
          final (allPass, newLast) = hd.foldLeft((
            true,
            last,
          ), (acc, a) => (acc.$1 && p(acc.$2, a), a));

          if (allPass) {
            return Pull.output(hd).append(() => go(newLast, tl));
          } else {
            final (acc, newLast) = hd.foldLeft((IVector.empty<O>(), last), (acc, a) {
              if (p(acc.$2, a)) {
                return (acc.$1.appended(a), a);
              } else {
                return (acc.$1, acc.$2);
              }
            });

            return Pull.output(Chunk.from(acc)).append(() => go(newLast, tl));
          }
        },
      );
    });
  }

  return pull.uncons1.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => Pull.output1(hd).append(() => go(hd, tl)),
    );
  }).rillNoScope;
}
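One detail worth spelling out is that "previous" means the previously emitted element, not the previously seen one: a rejected element does not become the new reference point. The plain-Dart sketch below models this on an Iterable; `filterWithPreviousModel` is a hypothetical name.

```dart
// Plain-Dart model of filterWithPrevious; the function name is hypothetical.
Iterable<T> filterWithPreviousModel<T>(
  Iterable<T> items,
  bool Function(T previous, T current) p,
) sync* {
  final it = items.iterator;
  if (!it.moveNext()) return;

  // The first element is always emitted and becomes the reference point.
  var last = it.current;
  yield last;

  while (it.moveNext()) {
    final current = it.current;
    if (p(last, current)) {
      yield current;
      last = current; // only emitted elements replace the reference point
    }
  }
}

void main() {
  // Keep an element only if it is greater than the last emitted one.
  final rising = filterWithPreviousModel<int>(
    [1, 3, 2, 2, 4, 3, 5],
    (a, b) => b > a,
  );
  print(rising.toList()); // [1, 3, 4, 5]

  // The default behavior of changes: keep an element if it differs.
  final changed = filterWithPreviousModel<int>(
    [1, 1, 2, 2, 1],
    (a, b) => a != b,
  );
  print(changed.toList()); // [1, 2, 1]
}
```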

flatMap()

Rill<O2> flatMap<O2>(Rill<O2> Function(O) f)

Maps each element to a stream via f and concatenates the results.

Implementation
dart
Rill<O2> flatMap<O2>(Function1<O, Rill<O2>> f) =>
    underlying.flatMapOutput((o) => f(o).underlying).rillNoScope;

fold()

Rill<O2> fold<O2>(O2 z, O2 Function(O2, O) f)

Folds the stream into a single value, emitting it when the stream ends.

Implementation
dart
Rill<O2> fold<O2>(O2 z, Function2<O2, O, O2> f) =>
    pull.fold(z, f).flatMap(Pull.output1).rillNoScope;

fold1()

Rill<O> fold1(O Function(O, O) f)

Like fold but uses the first element as the initial accumulator.

Implementation
dart
Rill<O> fold1(Function2<O, O, O> f) =>
    pull.fold1(f).flatMap((opt) => opt.map(Pull.output1).getOrElse(() => Pull.done)).rillNoScope;

forall()

Rill<bool> forall(bool Function(O) p)

Emits true if all elements satisfy p, otherwise emits false.

Implementation
dart
Rill<bool> forall(Function1<O, bool> p) =>
    pull.forall(p).flatMap((res) => Pull.output1(res)).rillNoScope;

foreach()

Rill<Never> foreach(IO<Unit> Function(O) f)

Evaluates f for each element as a side effect and emits nothing.

Implementation
dart
Rill<Never> foreach(Function1<O, IO<Unit>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o))).rillNoScope;

groupAdjacentBy()

Rill<(O2, Chunk<O>)> groupAdjacentBy<O2>(O2 Function(O) f)

Groups consecutive elements that share the same key (returned by f) into (key, chunk) pairs, emitting one pair per run of equal keys.

Implementation
dart
Rill<(O2, Chunk<O>)> groupAdjacentBy<O2>(Function1<O, O2> f) =>
    groupAdjacentByLimit(Integer.maxValue, f);

groupAdjacentByLimit()

Rill<(O2, Chunk<O>)> groupAdjacentByLimit<O2>(int limit, O2 Function(O) f)

Like groupAdjacentBy but caps each group at limit elements.

Implementation
dart
Rill<(O2, Chunk<O>)> groupAdjacentByLimit<O2>(int limit, Function1<O, O2> f) {
  Pull<(O2, Chunk<O>), Unit> go(Option<(O2, Chunk<O>)> current, Rill<O> s) {
    Pull<(O2, Chunk<O>), Unit> doChunk(
      Chunk<O> chunk,
      Rill<O> s,
      O2 k1,
      Chunk<O> out,
      IQueue<(O2, Chunk<O>)> acc,
    ) {
      final differsAt = chunk.indexWhere((v) => f(v) != k1).getOrElse(() => -1);

      if (differsAt == -1) {
        // whole chunk matches the current key, add this chunk to the accumulated output
        if (out.size + chunk.size < limit) {
          final newCurrent = Some((k1, out.concat(chunk)));
          return Pull.output(Chunk.from(acc)).append(() => go(newCurrent, s));
        } else {
          final (prefix, suffix) = chunk.splitAt(limit - out.size);
          return Pull.output(
            Chunk.from(acc.appended((k1, out.concat(prefix)))),
          ).append(() => go(Some((k1, suffix)), s));
        }
      } else {
        // at least part of this chunk does not match the current key, need to group and retain chunkiness
        // split the chunk into the bit where the keys match and the bit where they don't
        final matching = chunk.take(differsAt);

        late IQueue<(O2, Chunk<O>)> newAcc;

        final newOutSize = out.size + matching.size;

        if (newOutSize == 0) {
          newAcc = acc;
        } else if (newOutSize > limit) {
          final (prefix, suffix) = matching.splitAt(limit - out.size);
          newAcc = acc.appended((k1, out.concat(prefix))).appended((k1, suffix));
        } else {
          newAcc = acc.appended((k1, out.concat(matching)));
        }

        final nonMatching = chunk.drop(differsAt);

        // nonMatching is guaranteed to be non-empty here, because we know the last element of the chunk doesn't have
        // the same key as the first
        final k2 = f(nonMatching[0]);

        return doChunk(nonMatching, s, k2, Chunk.empty(), newAcc);
      }
    }

    return s.pull.unconsLimit(limit).flatMap((hdtl) {
      return hdtl.foldN(
        () => current
            .mapN(
              (k1, out) => out.size == 0 ? Pull.done : Pull.output1((k1, out)),
            )
            .getOrElse(() => Pull.done),
        (hd, tl) {
          final (k1, out) = current.getOrElse(() => (f(hd[0]), Chunk.empty()));
          return doChunk(hd, tl, k1, out, IQueue.empty());
        },
      );
    });
  }

  return go(none(), this).rillNoScope;
}
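Stripped of chunk handling, the grouping rule is short: start a new group whenever the key changes or the current group has reached the limit. The plain-Dart sketch below models that on a flat Iterable. It is not the library's algorithm; `groupAdjacentByLimitModel` is a hypothetical name and `List<T>` stands in for Chunk.

```dart
// Plain-Dart model of groupAdjacentByLimit; the function name is hypothetical.
List<(K, List<T>)> groupAdjacentByLimitModel<T, K>(
  Iterable<T> items,
  int limit,
  K Function(T) key,
) {
  final out = <(K, List<T>)>[];
  for (final item in items) {
    final k = key(item);
    final canExtend =
        out.isNotEmpty && out.last.$1 == k && out.last.$2.length < limit;
    if (canExtend) {
      // Same key and room left: extend the current run.
      out.last.$2.add(item);
    } else {
      // Key changed or the run is full: start a new group.
      out.add((k, [item]));
    }
  }
  return out;
}

void main() {
  final groups = groupAdjacentByLimitModel<int, int>(
    [1, 2, 3, 11, 12, 4],
    2,
    (x) => x ~/ 10,
  );
  for (final (key, members) in groups) {
    print('$key: $members');
  }
  // 0: [1, 2]
  // 0: [3]
  // 1: [11, 12]
  // 0: [4]
}
```

Only adjacent elements are grouped, so the key 0 appears again at the end; passing a very large limit gives the behavior of groupAdjacentBy.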

groupWithin()

Rill<Chunk<O>> groupWithin(int chunkSize, Duration timeout)

Accumulates elements into chunks, emitting whenever chunkSize is reached or timeout has elapsed since the first buffered element — whichever comes first.

Implementation
dart
Rill<Chunk<O>> groupWithin(int chunkSize, Duration timeout) {
  if (chunkSize <= 0) throw ArgumentError('Rill.groupWithin: chunkSize must be > 0');

  return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
    // Producer: runs in background pushing chunks to the queue
    final producer = chunks()
        .map((c) => Some(c))
        .evalMap(queue.offer)
        .compile
        .drain
        .guarantee(queue.offer(const None()));

    // Consumer: buffers data and races against timeout
    //
    // [deadline]: Monotonic time (micros) by which the current buffer MUST be emitted, or null for an empty buffer
    Pull<Chunk<O>, Unit> consumeLoop(Chunk<O> buffer, int? deadline) {
      if (buffer.size >= chunkSize) {
        final (toEmit, remainder) = buffer.splitAt(chunkSize);

        if (remainder.isEmpty) {
          return Pull.output1(toEmit).append(() => consumeLoop(Chunk.empty(), null));
        } else {
          // set new deadline
          return Pull.eval(IO.now).flatMap((now) {
            return Pull.output1(toEmit).append(() {
              return consumeLoop(remainder, now.microsecondsSinceEpoch + timeout.inMicroseconds);
            });
          });
        }
      } else {
        // Buffer isn't full so wait for data or timeout
        if (buffer.isEmpty) {
          // no buffered data, don't care about time. wait for next chunk
          return Pull.eval(queue.take()).flatMap((opt) {
            return opt.fold(
              () => Pull.done,
              (chunk) {
                // data has arrived, start the clock
                return Pull.eval(IO.now).flatMap((now) {
                  return consumeLoop(chunk, now.microsecondsSinceEpoch + timeout.inMicroseconds);
                });
              },
            );
          });
        } else {
          // buffer has data, race queue against the clock
          return Pull.eval(IO.now).flatMap((now) {
            final remainingMicros = deadline! - now.microsecondsSinceEpoch;

            if (remainingMicros <= 0) {
              // timeout reached, emit whatever is buffered
              return Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null));
            } else {
              final waitOp = IO.sleep(Duration(microseconds: remainingMicros));
              final takeOp = queue.take();

              return Pull.eval(IO.race(takeOp, waitOp)).flatMap((raceResult) {
                return raceResult.fold(
                  (takeResult) {
                    return takeResult.fold(
                      () => Pull.output1(buffer), // producer finished
                      (newChunk) => consumeLoop(buffer.concat(newChunk), deadline),
                    );
                  },
                  (_) => Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null)),
                );
              });
            }
          });
        }
      }
    }

    return Rill.bracket(
      producer.start(),
      (fiber) => fiber.cancel(),
    ).flatMap((_) => consumeLoop(Chunk.empty(), null).rill);
  });
}

handleErrorWith()

Rill<O> handleErrorWith(Rill<O> Function(Object) f)

Recovers from any error by switching to the stream returned by f.

Implementation
dart
Rill<O> handleErrorWith(Function1<Object, Rill<O>> f) =>
    underlying.handleErrorWith((err) => f(err).underlying).rillNoScope;

holdResource()

Resource<Signal<O>> holdResource(O initial)

Creates a Signal that holds the latest element emitted by this stream.

The signal starts with initial and is updated in the background. The Resource ensures the background fiber is canceled on release.

Implementation
dart
Resource<Signal<O>> holdResource(O initial) => Resource.eval(
  SignallingRef.of(initial),
).flatTap((sig) => foreach((n) => sig.setValue(n)).compile.drain.background());

ifEmpty()

Rill<O> ifEmpty(Rill<O> Function() fallback)

Runs fallback if this stream emits no elements.

Implementation
dart
Rill<O> ifEmpty(Function0<Rill<O>> fallback) =>
    pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => fallback().underlying,
        (hd, tl) => Pull.output(hd).append(() => tl.underlying),
      );
    }).rillNoScope;

ifEmptyEmit()

Rill<O> ifEmptyEmit(O Function() o)

Emits the value produced by o if this stream is empty.

Implementation
dart
Rill<O> ifEmptyEmit(Function0<O> o) => ifEmpty(() => Rill.emit(o()));

interleave()

Rill<O> interleave(Rill<O> that)

Alternates elements from this stream and that, stopping when either ends.

Implementation
dart
Rill<O> interleave(Rill<O> that) => zip(that).flatMap((t) => Rill.emits([t.$1, t.$2]));

interleaveAll()

Rill<O> interleaveAll(Rill<O> that)

Like interleave, but when one stream ends it keeps emitting the remaining elements of the other.

Implementation
dart
Rill<O> interleaveAll(Rill<O> that) => map(
  (o) => Option(o),
).zipAll<Option<O>>(that.map((o) => Option(o)), none(), none()).flatMap((t) {
  final (thisOpt, thatOpt) = t;
  return Rill.chunk(Chunk.from(thisOpt)).append(() => Rill.chunk(Chunk.from(thatOpt)));
});
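The contrast between interleave and interleaveAll shows up only when the inputs have different lengths. A minimal plain-Dart sketch on lists, with the hypothetical names `interleaveModel` and `interleaveAllModel`:

```dart
// Plain-Dart models on lists; the function names are hypothetical.

// Models interleave: stops as soon as either input runs out.
List<T> interleaveModel<T>(List<T> a, List<T> b) {
  final n = a.length < b.length ? a.length : b.length;
  return [
    for (var i = 0; i < n; i++) ...[a[i], b[i]],
  ];
}

// Models interleaveAll: keeps going with whichever input still has elements.
List<T> interleaveAllModel<T>(List<T> a, List<T> b) {
  final n = a.length > b.length ? a.length : b.length;
  return [
    for (var i = 0; i < n; i++) ...[
      if (i < a.length) a[i],
      if (i < b.length) b[i],
    ],
  ];
}

void main() {
  print(interleaveModel([1, 2, 3], [10, 20])); // [1, 10, 2, 20]
  print(interleaveAllModel([1, 2, 3], [10, 20])); // [1, 10, 2, 20, 3]
}
```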

interruptAfter()

Rill<O> interruptAfter(Duration duration)

Cancels the stream after duration has elapsed. Alias for interruptWhen(IO.sleep(duration)).

Implementation
dart
Rill<O> interruptAfter(Duration duration) => interruptWhen(IO.sleep(duration));

interruptWhen()

Rill<O> interruptWhen<B>(IO<B> signal)

Cancels this stream when signal completes (regardless of its result).

Implementation
dart
Rill<O> interruptWhen<B>(IO<B> signal) {
  return Rill.eval(IO.deferred<Unit>()).flatMap((stopEvent) {
    final startSignalFiber = signal.attempt().flatMap((_) => stopEvent.complete(Unit())).start();

    return Rill.eval(startSignalFiber).flatMap((fiber) {
      final haltWhen = stopEvent.value().as(Unit().asRight<Object>());
      return Pull.interruptWhen(underlying, haltWhen).rillNoScope.onFinalize(fiber.cancel());
    });
  });
}

interruptWhenSignaled()

Rill<O> interruptWhenSignaled(Signal<bool> signal)

Cancels this stream when signal's value becomes true.

Implementation
dart
Rill<O> interruptWhenSignaled(Signal<bool> signal) => interruptWhenTrue(signal.discrete);

interruptWhenTrue()

Rill<O> interruptWhenTrue(Rill<bool> haltWhenTrue)

Cancels this stream when haltWhenTrue emits true.

Implementation
dart
Rill<O> interruptWhenTrue(Rill<bool> haltWhenTrue) {
  final rillF = (
    IO.deferred<Unit>(),
    IO.deferred<Unit>(),
    IO.deferred<Either<Object, Unit>>(),
  ).mapN((
    interruptL,
    interruptR,
    backResult,
  ) {
    final watch =
        haltWhenTrue.exists((x) => x).interruptWhen(interruptR.value().attempt()).compile.drain;

    final wakeWatch =
        watch.guaranteeCase((oc) {
          final Either<Object, Unit> r = oc.fold(
            () => Unit().asRight(),
            (err, _) => err.asLeft(),
            (_) => Unit().asRight(),
          );

          return backResult.complete(r).productR(interruptL.complete(Unit()).voided());
        }).voidError();

    final stopWatch = interruptR
        .complete(Unit())
        .productR(backResult.value().flatMap(IO.fromEither));

    final backWatch = Rill.bracket(wakeWatch.start(), (_) => stopWatch);

    return backWatch.flatMap((_) => interruptWhen(interruptL.value().attempt()));
  });

  return Rill.force(rillF);
}

intersperse()

Rill<O> intersperse(O separator)

Inserts separator between every pair of adjacent elements.

Implementation
dart
Rill<O> intersperse(O separator) {
  Chunk<O> doChunk(Chunk<O> hd, bool isFirst) {
    final bldr = <O>[];

    final iter = hd.iterator;

    if (isFirst) bldr.add(iter.next());

    iter.foreach((o) {
      bldr.add(separator);
      bldr.add(o);
    });

    return Chunk.fromList(bldr);
  }

  Pull<O, Unit> go(Rill<O> str) {
    return str.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => Pull.output(doChunk(hd, false)).append(() => go(tl)),
      );
    });
  }

  return pull.uncons.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => Pull.output(doChunk(hd, true)).append(() => go(tl)),
    );
  }).rillNoScope;
}
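
As a quick illustration of the resulting element order, the sketch below models intersperse on a plain Dart list. The helper intersperseList is hypothetical and ignores chunking; it is not the library's implementation.

```dart
// Plain-list model of intersperse: a separator is placed before every
// element except the first, so none trails the last element.
List<T> intersperseList<T>(List<T> xs, T separator) => [
  for (var i = 0; i < xs.length; i++) ...[if (i > 0) separator, xs[i]],
];
```

For example, intersperseList([1, 2, 3], 0) evaluates to [1, 0, 2, 0, 3], and an empty input stays empty.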

keepAlive()

Rill<O> keepAlive(Duration maxIdle, IO<O> heartbeat)

Emits a heartbeat value whenever no element has been received for maxIdle, keeping the consumer from timing out.

Implementation
dart
Rill<O> keepAlive(Duration maxIdle, IO<O> heartbeat) {
  return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
    final producer = chunks()
        .map((c) => Some(c))
        .evalMap(queue.offer)
        .compile
        .drain
        .guarantee(queue.offer(none()));

    Pull<O, Unit> consumeLoop() {
      final takeOp = queue.take();
      final timerOp = IO.sleep(maxIdle);

      return Pull.eval(IO.race(takeOp, timerOp)).flatMap((raceResult) {
        return raceResult.fold(
          (queueResult) => queueResult.fold(
            () => Pull.done,
            (chunk) => Pull.output(chunk).append(consumeLoop),
          ),
          (_) => Pull.eval(heartbeat).flatMap((o) => Pull.output1(o)).append(consumeLoop),
        );
      });
    }

    return Rill.bracket(
      producer.start(),
      (fiber) => fiber.cancel(),
    ).flatMap((_) => consumeLoop().rillNoScope);
  });
}

lastOr()

Rill<O> lastOr(O Function() fallback)

Emits the last element, or the value from fallback if the stream is empty.

Implementation
dart
Rill<O> lastOr(Function0<O> fallback) =>
    pull.last
        .flatMap((o) => o.fold(() => Pull.output1(fallback()), (o) => Pull.output1(o)))
        .rillNoScope;

map()

Rill<O2> map<O2>(O2 Function(O) f)

Transforms each element by applying f.

Implementation
dart
Rill<O2> map<O2>(Function1<O, O2> f) =>
    pull.echo.unconsFlatMap((hd) => Pull.output(hd.map(f))).rillNoScope;

mapAccumulate()

Rill<(S, O2)> mapAccumulate<S, O2>(S initial, (S, O2) Function(S, O) f)

Stateful map: threads a state S through the stream, emitting (state, output) pairs.

Implementation
dart
Rill<(S, O2)> mapAccumulate<S, O2>(S initial, Function2<S, O, (S, O2)> f) {
  (S, (S, O2)) go(S s, O a) {
    final (newS, newO) = f(s, a);
    return (newS, (newS, newO));
  }

  return scanChunks(initial, (acc, c) => c.mapAccumulate(acc, go));
}
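
The following sketch models mapAccumulate on a plain Dart list to show which state value ends up in each emitted pair. The helper mapAccumulateList and the sample data are hypothetical; the point is that, as in the go function above, each pair carries the state after f has been applied.

```dart
// Plain-list model of mapAccumulate.
List<(S, O2)> mapAccumulateList<S, O, O2>(
  List<O> xs,
  S initial,
  (S, O2) Function(S, O) f,
) {
  var state = initial;
  final out = <(S, O2)>[];
  for (final x in xs) {
    final (nextState, output) = f(state, x);
    state = nextState;
    out.add((nextState, output)); // the emitted state is the updated one
  }
  return out;
}

// Running character count alongside an upper-cased copy of each word.
final lengths = mapAccumulateList<int, String, String>(
  ['a', 'bb', 'ccc'],
  0,
  (total, word) => (total + word.length, word.toUpperCase()),
);
```

Here lengths holds the pairs (1, 'A'), (3, 'BB') and (6, 'CCC').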

mapAsync()

Rill<O2> mapAsync<O2>(int maxConcurrent, IO<O2> Function(O) f)

Alias for parEvalMap.

Implementation
dart
Rill<O2> mapAsync<O2>(int maxConcurrent, Function1<O, IO<O2>> f) => parEvalMap(maxConcurrent, f);

mapAsyncUnordered()

Rill<O2> mapAsyncUnordered<O2>(int maxConcurrent, IO<O2> Function(O) f)

Alias for parEvalMapUnordered.

Implementation
dart
Rill<O2> mapAsyncUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) =>
    parEvalMapUnordered(maxConcurrent, f);

mapChunks()

Rill<O2> mapChunks<O2>(Chunk<O2> Function(Chunk<O>) f)

Transforms each underlying Chunk directly.

More efficient than map when f can be applied to an entire chunk at once (e.g. Chunk.map or Chunk.filter).

Implementation
dart
Rill<O2> mapChunks<O2>(Function1<Chunk<O>, Chunk<O2>> f) =>
    underlying.unconsFlatMap((hd) => Pull.output(f(hd))).rillNoScope;

merge()

Rill<O> merge(Rill<O> that)

Merges this stream and that concurrently, emitting elements from whichever side produces first.

The merged stream terminates when both sides have completed.

Implementation
dart
Rill<O> merge(Rill<O> that) => _merge(that, (s, fin) => Rill.exec<O>(fin).append(() => s));

mergeAndAwaitDownstream()

Rill<O> mergeAndAwaitDownstream(Rill<O> that)

Like merge but waits for the downstream consumer to finish processing each chunk before releasing resources.

Implementation
dart
Rill<O> mergeAndAwaitDownstream(Rill<O> that) => _merge(that, (s, fin) => s.onFinalize(fin));

mergeHaltBoth()

Rill<O> mergeHaltBoth(Rill<O> that)

Merges this stream and that, terminating as soon as either side ends.

Implementation
dart
Rill<O> mergeHaltBoth(Rill<O> that) =>
    noneTerminate().merge(that.noneTerminate()).unNoneTerminate;

mergeHaltL()

Rill<O> mergeHaltL(Rill<O> that)

Merges this stream and that, terminating when this (left) stream ends.

Implementation
dart
Rill<O> mergeHaltL(Rill<O> that) =>
    noneTerminate().merge(that.map((o) => Option(o))).unNoneTerminate;

mergeHaltR()

Rill<O> mergeHaltR(Rill<O> that)

Merges this stream and that, terminating when that (right) stream ends.

Implementation
dart
Rill<O> mergeHaltR(Rill<O> that) => that.mergeHaltL(this);

metered()

Rill<O> metered(Duration rate)

Throttles this stream to emit at most one element per rate.

Implementation
dart
Rill<O> metered(Duration rate) => Rill.fixedRate(rate).zipRight(this);

meteredStartImmediately()

Rill<O> meteredStartImmediately(Duration rate)

Like metered but emits the first element immediately.

Implementation
dart
Rill<O> meteredStartImmediately(Duration rate) =>
    Rill.fixedRateStartImmediately(rate).zipRight(this);

noneTerminate()

Rill<Option<O>> noneTerminate()

Wraps each element in Some and appends a terminal none sentinel.

Implementation
dart
Rill<Option<O>> noneTerminate() => map((o) => Option(o)).append(() => Rill.emit(none()));

noSuchMethod() inherited

dynamic noSuchMethod(Invocation invocation)

Invoked when a nonexistent method or property is accessed.

A dynamic member invocation can attempt to call a member which doesn't exist on the receiving object. Example:

dart
dynamic object = 1;
object.add(42); // Statically allowed, run-time error

This invalid code will invoke the noSuchMethod method of the integer 1 with an Invocation representing the .add(42) call and arguments (which then throws).

Classes can override noSuchMethod to provide custom behavior for such invalid dynamic invocations.

A class with a non-default noSuchMethod invocation can also omit implementations for members of its interface. Example:

dart
class MockList<T> implements List<T> {
  noSuchMethod(Invocation invocation) {
    log(invocation);
    super.noSuchMethod(invocation); // Will throw.
  }
}
void main() {
  MockList().add(42);
}

This code has no compile-time warnings or errors even though the MockList class has no concrete implementation of any of the List interface methods. Calls to List methods are forwarded to noSuchMethod, so this code will log an invocation similar to Invocation.method(#add, [42]) and then throw.

If a value is returned from noSuchMethod, it becomes the result of the original invocation. If the value is not of a type that can be returned by the original invocation, a type error occurs at the invocation.

The default behavior is to throw a NoSuchMethodError.

Inherited from Object.

Implementation
dart
@pragma("vm:entry-point")
@pragma("wasm:entry-point")
external dynamic noSuchMethod(Invocation invocation);

onComplete()

Rill<O> onComplete(Rill<O> Function() s2)

Runs s2 after this stream ends, whether it succeeded or errored.

Implementation
dart
Rill<O> onComplete(Function0<Rill<O>> s2) =>
    handleErrorWith((e) => s2().append(() => Pull.fail(e).rillNoScope)).append(() => s2());

onFinalize()

Rill<O> onFinalize(IO<Unit> finalizer)

Runs finalizer when this stream's scope closes (success, error, or cancel).

Implementation
dart
Rill<O> onFinalize(IO<Unit> finalizer) => onFinalizeCase((_) => finalizer);

onFinalizeCase()

Rill<O> onFinalizeCase(IO<Unit> Function(ExitCase) finalizer)

Like onFinalize but finalizer receives the ExitCase.

Implementation
dart
Rill<O> onFinalizeCase(Function1<ExitCase, IO<Unit>> finalizer) =>
    Rill.bracketCase(IO.unit, (_, ec) => finalizer(ec)).flatMap((_) => this);

parEvalMap()

Rill<O2> parEvalMap<O2>(int maxConcurrent, IO<O2> Function(O) f)

Maps elements concurrently via f, preserving output order.

At most maxConcurrent evaluations run simultaneously. Use parEvalMapUnordered if order does not matter.

Implementation
dart
Rill<O2> parEvalMap<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
  if (maxConcurrent == 1) {
    return evalMap(f);
  } else {
    assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');

    // One is taken by inner stream read.
    final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
    final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);

    return _parEvalMapImpl(concurrency, channelF, true, f);
  }
}

parEvalMapUnbounded()

Rill<O2> parEvalMapUnbounded<O2>(IO<O2> Function(O) f)

Like parEvalMap with no concurrency limit (unbounded parallelism).

Implementation
dart
Rill<O2> parEvalMapUnbounded<O2>(Function1<O, IO<O2>> f) =>
    _parEvalMapImpl(Integer.maxValue, Channel.unbounded(), true, f);

parEvalMapUnordered()

Rill<O2> parEvalMapUnordered<O2>(int maxConcurrent, IO<O2> Function(O) f)

Like parEvalMap but emits results as soon as they are ready, without preserving input order.

Implementation
dart
Rill<O2> parEvalMapUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
  if (maxConcurrent == 1) {
    return evalMap(f);
  } else {
    assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');

    // One is taken by inner stream read.
    final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
    final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);

    return _parEvalMapImpl(concurrency, channelF, false, f);
  }
}

parEvalMapUnorderedUnbounded()

Rill<O2> parEvalMapUnorderedUnbounded<O2>(IO<O2> Function(O) f)

Like parEvalMapUnordered with no concurrency limit.

Implementation
dart
Rill<O2> parEvalMapUnorderedUnbounded<O2>(Function1<O, IO<O2>> f) =>
    _parEvalMapImpl(Integer.maxValue, Channel.unbounded(), false, f);

pauseWhen()

Rill<O> pauseWhen(Rill<bool> pauseWhenTrue)

Pauses emission whenever pauseWhenTrue emits true and resumes when it emits false.

Implementation
dart
Rill<O> pauseWhen(Rill<bool> pauseWhenTrue) {
  return Rill.eval(SignallingRef.of(false)).flatMap((pauseSignal) {
    return pauseWhenSignal(
      pauseSignal,
    ).mergeHaltBoth(pauseWhenTrue.foreach(pauseSignal.setValue));
  });
}

pauseWhenSignal()

Rill<O> pauseWhenSignal(Signal<bool> pauseWhenTrue)

Like pauseWhen but driven by a Signal rather than a Rill.

Implementation
dart
Rill<O> pauseWhenSignal(Signal<bool> pauseWhenTrue) {
  final waitToResume = pauseWhenTrue.waitUntil((x) => !x);
  final pauseIfNeeded = Rill.exec<O>(
    pauseWhenTrue.value().flatMap((paused) => waitToResume.whenA(paused)),
  );

  return pauseIfNeeded.append(
    () => chunks().flatMap((chunk) {
      return Rill.chunk(chunk).append(() => pauseIfNeeded);
    }),
  );
}

rechunkRandomly()

Rill<O> rechunkRandomly({double minFactor = 0.1, double maxFactor = 2.0, int? seed})

Randomly re-sizes chunks by a factor between minFactor and maxFactor.

Useful for testing that consumers handle arbitrary chunk boundaries.

Implementation
dart
Rill<O> rechunkRandomly({double minFactor = 0.1, double maxFactor = 2.0, int? seed}) {
  if (minFactor <= 0 || maxFactor < minFactor) {
    throw ArgumentError('Invalid rechunk factors. Ensure 0 < minFactor <= maxFactor');
  }

  return Rill.suspend(() {
    final rng = math.Random(seed);

    Pull<O, Unit> loop(Chunk<O> acc, Rill<O> s) {
      return s.pull.uncons.flatMap((hdtl) {
        return hdtl.foldN(
          () => acc.isEmpty ? Pull.done : Pull.output(acc),
          (hd, tl) {
            final newSize =
                math
                    .max(
                      1.0,
                      (acc.size + hd.size) *
                          (minFactor + (maxFactor - minFactor) * rng.nextDouble()),
                    )
                    .toInt();
            final newAcc = acc.concat(hd);

            if (newAcc.size < newSize) {
              return loop(newAcc, tl);
            } else {
              final (toEmit, rest) = newAcc.splitAt(newSize);
              return Pull.output(toEmit).append(() => loop(rest, tl));
            }
          },
        );
      });
    }

    return loop(Chunk.empty<O>(), this).rillNoScope;
  });
}
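
The re-chunking loop above can be modelled on a plain list of lists, which makes it easy to check that only chunk boundaries change while element order is preserved. This is a sketch under that simplification; rechunkListsRandomly is a hypothetical helper, not a library function.

```dart
import 'dart:math' as math;

// Plain-list model of rechunkRandomly: each inner list stands for one chunk.
List<List<T>> rechunkListsRandomly<T>(
  List<List<T>> chunks, {
  double minFactor = 0.1,
  double maxFactor = 2.0,
  int? seed,
}) {
  if (minFactor <= 0 || maxFactor < minFactor) {
    throw ArgumentError('Ensure 0 < minFactor <= maxFactor');
  }
  final rng = math.Random(seed);
  final out = <List<T>>[];
  var acc = <T>[];
  for (final hd in chunks) {
    acc = [...acc, ...hd];
    final factor = minFactor + (maxFactor - minFactor) * rng.nextDouble();
    final newSize = math.max(1.0, acc.length * factor).toInt();
    if (acc.length >= newSize) {
      // Enough elements buffered: emit one chunk and carry the remainder.
      out.add(acc.sublist(0, newSize));
      acc = acc.sublist(newSize);
    }
  }
  if (acc.isNotEmpty) out.add(acc); // flush whatever is left at the end
  return out;
}
```

Whatever the seed, flattening the result gives back the original elements in order, and no emitted chunk is empty.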

reduce()

Rill<O> reduce(O Function(O, O) f)

Alias for fold1.

Implementation
dart
Rill<O> reduce(Function2<O, O, O> f) => fold1(f);

repeat()

Rill<O> repeat()

Repeats this stream indefinitely.

Implementation
dart
Rill<O> repeat() => append(repeat);

repeatN()

Rill<O> repeatN(int n)

Repeats this stream n times in total. A non-positive n yields an empty stream.

Implementation
dart
Rill<O> repeatN(int n) => n > 0 ? append(() => repeatN(n - 1)) : Rill.empty();

repeatPull()

Rill<O2> repeatPull<O2>(Pull<O2, Option<Rill<O>>> Function(ToPull<O>) f)

Low-level combinator: calls f with a ToPull handle and repeats until f returns none.

Implementation
dart
Rill<O2> repeatPull<O2>(Function1<ToPull<O>, Pull<O2, Option<Rill<O>>>> f) {
  Pull<O2, Unit> go(ToPull<O> tp) {
    return f(tp).flatMap((tail) {
      return tail.fold(() => Pull.done, (tail) => go(tail.pull));
    });
  }

  return go(pull).rillNoScope;
}

scan()

Rill<O2> scan<O2>(O2 z, O2 Function(O2, O) f)

Emits z followed by each running accumulation of f applied to consecutive elements (i.e. a running fold).

Implementation
dart
Rill<O2> scan<O2>(O2 z, Function2<O2, O, O2> f) =>
    Pull.output1(z).append(() => _scan(z, f)).rillNoScope;

scan1()

Rill<O> scan1(O Function(O, O) f)

Like scan but uses the first element as the initial accumulator (no seed value emitted).

Implementation
dart
Rill<O> scan1(Function2<O, O, O> f) =>
    pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) {
          final (pre, post) = hd.splitAt(1);
          return Pull.output(pre).append(() => tl.cons(post)._scan(pre[0], f));
        },
      );
    }).rillNoScope;
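
The difference between scan and scan1 is easiest to see on a small input. The sketch below models both on plain Dart lists; scanList and scan1List are hypothetical helpers, and it assumes the private _scan step (not shown on this page) emits one accumulated value per input element, as the descriptions suggest.

```dart
// Plain-list model of scan: the seed z is emitted first.
List<B> scanList<A, B>(List<A> xs, B z, B Function(B, A) f) {
  final out = <B>[z];
  var acc = z;
  for (final x in xs) {
    acc = f(acc, x);
    out.add(acc);
  }
  return out;
}

// Plain-list model of scan1: the first element doubles as the seed.
List<A> scan1List<A>(List<A> xs, A Function(A, A) f) {
  if (xs.isEmpty) return <A>[];
  final out = <A>[xs.first];
  var acc = xs.first;
  for (final x in xs.skip(1)) {
    acc = f(acc, x);
    out.add(acc);
  }
  return out;
}
```

Summing [1, 2, 3] gives [0, 1, 3, 6] with scanList (seed 0) and [1, 3, 6] with scan1List.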

scanChunks()

Rill<O2> scanChunks<S, O2>(S initial, (S, Chunk<O2>) Function(S, Chunk<O>) f)

Chunk-level scan: threads state through each chunk, emitting transformed chunks.

Implementation
dart
Rill<O2> scanChunks<S, O2>(S initial, Function2<S, Chunk<O>, (S, Chunk<O2>)> f) =>
    scanChunksOpt(initial, (s) => Some((c) => f(s, c)));

scanChunksOpt()

Rill<O2> scanChunksOpt<S, O2>(S initial, Option<(S, Chunk<O2>) Function(Chunk<O>)> Function(S) f)

Like scanChunks but allows early termination: returning none from f stops the scan.

Implementation
dart
Rill<O2> scanChunksOpt<S, O2>(
  S initial,
  Function1<S, Option<Function1<Chunk<O>, (S, Chunk<O2>)>>> f,
) => pull.scanChunksOpt(initial, f).voided.rillNoScope;

sliding()

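Rill<Chunk<O>> sliding(int size, {int step = 1})

Emits windows of up to size elements, advancing step elements between consecutive windows. Windows overlap only when step is smaller than size; the final window may hold fewer than size elements.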

Implementation
dart
Rill<Chunk<O>> sliding(int size, {int step = 1}) {
  assert(size > 0 && step > 0, 'size and step must be positive');
  Pull<Chunk<O>, Unit> stepNotSmallerThanSize(Rill<O> s, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(prev.take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];

          var current = prev.concat(hd);

          while (current.size >= step) {
            final (nHeads, nTails) = current.splitAt(step);
            bldr.add(nHeads.take(size));
            current = nTails;
          }

          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepNotSmallerThanSize(tl, current));
        },
      ),
    );
  }

  Pull<Chunk<O>, Unit> stepSmallerThanSize(Rill<O> s, Chunk<O> window, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(window.concat(prev).take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];

          var w = window;
          var current = prev.concat(hd);

          while (current.size >= step) {
            final (head, tail) = current.splitAt(step);
            final wind = w.concat(head);

            bldr.add(wind);
            w = wind.drop(step);

            current = tail;
          }

          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepSmallerThanSize(tl, w, current));
        },
      ),
    );
  }

  if (step < size) {
    return pull
        .unconsN(size, allowFewer: true)
        .flatMap(
          (hdtl) => hdtl.foldN(
            () => Pull.done,
            (hd, tl) => Pull.output1(
              hd,
            ).append(() => stepSmallerThanSize(tl, hd.drop(step), Chunk.empty())),
          ),
        )
        .rillNoScope;
  } else {
    return stepNotSmallerThanSize(this, Chunk.empty()).rillNoScope;
  }
}
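
As a rough guide to which windows come out, the sketch below models sliding on a plain Dart list. It reflects a reading of the implementation above (windows start every step elements and emission stops once a window reaches the end of the input) and ignores chunking; slidingList is a hypothetical helper, not part of the library.

```dart
// Plain-list model of sliding.
List<List<T>> slidingList<T>(List<T> xs, int size, {int step = 1}) {
  assert(size > 0 && step > 0, 'size and step must be positive');
  final out = <List<T>>[];
  for (var i = 0; i < xs.length; i += step) {
    final end = i + size < xs.length ? i + size : xs.length;
    out.add(xs.sublist(i, end));
    if (end == xs.length) break; // this window already reached the end
  }
  return out;
}
```

For [1, 2, 3, 4, 5, 6] with size 3 and step 2 the model gives [1, 2, 3], [3, 4, 5], [5, 6]; for [1, 2, 3, 4, 5, 6, 7] with size 2 and step 3 it gives [1, 2], [4, 5], [7].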

spaced()

Rill<O> spaced(Duration delay, {bool startImmediately = true})

Zips this stream against a fixed-delay ticker so elements are emitted at most once per delay.

When startImmediately is true (default), the first element is emitted without waiting.

Implementation
dart
Rill<O> spaced(Duration delay, {bool startImmediately = true}) {
  final start = startImmediately ? Rill.unit : Rill.empty<Unit>();
  return start.append(() => Rill.fixedDelay(delay)).zipRight(this);
}

split()

Rill<Chunk<O>> split(bool Function(O) p)

Splits the stream into sub-chunks at elements for which p is true; delimiter elements are dropped.

Implementation
dart
Rill<Chunk<O>> split(Function1<O, bool> p) {
  Pull<Chunk<O>, Unit> go(Chunk<O> buffer, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => buffer.nonEmpty ? Pull.output1(buffer) : Pull.done,
        (hd, tl) {
          return hd.indexWhere(p).fold(
            () => go(buffer.concat(hd), tl),
            (idx) {
              final pfx = hd.take(idx);
              final b2 = buffer.concat(pfx);

              return Pull.output1(b2).append(() => go(Chunk.empty(), tl.cons(hd.drop(idx + 1))));
            },
          );
        },
      );
    });
  }

  return go(Chunk.empty(), this).rillNoScope;
}
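
Edge cases around delimiters are easier to see in a small model. The sketch below mirrors the buffering logic above on a plain Dart list; splitList is a hypothetical helper and not the library's code. Note that two adjacent delimiters produce an empty group, while a trailing delimiter does not.

```dart
// Plain-list model of split: delimiters (elements matching p) are dropped.
List<List<T>> splitList<T>(List<T> xs, bool Function(T) p) {
  final out = <List<T>>[];
  var buffer = <T>[];
  for (final x in xs) {
    if (p(x)) {
      out.add(buffer); // emitted even when the buffer is empty
      buffer = <T>[];
    } else {
      buffer.add(x);
    }
  }
  if (buffer.isNotEmpty) out.add(buffer); // a trailing empty group is skipped
  return out;
}
```

Splitting [1, 2, 0, 3, 0, 0, 4] on zero yields [1, 2], [3], [], [4].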

switchMap()

Rill<O2> switchMap<O2>(Rill<O2> Function(O) f)

Maps each element to a stream via f, canceling the previous inner stream when a new element arrives.

Only the inner stream corresponding to the latest element runs at any given time.

Implementation
dart
Rill<O2> switchMap<O2>(Function1<O, Rill<O2>> f) {
  final IO<Rill<O2>> rillF = Semaphore.permits(1).flatMap((guard) {
    return IO.ref(none<Deferred<Unit>>()).map((haltRef) {
      Rill<O2> runInner(O o, Deferred<Unit> halt) {
        return Rill.bracketFull(
          (poll) => poll(guard.acquire()), // guard against simultaneously running rills
          (_, ec) {
            return ec.fold(
              () => guard.release(),
              (_, _) => IO.unit, // on error don't start next stream
              () => guard.release(),
            );
          },
        ).flatMap((_) => f(o).interruptWhen(halt.value().attempt()));
      }

      IO<Rill<O2>> haltedF(O o) {
        return IO.deferred<Unit>().flatMap((halt) {
          return haltRef.getAndSet(halt.some).flatMap((prev) {
            return prev
                .fold(() => IO.unit, (prev) => prev.complete(Unit()))
                .map((_) => runInner(o, halt));
          });
        });
      }

      return evalMap(haltedF).parJoin(2);
    });
  });

  return Rill.force(rillF);
}

take()

Rill<O> take(int n)

Emits at most the first n elements, then terminates.

Implementation
dart
Rill<O> take(int n) => pull.take(n).voided.rillNoScope;

takeRight()

Rill<O> takeRight(int n)

Emits only the last n elements. Nothing is emitted until this stream has ended, since the last n elements are not known before then.

Implementation
dart
Rill<O> takeRight(int n) => pull.takeRight(n).flatMap(Pull.output).rillNoScope;

takeThrough()

Rill<O> takeThrough(bool Function(O) p)

Takes elements while p is true and also takes the first failing element.

Implementation
dart
Rill<O> takeThrough(Function1<O, bool> p) => pull.takeThrough(p).voided.rillNoScope;

takeWhile()

Rill<O> takeWhile(bool Function(O) p, {bool takeFailure = false})

Takes elements while p is true.

When takeFailure is true, also emits the first element for which p is false (equivalent to takeThrough).

Implementation
dart
Rill<O> takeWhile(Function1<O, bool> p, {bool takeFailure = false}) =>
    pull.takeWhile(p, takeFailure: takeFailure).voided.rillNoScope;
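
To illustrate the effect of takeFailure, here is a minimal sketch that models takeWhile on a plain Dart list. The helper takeWhileList is hypothetical; with takeFailure set to true it behaves like the takeThrough description above.

```dart
// Plain-list model of takeWhile with the optional takeFailure flag.
List<T> takeWhileList<T>(
  List<T> xs,
  bool Function(T) p, {
  bool takeFailure = false,
}) {
  final out = <T>[];
  for (final x in xs) {
    if (p(x)) {
      out.add(x);
    } else {
      if (takeFailure) out.add(x); // keep the first failing element
      break;
    }
  }
  return out;
}
```

For [1, 2, 3, 4, 1] and the predicate x < 3, the default gives [1, 2], while takeFailure: true gives [1, 2, 3].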

through()

Rill<O2> through<O2>(Rill<O2> Function(Rill<O>) f)

Applies a Pipe to this stream. Equivalent to f(this).

Implementation
dart
Rill<O2> through<O2>(Pipe<O, O2> f) => f(this);

timeout()

Rill<O> timeout(Duration timeout)

Alias for interruptWhen(IO.sleep(timeout)).

Implementation
dart
Rill<O> timeout(Duration timeout) => interruptWhen(IO.sleep(timeout));

toString() inherited

String toString()

A string representation of this object.

Some classes have a default textual representation, often paired with a static parse function (like int.parse). These classes will provide the textual representation as their string representation.

Other classes have no meaningful textual representation that a program will care about. Such classes will typically override toString to provide useful information when inspecting the object, mainly for debugging or logging.

Inherited from Object.

Implementation
dart
external String toString();

zip()

Rill<(O, O2)> zip<O2>(Rill<O2> that)

Pairs elements from this stream and that in lock-step, stopping when either ends.

Implementation
dart
Rill<(O, O2)> zip<O2>(Rill<O2> that) => zipWith(that, (o, o2) => (o, o2));

zipAll()

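Rill<(O, O2)> zipAll<O2>(Rill<O2> that, O padLeft, O2 padRight)

Zips with that, continuing until both streams have ended. Once this stream ends, padLeft stands in for its missing elements; once that ends, padRight stands in for its missing elements.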

Implementation
dart
Rill<(O, O2)> zipAll<O2>(Rill<O2> that, O padLeft, O2 padRight) =>
    zipAllWith(that, padLeft, padRight, (o, o2) => (o, o2));

zipAllWith()

Rill<O3> zipAllWith<O2, O3>(Rill<O2> that, O padLeft, O2 padRight, O3 Function(O, O2) f)

Like zipAll but combines pairs via f.

Implementation
dart
Rill<O3> zipAllWith<O2, O3>(Rill<O2> that, O padLeft, O2 padRight, Function2<O, O2, O3> f) {
  Pull<O3, Unit> loop(Rill<O> s1, Rill<O2> s2) {
    return s1.pull.uncons.flatMap((hdtl1) {
      return hdtl1.foldN(
        () => s2.map((o2) => f(padLeft, o2)).pull.echo,
        (hd1, tl1) {
          return s2.pull.uncons.flatMap((hdtl2) {
            return hdtl2.foldN(
              () {
                final zippedChunk = hd1.map((o) => f(o, padRight));
                return Pull.output(zippedChunk).flatMap((_) {
                  return tl1.map((o) => f(o, padRight)).pull.echo;
                });
              },
              (hd2, tl2) {
                final len = math.min(hd1.size, hd2.size);

                final nextS1 = tl1.cons(hd1.drop(len));
                final nextS2 = tl2.cons(hd2.drop(len));

                return Pull.output(
                  hd1.zip(hd2).map((t) => f(t.$1, t.$2)),
                ).flatMap((_) => loop(nextS1, nextS2));
              },
            );
          });
        },
      );
    });
  }

  return loop(this, that).rillNoScope;
}
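
Which pad value fills which side can be checked with a small model. The sketch below reproduces the padding behaviour of zipAllWith on plain Dart lists; zipAllWithLists is a hypothetical helper and does not deal with chunks or effects.

```dart
// Plain-list model of zipAllWith: padLeft replaces missing elements of the
// first list, padRight replaces missing elements of the second.
List<C> zipAllWithLists<A, B, C>(
  List<A> a,
  List<B> b,
  A padLeft,
  B padRight,
  C Function(A, B) f,
) {
  final n = a.length > b.length ? a.length : b.length;
  return [
    for (var i = 0; i < n; i++)
      f(i < a.length ? a[i] : padLeft, i < b.length ? b[i] : padRight),
  ];
}
```

Zipping [1, 2, 3] with ['a'] using pads 0 and '-' and string concatenation yields '1a', '2-', '3-'; swapping the lengths ([1] with ['a', 'b']) yields '1a', '0b'.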

zipLatest()

Rill<(O, O2)> zipLatest<O2>(Rill<O2> that)

Emits the latest pair (thisValue, thatValue) whenever either stream produces a new element, once both streams have emitted at least one element.

Implementation
dart
Rill<(O, O2)> zipLatest<O2>(Rill<O2> that) => zipLatestWith(that, (o, o2) => (o, o2));

zipLatestWith()

Rill<O3> zipLatestWith<O2, O3>(Rill<O2> that, O3 Function(O, O2) f)

Like zipLatest but combines the latest pair via f.

Implementation
dart
Rill<O3> zipLatestWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
  return Rill.eval(Queue.unbounded<Either<Object, Option<O3>>>()).flatMap((queue) {
    return Rill.eval(Ref.of<Option<O>>(none())).flatMap((refLeft) {
      return Rill.eval(Ref.of<Option<O2>>(none())).flatMap((refRight) {
        IO<Unit> tryEmit() {
          return refLeft.value().flatMap((optL) {
            return refRight.value().flatMap((optR) {
              return optL.fold(
                () => IO.unit,
                (l) => optR.fold(
                  () => IO.unit,
                  (r) => queue.offer(Right(Some(f(l, r)))),
                ),
              );
            });
          });
        }

        IO<Unit> runLeft() => evalMap(
          (o) => refLeft.setValue(Some(o)).productL(tryEmit()),
        ).compile.drain.handleErrorWith((err) => queue.offer(Left(err)));

        IO<Unit> runRight() => that
            .evalMap((o2) => refRight.setValue(Some(o2)).productL(tryEmit()))
            .compile
            .drain
            .handleErrorWith((err) => queue.offer(Left(err)));

        Rill<O3> consumeQueue() {
          return Rill.eval(queue.take()).flatMap((event) {
            return event.fold(
              (err) => Rill.raiseError(err),
              (opt) => opt.fold(
                () => Rill.empty(),
                (o3) => Rill.emit(o3).append(() => consumeQueue()),
              ),
            );
          });
        }

        final driver = IO.both(runLeft(), runRight()).guarantee(queue.offer(Right(none())));

        return Rill.bracket(
          driver.start(),
          (fiber) => fiber.cancel(),
        ).flatMap((_) => consumeQueue());
      });
    });
  });
}

zipLeft()

Rill<O> zipLeft<O2>(Rill<O2> other)

Zips with other and keeps only this stream's elements.

Implementation
dart
Rill<O> zipLeft<O2>(Rill<O2> other) => zipWith(other, (a, _) => a);

zipRight()

Rill<O2> zipRight<O2>(Rill<O2> other)

Zips with other and keeps only other's elements.

Implementation
dart
Rill<O2> zipRight<O2>(Rill<O2> other) => zipWith(other, (_, b) => b);

zipWith()

Rill<O3> zipWith<O2, O3>(Rill<O2> that, O3 Function(O, O2) f)

Zips with that, combining pairs via f, stopping when either stream ends.

Implementation
dart
Rill<O3> zipWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
  Pull<O3, Unit> loop(Rill<O> s1, Rill<O2> s2) {
    return s1.pull.uncons.flatMap((hdtl1) {
      return hdtl1.foldN(
        () => Pull.done,
        (hd1, tl1) {
          return s2.pull.uncons.flatMap((hdtl2) {
            return hdtl2.foldN(
              () => Pull.done,
              (hd2, tl2) {
                final len = math.min(hd1.size, hd2.size);

                final nextS1 = tl1.cons(hd1.drop(len));
                final nextS2 = tl2.cons(hd2.drop(len));

                return Pull.output(
                  hd1.zip(hd2).map((t) => f(t.$1, t.$2)),
                ).flatMap((_) => loop(nextS1, nextS2));
              },
            );
          });
        },
      );
    });
  }

  return loop(this, that).rillNoScope;
}

zipWithIndex()

Rill<(O, int)> zipWithIndex()

Pairs each element with its zero-based index.

Implementation
dart
Rill<(O, int)> zipWithIndex() => scanChunks(0, (index, c) {
  var idx = index;

  final out = c.map((o) {
    final r = (o, idx);
    idx += 1;
    return r;
  });

  return (idx, out);
});

zipWithNext()

Rill<(O, Option<O>)> zipWithNext()

Pairs each element with the next one. The last element is paired with none.

Implementation
dart
Rill<(O, Option<O>)> zipWithNext() {
  Pull<(O, Option<O>), Unit> go(O last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1((last, none())),
        (hd, tl) {
          final (newLast, out) = hd.mapAccumulate(last, (prev, next) {
            return (next, (prev, Option(next)));
          });

          return Pull.output(out).append(() => go(newLast, tl));
        },
      );
    });
  }

  return pull.uncons1.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => go(hd, tl),
    );
  }).rillNoScope;
}

zipWithPrevious()

Rill<(Option<O>, O)> zipWithPrevious()

Pairs each element with the previous one. The first element is paired with none.

Implementation
dart
Rill<(Option<O>, O)> zipWithPrevious() =>
    mapAccumulate(none<O>(), (prev, next) => (Option(next), (prev, next))).map((x) => x.$2);

zipWithPreviousAndNext()

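Rill<(Option<O>, O, Option<O>)> zipWithPreviousAndNext()

Pairs each element with both its predecessor and successor. The first element has no predecessor and the last has no successor, so those positions hold none.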

Implementation
dart
Rill<(Option<O>, O, Option<O>)> zipWithPreviousAndNext() =>
    zipWithPrevious().zipWithNext().map((tuple) {
      final ((prev, curr), next) = tuple;

      return next.foldN(
        () => (prev, curr, const None()),
        (_, next) => (prev, curr, Some(next)),
      );
    });
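
The shape of the emitted triples can be sketched on a plain Dart list. This model uses nullable values in place of Option purely to stay self-contained, which is an assumption of the example; zipWithPreviousAndNextList is a hypothetical helper.

```dart
// Plain-list model of zipWithPreviousAndNext; null plays the role of none.
List<(T?, T, T?)> zipWithPreviousAndNextList<T>(List<T> xs) => [
  for (var i = 0; i < xs.length; i++)
    (
      i > 0 ? xs[i - 1] : null,
      xs[i],
      i + 1 < xs.length ? xs[i + 1] : null,
    ),
];
```

For [1, 2, 3] the triples are (null, 1, 2), (1, 2, 3) and (2, 3, null). Dropping the third field corresponds to zipWithPrevious, and dropping the first corresponds to zipWithNext.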

zipWithScan()

Rill<(O, O2)> zipWithScan<O2>(O2 z, O2 Function(O2, O) f)

Pairs each element with the accumulated state before applying f to it.

Implementation
dart
Rill<(O, O2)> zipWithScan<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
  z,
  (s, o) => (f(s, o), (o, s)),
).mapN((_, v) => v);

zipWithScan1()

Rill<(O, O2)> zipWithScan1<O2>(O2 z, O2 Function(O2, O) f)

Pairs each element with the accumulated state after applying f to it.

Implementation
dart
Rill<(O, O2)> zipWithScan1<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
  z,
  (s, o) {
    final s2 = f(s, o);
    return (s2, (o, s2));
  },
).mapN((_, v) => v);
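
The only difference between zipWithScan and zipWithScan1 is whether the state is read before or after f runs. The sketch below shows this on plain Dart lists; zipWithScanList and zipWithScan1List are hypothetical helpers that mirror the two mapAccumulate bodies above.

```dart
// Plain-list model of zipWithScan: pairs each element with the state BEFORE f.
List<(O, S)> zipWithScanList<O, S>(List<O> xs, S z, S Function(S, O) f) {
  var s = z;
  final out = <(O, S)>[];
  for (final x in xs) {
    out.add((x, s));
    s = f(s, x);
  }
  return out;
}

// Plain-list model of zipWithScan1: pairs each element with the state AFTER f.
List<(O, S)> zipWithScan1List<O, S>(List<O> xs, S z, S Function(S, O) f) {
  var s = z;
  final out = <(O, S)>[];
  for (final x in xs) {
    s = f(s, x);
    out.add((x, s));
  }
  return out;
}
```

Accumulating string lengths over ['a', 'bb', 'ccc'] from 0 gives ('a', 0), ('bb', 1), ('ccc', 3) with the first helper and ('a', 1), ('bb', 3), ('ccc', 6) with the second.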

Extension Methods

collectN() extension

Rill<T3> collectN<T3>(Option<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> collectN<T3>(Function2<T1, T2, Option<T3>> f) => collect(f.tupled);

collectN() extension

Rill<T4> collectN<T4>(Option<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> collectN<T4>(Function3<T1, T2, T3, Option<T4>> f) => collect(f.tupled);

collectN() extension

Rill<T5> collectN<T5>(Option<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> collectN<T5>(Function4<T1, T2, T3, T4, Option<T5>> f) => collect(f.tupled);

collectN() extension

Rill<T6> collectN<T6>(Option<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> collectN<T6>(Function5<T1, T2, T3, T4, T5, Option<T6>> f) => collect(f.tupled);

evalMapN() extension

Rill<T3> evalMapN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> evalMapN<T3>(Function2<T1, T2, IO<T3>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T4> evalMapN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> evalMapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T6> evalMapN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> evalMapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T5> evalMapN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> evalMapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalMap(f.tupled);

evalTapN() extension

Rill<(T1, T2, T3, T4, T5)> evalTapN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> evalTapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    evalTap(f.tupled);

evalTapN() extension

Rill<(T1, T2, T3, T4)> evalTapN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> evalTapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalTap(f.tupled);

evalTapN() extension

Rill<(T1, T2, T3)> evalTapN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> evalTapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalTap(f.tupled);

evalTapN() extension

Rill<(T1, T2)> evalTapN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> evalTapN<T3>(Function2<T1, T2, IO<T3>> f) => evalTap(f.tupled);

filterN() extension

Rill<(T1, T2, T3, T4, T5)> filterN(bool Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> filterN(Function5<T1, T2, T3, T4, T5, bool> f) => filter(f.tupled);

filterN() extension

Rill<(T1, T2)> filterN(bool Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> filterN(Function2<T1, T2, bool> f) => filter(f.tupled);

filterN() extension

Rill<(T1, T2, T3, T4)> filterN(bool Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> filterN(Function4<T1, T2, T3, T4, bool> f) => filter(f.tupled);

filterN() extension

Rill<(T1, T2, T3)> filterN(bool Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> filterN(Function3<T1, T2, T3, bool> f) => filter(f.tupled);

filterNotN() extension

Rill<(T1, T2, T3, T4, T5)> filterNotN(bool Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> filterNotN(Function5<T1, T2, T3, T4, T5, bool> f) =>
    filterNot(f.tupled);

filterNotN() extension

Rill<(T1, T2, T3, T4)> filterNotN(bool Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> filterNotN(Function4<T1, T2, T3, T4, bool> f) => filterNot(f.tupled);

filterNotN() extension

Rill<(T1, T2, T3)> filterNotN(bool Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> filterNotN(Function3<T1, T2, T3, bool> f) => filterNot(f.tupled);

filterNotN() extension

Rill<(T1, T2)> filterNotN(bool Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> filterNotN(Function2<T1, T2, bool> f) => filterNot(f.tupled);

flatMapN() extension

Rill<T4> flatMapN<T4>(Rill<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> flatMapN<T4>(Function3<T1, T2, T3, Rill<T4>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T3> flatMapN<T3>(Rill<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> flatMapN<T3>(Function2<T1, T2, Rill<T3>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T6> flatMapN<T6>(Rill<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> flatMapN<T6>(Function5<T1, T2, T3, T4, T5, Rill<T6>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T5> flatMapN<T5>(Rill<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> flatMapN<T5>(Function4<T1, T2, T3, T4, Rill<T5>> f) => flatMap(f.tupled);

flatten() extension

Rill<O> flatten()

Concatenates the inner rills sequentially into a single stream.

Available on Rill<O>, provided by the RillFlattenOps<O> extension

Implementation
dart
Rill<O> flatten() => flatMap((r) => r);

ifM() extension

Rill<O2> ifM<O2>(Rill<O2> Function() ifTrue, Rill<O2> Function() ifFalse)

For each true element emitted by this stream, runs ifTrue; for each false element, runs ifFalse.

The chosen sub-rill is spliced into the output at each element position.

Available on Rill<O>, provided by the RillBooleanOps extension

Implementation
dart
Rill<O2> ifM<O2>(Function0<Rill<O2>> ifTrue, Function0<Rill<O2>> ifFalse) =>
    flatMap((b) => b ? ifTrue() : ifFalse());
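
Because ifM is a flatMap over booleans, its output order follows the order of the flags. The sketch below models that on plain lists; ifMModel is a hypothetical helper, and lists stand in for the sub-rills.

```dart
// List model: each flag selects a list of outputs, and the selected lists
// are concatenated in flag order. ifMModel is a hypothetical helper.
List<O> ifMModel<O>(
  List<bool> flags,
  List<O> Function() ifTrue,
  List<O> Function() ifFalse,
) =>
    flags.expand((b) => b ? ifTrue() : ifFalse()).toList();

void main() {
  print(ifMModel([true, false, true], () => ['a', 'b'], () => ['z']));
  // [a, b, z, a, b]
}
```

Both branches are thunks, so a branch is only built when a flag actually selects it.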

mapN() extension

Rill<T6> mapN<T6>(T6 Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> mapN<T6>(Function5<T1, T2, T3, T4, T5, T6> f) => map(f.tupled);

mapN() extension

Rill<T3> mapN<T3>(T3 Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> mapN<T3>(Function2<T1, T2, T3> f) => map(f.tupled);

mapN() extension

Rill<T5> mapN<T5>(T5 Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> mapN<T5>(Function4<T1, T2, T3, T4, T5> f) => map(f.tupled);

mapN() extension

Rill<T4> mapN<T4>(T4 Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> mapN<T4>(Function3<T1, T2, T3, T4> f) => map(f.tupled);

parEvalMapN() extension

Rill<T6> parEvalMapN<T6>(int maxConcurrent, IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapN<T6>(
  int maxConcurrent,
  Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T3> parEvalMapN<T3>(int maxConcurrent, IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapN<T3>(
  int maxConcurrent,
  Function2<T1, T2, IO<T3>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T4> parEvalMapN<T4>(int maxConcurrent, IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapN<T4>(
  int maxConcurrent,
  Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T5> parEvalMapN<T5>(int maxConcurrent, IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapN<T5>(
  int maxConcurrent,
  Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapUnboundedN() extension

Rill<T6> parEvalMapUnboundedN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T3> parEvalMapUnboundedN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) => parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T5> parEvalMapUnboundedN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T4> parEvalMapUnboundedN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnorderedN() extension

Rill<T4> parEvalMapUnorderedN<T4>(int maxConcurrent, IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnorderedN<T4>(
  int maxConcurrent,
  Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T5> parEvalMapUnorderedN<T5>(int maxConcurrent, IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnorderedN<T5>(
  int maxConcurrent,
  Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T6> parEvalMapUnorderedN<T6>(int maxConcurrent, IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnorderedN<T6>(
  int maxConcurrent,
  Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T3> parEvalMapUnorderedN<T3>(int maxConcurrent, IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnorderedN<T3>(
  int maxConcurrent,
  Function2<T1, T2, IO<T3>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T3> parEvalMapUnorderedUnboundedN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnorderedUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T4> parEvalMapUnorderedUnboundedN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnorderedUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T5> parEvalMapUnorderedUnboundedN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnorderedUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T6> parEvalMapUnorderedUnboundedN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnorderedUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parJoin() extension

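Rill<O> parJoin(int maxOpen)

Runs up to maxOpen inner streams concurrently and merges their output into a single stream. When maxOpen is 1, this is equivalent to flatten.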

  • Waits for all inner streams to finish.
  • If any stream fails, the error is propagated and all other streams are canceled.
  • If the output stream is interrupted, all running streams are canceled.

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoin(int maxOpen) {
  assert(maxOpen > 0, 'maxOpen must be greater than 0, was: $maxOpen');

  if (maxOpen == 1) {
    return flatten();
  } else {
    final rillF = (
      SignallingRef.of(none<Object>()),
      Semaphore.permits(maxOpen),
      SignallingRef.of(1),
      Channel.unbounded<Unit>(),
      Channel.unbounded<Chunk<O>>(),
    ).mapN((done, available, running, outcomes, output) {
      IO<Unit> stop(Option<Object> rslt) {
        return done.update((current) {
          return current.fold(
            () => rslt,
            (prev) => rslt.fold(
              () => current, // If we're trying to set None, keep existing result (if any)
              (err) => Some(
                current.fold(
                  () => Some(err),
                  (prevErr) => Some(CompositeFailure.of(prevErr, err)),
                ),
              ),
            ),
          );
        });
      }

      final incrementRunning = running.update((n) => n + 1);
      final decrementRunning = running
          .updateAndGet((n) => n - 1)
          .flatMap((now) => now == 0 ? outcomes.close().voided() : IO.unit);

      IO<Unit> onOutcome(
        Outcome<Unit> oc,
        Either<Object, Unit> cancelResult,
      ) {
        return oc.fold(
          () => cancelResult.fold((err) => stop(Some(err)), (_) => IO.unit),
          (err, st) => CompositeFailure.fromResults(
            Left(err),
            cancelResult,
          ).fold((err) => stop(Some(err)), (_) => IO.unit),
          (fu) => cancelResult.fold(
            (err) => stop(Some(err)),
            (_) => outcomes.send(fu).voided(),
          ),
        );
      }

      IO<Unit> runInner(Rill<O> inner, Scope outerScope) {
        return IO.uncancelable((_) {
          return outerScope.lease().flatMap((lease) {
            return available.acquire().productR(incrementRunning).flatMap((_) {
              return inner
                  .chunks()
                  .evalMap((chunk) => output.send(chunk).voided())
                  .interruptWhenSignaled(done.map((x) => x.nonEmpty))
                  .compile
                  .drain
                  .guaranteeCase((oc) {
                    return lease.cancel
                        .flatMap((cancelResult) => onOutcome(oc, cancelResult))
                        .guarantee(
                          available.release().productR(decrementRunning),
                        );
                  })
                  .voidError()
                  .start()
                  .voided();
            });
          });
        });
      }

      IO<Unit> runOuter() {
        return IO.uncancelable((_) {
          return flatMap(
                (inner) =>
                    Pull.getScope
                        .flatMap((outerScope) => Pull.eval(runInner(inner, outerScope)))
                        .rillNoScope,
              )
              .drain()
              .interruptWhenSignaled(done.map((x) => x.nonEmpty))
              .compile
              .drain
              .guaranteeCase(
                (oc) => onOutcome(oc, Right(Unit())).productR(decrementRunning),
              )
              .voidError();
        });
      }

      IO<Unit> outcomeJoiner() {
        return outcomes.rill.compile.drain.guaranteeCase((oc) {
          return oc.fold(
            () => stop(none()).productR(output.close().voided()),
            (err, st) => stop(Some(err)).productR(output.close().voided()),
            (fu) => stop(none()).productR(output.close().voided()),
          );
        }).voidError();
      }

      IO<Unit> signalResult(IOFiber<Unit> fiber) {
        return done.value().flatMap((opt) {
          return opt.fold(
            () => fiber.join().voided(),
            (err) => err is _Interruption ? fiber.join().voided() : IO.raiseError(err),
          );
        });
      }

      return Rill.bracket(
        runOuter().start().productR(outcomeJoiner().start()),
        (fiber) {
          return done
              .update((c) => c.fold(() => const Some(_Interruption()), (prev) => Some(prev)))
              .productR(
                // in case of short-circuiting, the `fiberJoiner` would not have had a chance
                // to wait until all fibers have been joined, so we need to do it manually
                // by waiting on the counter
                running.waitUntil((n) => n == 0).productR(signalResult(fiber)),
              );
        },
      ).flatMap((_) => output.rill.flatMap((o) => Rill.chunk(o)));
    });

    return Rill.eval(rillF).flatten();
  }
}

parJoinUnbounded() extension

Rill<O> parJoinUnbounded()

Runs all inner streams concurrently with no concurrency limit.

Equivalent to parJoin(Integer.maxValue). Use parJoin when you need to cap the number of simultaneously running streams.

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoinUnbounded() => parJoin(Integer.maxValue);

toDartStream() extension

Stream<O> toDartStream()

Converts this Rill to a single-subscription Dart Stream.

The Rill runs when the stream is listened to and is canceled when the subscription is canceled. Errors from the Rill are forwarded to the stream's error channel.

Available on Rill<O>, provided by the ToDartStreamOps<O> extension

Implementation
dart
Stream<O> toDartStream() {
  late StreamController<O> controller;
  IOFiber<Unit>? activeFiber;

  controller = StreamController(
    onListen: () {
      final program = evalMap(
            (o) => IO.exec(() {
              if (!controller.isClosed) controller.add(o);
            }),
          )
          .handleErrorWith((err) {
            return Rill.eval(
              IO.exec(() {
                if (!controller.isClosed) controller.addError(err);
              }),
            );
          })
          .compile
          .drain
          .guarantee(
            IO.exec(() {
              if (!controller.isClosed) controller.close();
            }),
          );

      final startIO = program.start();

      startIO.unsafeRunAsync((outcome) {
        outcome.fold(
          () {
            if (!controller.isClosed) controller.close();
          },
          (err, stackTrace) {
            if (!controller.isClosed) controller.addError(err, stackTrace);
          },
          (fiber) => activeFiber = fiber,
        );
      });
    },
    onCancel: () {
      if (activeFiber != null) {
        final completer = Completer<void>();
        activeFiber!.cancel().unsafeRunAsync((_) => completer.complete());
        return completer.future;
      }
    },
  );

  return controller.stream;
}

widen() extension

Rill<O2> widen<O2>()

Widens this Rill<Never> to Rill<O2>.

Useful when an API expects Rill<O2> and you have a Rill<Never> that produces no elements (e.g. after Rill.drain or Rill.foreach).

Why this is type-safe: the ideal signature would use a lower type bound:

dart
Rill<O2> widen<O2 super O>()

where O2 super O means "O2 is a supertype of O". Dart does not support lower type bounds, so restricting the receiver to Rill<Never> encodes the same constraint at the type level. Never is the bottom type — Never <: O2 for every O2 — so the cast Pull<Never, Unit> as Pull<O2, Unit> always succeeds at runtime.

See also Rill.append for the type-preserving concatenation variant.

Available on Rill<O>, provided by the RillNeverOps extension

Implementation
dart
Rill<O2> widen<O2>() => (underlying as Pull<O2, Unit>).rillNoScope;
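
The subtyping argument can be checked with any Dart generic, since Dart class type parameters are covariant. The sketch below uses List<Never> as a stand-in for Pull<Never, Unit>; it illustrates the language rule, not the library's own types.

```dart
void main() {
  // A list whose element type is the bottom type: it can never hold an element.
  const List<Never> nothing = <Never>[];

  // Upcasts are implicit because List<Never> is a subtype of List<T> for every T.
  final List<int> ints = nothing;
  final List<String> strings = nothing;

  // An explicit cast, like the one widen performs, is the same upcast
  // and therefore succeeds at runtime.
  final asDoubles = nothing as List<double>;

  print(ints.isEmpty && strings.isEmpty && asDoubles.isEmpty); // true
}
```

The analyzer may flag the explicit cast as unnecessary, which is another way of saying that it cannot fail.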

Operators

operator +()

Rill<O> operator +(Rill<O> other)

Concatenates this stream with other. Alias for append.

Implementation
dart
Rill<O> operator +(Rill<O> other) => underlying.flatMap((_) => other.underlying).rillNoScope;

operator ==() inherited

bool operator ==(Object other)

The equality operator.

The default behavior for all Objects is to return true if and only if this object and other are the same object.

Override this method to specify a different equality relation on a class. The overriding method must still be an equivalence relation. That is, it must be:

  • Total: It must return a boolean for all arguments. It should never throw.

  • Reflexive: For all objects o, o == o must be true.

  • Symmetric: For all objects o1 and o2, o1 == o2 and o2 == o1 must either both be true, or both be false.

  • Transitive: For all objects o1, o2, and o3, if o1 == o2 and o2 == o3 are true, then o1 == o3 must be true.

The method should also be consistent over time, so whether two objects are equal should only change if at least one of the objects was modified.

If a subclass overrides the equality operator, it should override the hashCode method as well to maintain consistency.

Inherited from Object.

Implementation
dart
external bool operator ==(Object other);

Static Properties

never final

final Rill<Never> never

A stream that never emits and never terminates.

Implementation
dart
static final Rill<Never> never = Rill.eval(IO.never());

unit final

final Rill<Unit> unit

A pre-allocated single-element stream emitting Unit.

Implementation
dart
static final Rill<Unit> unit = Pull.outUnit.rillNoScope;

Static Methods

awakeEvery()

Rill<Duration> awakeEvery(Duration period, {bool dampen = true})

Emits the time elapsed since the stream started, once every period.

When dampen is true (default), missed ticks due to slow consumers are coalesced into a single emission; set to false to emit all missed ticks.

Implementation
dart
static Rill<Duration> awakeEvery(Duration period, {bool dampen = true}) {
  return Rill.eval(IO.now).flatMap((start) {
    return _fixedRate(
      period,
      start,
      dampen,
    ).flatMap((_) => Rill.eval(IO.now.map((now) => now.difference(start))));
  });
}

bracket()

Rill<R> bracket<R>(IO<R> acquire, IO<Unit> Function(R) release)

Emits the acquired resource R and releases it via release when the stream's scope closes.

Implementation
dart
static Rill<R> bracket<R>(IO<R> acquire, Function1<R, IO<Unit>> release) =>
    bracketFull((_) => acquire, (r, _) => release(r));

bracketCase()

Rill<R> bracketCase<R>(IO<R> acquire, IO<Unit> Function(R, ExitCase) release)

Like bracket but release receives the ExitCase (succeeded, errored, or canceled) so it can react accordingly.

Implementation
dart
static Rill<R> bracketCase<R>(IO<R> acquire, Function2<R, ExitCase, IO<Unit>> release) =>
    bracketFull((_) => acquire, release);

bracketFull()

Rill<R> bracketFull<R>(IO<R> Function(Poll) acquire, IO<Unit> Function(R, ExitCase) release)

Fully general resource bracket: acquire is cancelable (via Poll) and release receives the ExitCase.

Implementation
dart
static Rill<R> bracketFull<R>(
  Function1<Poll, IO<R>> acquire,
  Function2<R, ExitCase, IO<Unit>> release,
) => Pull.acquireCancelable(acquire, release).flatMap(Pull.output1).rillNoScope;

chunk()

Rill<O> chunk<O>(Chunk<O> values)

Emits a single pre-built Chunk.

Implementation
dart
static Rill<O> chunk<O>(Chunk<O> values) => Pull.output(values).rillNoScope;

constant()

Rill<O> constant<O>(O o, {int chunkSize = 256})

Infinite stream that repeats o forever, emitting chunkSize copies per chunk. A non-positive chunkSize yields an empty stream instead.

Implementation
dart
static Rill<O> constant<O>(O o, {int chunkSize = 256}) =>
    chunkSize > 0 ? chunk(Chunk.fill(chunkSize, o)).repeat() : Rill.empty();

duration()

Rill<Duration> duration()

Emits the elapsed time since subscription, reading the clock again for every element.

Implementation
dart
static Rill<Duration> duration() =>
    Rill.eval(IO.now).flatMap((t0) => Rill.repeatEval(IO.now.map((now) => now.difference(t0))));

emit()

Rill<O> emit<O>(O value)

Emits a single value and terminates.

Implementation
dart
static Rill<O> emit<O>(O value) => Pull.output1(value).rillNoScope;

emits()

Rill<O> emits<O>(List<O> values)

Emits all elements of values in order and terminates.

Implementation
dart
static Rill<O> emits<O>(List<O> values) => Rill.chunk(Chunk.fromList(values));

empty()

Rill<O> empty<O>()

An empty stream that terminates immediately without emitting any elements.

Implementation
dart
static Rill<O> empty<O>() => Rill._noScope(Pull.done);

eval()

Rill<O> eval<O>(IO<O> io)

Evaluates io once and emits its result.

Implementation
dart
static Rill<O> eval<O>(IO<O> io) => Pull.eval(io).flatMap(Pull.output1).rillNoScope;

exec()

Rill<O> exec<O>(IO<Unit> io)

Evaluates io for its side effect and emits no elements.

Implementation
dart
static Rill<O> exec<O>(IO<Unit> io) => Rill._noScope(Pull.eval(io).flatMap((_) => Pull.done));

fixedDelay()

Rill<Unit> fixedDelay(Duration period)

Emits Unit repeatedly, sleeping for period between emissions.

Unlike fixedRate, the interval is measured from the end of the previous emission, so each tick fires at least period after the last.

Implementation
dart
static Rill<Unit> fixedDelay(Duration period) => sleep(period).repeat();

fixedRate()

Rill<Unit> fixedRate(Duration period, {bool dampen = true})

Emits Unit at a fixed rate, attempting to maintain the cadence even if a consumer is slow. When dampen is true, missed ticks are collapsed.

Implementation
dart
static Rill<Unit> fixedRate(Duration period, {bool dampen = true}) =>
    Rill.eval(IO.now).flatMap((t) => _fixedRate(period, t, dampen));
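
The difference between fixedDelay and fixedRate shows up once the consumer takes time to handle each tick. The sketch below is a simplified arithmetic model, not library code: fixedDelayTicks and fixedRateTicks are hypothetical helpers, times are in milliseconds from subscription, and the consumer is assumed to finish each tick within one period, so dampen never comes into play.

```dart
// Tick times in milliseconds under a simplified model in which the consumer
// spends `workMs` handling each tick. Both helpers are hypothetical.

// Fixed delay: sleep `periodMs`, emit, wait for the consumer, repeat.
List<int> fixedDelayTicks(int periodMs, int workMs, int count) {
  final ticks = <int>[];
  var now = 0;
  for (var i = 0; i < count; i++) {
    now += periodMs; // sleep before emitting
    ticks.add(now);
    now += workMs; // consumer handles the tick
  }
  return ticks;
}

// Fixed rate: aim for multiples of `periodMs` measured from the start time,
// assuming the consumer always finishes within one period.
List<int> fixedRateTicks(int periodMs, int count) =>
    [for (var i = 1; i <= count; i++) i * periodMs];

void main() {
  print(fixedDelayTicks(100, 30, 3)); // [100, 230, 360]
  print(fixedRateTicks(100, 3)); // [100, 200, 300]
}
```

In this model the fixed-delay schedule drifts by the consumer's work time on every tick, while the fixed-rate schedule stays aligned to the start time.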

fixedRateStartImmediately()

Rill<Unit> fixedRateStartImmediately(Duration period, {bool dampen = true})

Like fixedRate but emits immediately on subscription before waiting for the first interval.

Implementation
dart
static Rill<Unit> fixedRateStartImmediately(Duration period, {bool dampen = true}) =>
    Rill.eval(IO.now).flatMap((t) => Rill.unit.append(() => _fixedRate(period, t, dampen)));

force()

Rill<O> force<O>(IO<Rill<O>> io)

Evaluates io to obtain a Rill and then runs it.

Implementation
dart
static Rill<O> force<O>(IO<Rill<O>> io) => Rill.eval(io).flatMap(identity);

fromEither()

Rill<O> fromEither<E extends Object, O>(Either<E, O> either)

Emits the right value of either, or raises the left value as an error.

Implementation
dart
static Rill<O> fromEither<E extends Object, O>(Either<E, O> either) =>
    either.fold((err) => Rill.raiseError(err), (o) => Rill.emit(o));

fromIterator()

Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64})

Drains a Dart Iterator into a stream, batching elements into chunks of chunkSize.

Implementation
dart
static Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64}) =>
    fromRIterator(RIterator.fromDart(it), chunkSize: chunkSize);

fromOption()

Rill<O> fromOption<E extends Object, O>(Option<O> option)

Emits the value inside option, or produces an empty stream for none.

Implementation
dart
static Rill<O> fromOption<E extends Object, O>(Option<O> option) =>
    option.fold(() => Rill.empty(), (o) => Rill.emit(o));

fromQueueNoneUnterminated()

Rill<O> fromQueueNoneUnterminated<O>(Queue<Option<O>> queue, {int? limit})

Consumes queue until a none sentinel is dequeued, then terminates.

Implementation
dart
static Rill<O> fromQueueNoneUnterminated<O>(
  Queue<Option<O>> queue, {
  int? limit,
}) => _awaitNoneTerminated(queue, limit ?? Integer.maxValue);

fromQueueNoneUnterminatedChunk()

Rill<O> fromQueueNoneUnterminatedChunk<O>(Queue<Option<Chunk<O>>> queue, {int? limit})

Like fromQueueNoneUnterminated but reads pre-chunked values.

Implementation
dart
static Rill<O> fromQueueNoneUnterminatedChunk<O>(
  Queue<Option<Chunk<O>>> queue, {
  int? limit,
}) => _fromQueueNoneUnterminatedChunk(queue.take(), queue.tryTake(), limit: limit);

fromQueueUnterminated()

Rill<O> fromQueueUnterminated<O>(Queue<O> queue, {int? limit})

Consumes queue indefinitely, emitting each dequeued value.

The stream never terminates on its own; cancel it externally. The optional limit caps the maximum number of elements batched per chunk.

Implementation
dart
static Rill<O> fromQueueUnterminated<O>(
  Queue<O> queue, {
  int? limit,
}) {
  final lim = limit ?? Integer.maxValue;

  if (lim > 1) {
    // First, try non-blocking batch dequeue.
    // Only if the result is an empty list, semantically block to get one element,
    // then attempt 2nd tryTakeN to get any other elements that are immediately available.

    final osf = queue
        .tryTakeN(Some(lim))
        .flatMap((os) {
          if (os.isEmpty) {
            return (
              queue.take(),
              queue.tryTakeN(Some(lim - 1)),
            ).mapN((elem, elems) => elems.prepended(elem));
          } else {
            return IO.pure(os);
          }
        })
        .map((l) => Chunk.from(l));

    return Rill.eval(osf).flatMap(Rill.chunk).repeat();
  } else {
    return Rill.repeatEval(queue.take());
  }
}

fromQueueUnterminatedChunk()

Rill<O> fromQueueUnterminatedChunk<O>(Queue<Chunk<O>> queue, {int? limit})

Like fromQueueUnterminated but reads pre-chunked values from queue.

Implementation
dart
static Rill<O> fromQueueUnterminatedChunk<O>(
  Queue<Chunk<O>> queue, {
  int? limit,
}) => _fromQueueNoneUnterminatedChunk(
  queue.take().map((chunk) => Some(chunk)),
  queue.tryTake().map((chunkOpt) => chunkOpt.map((chunk) => Some(chunk))),
  limit: limit,
);

fromRIterator()

Rill<O> fromRIterator<O>(RIterator<O> it, {int chunkSize = 64})

Drains an RIterator into a stream, batching elements into chunks of chunkSize.

Implementation
dart
static Rill<O> fromRIterator<O>(RIterator<O> it, {int chunkSize = 64}) {
  assert(chunkSize > 0, 'Rill.fromRIterator: chunkSize must be positive');
  IO<Option<(Chunk<O>, RIterator<O>)>> getNextChunk(RIterator<O> i) {
    return IO.delay(() {
      final bldr = <O>[];
      int cnt = 0;

      while (cnt < chunkSize && i.hasNext) {
        bldr.add(i.next());
        cnt += 1;
      }

      return cnt == 0 ? const None() : Some((Chunk.fromList(bldr), i));
    });
  }

  return Rill.unfoldChunkEval(it, getNextChunk);
}
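
The batching loop in getNextChunk can be modelled with a standard Dart Iterator. In the sketch below, nextChunk and chunked are hypothetical helpers, null stands in for None, and plain lists stand in for Chunk.

```dart
// Pulls up to `chunkSize` elements from a Dart Iterator.
// Returns null once the iterator is exhausted. nextChunk is a hypothetical
// helper that mirrors the counting loop in getNextChunk.
List<T>? nextChunk<T>(Iterator<T> it, int chunkSize) {
  final chunk = <T>[];
  // The length check comes first, so moveNext is never called on a full chunk
  // and no element is skipped between chunks.
  while (chunk.length < chunkSize && it.moveNext()) {
    chunk.add(it.current);
  }
  return chunk.isEmpty ? null : chunk;
}

// Collects every chunk until the iterator is exhausted.
List<List<T>> chunked<T>(Iterator<T> it, int chunkSize) {
  final chunks = <List<T>>[];
  while (true) {
    final chunk = nextChunk(it, chunkSize);
    if (chunk == null) break;
    chunks.add(chunk);
  }
  return chunks;
}

void main() {
  print(chunked([1, 2, 3, 4, 5].iterator, 2)); // [[1, 2], [3, 4], [5]]
}
```

The final chunk may be shorter than chunkSize; only a completely empty read signals the end of the input.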

fromStream()

Rill<O> fromStream<O>(Stream<O> stream, {OverflowStrategy strategy = const _DropOldest(100)})

Bridges a Dart Stream into a Rill.

Events are buffered according to strategy (default: drop oldest, buffer size 100). The subscription is canceled when the Rill's scope closes.

Implementation
dart
static Rill<O> fromStream<O>(
  Stream<O> stream, {
  OverflowStrategy strategy = const _DropOldest(100),
}) {
  final IO<Queue<Either<Object, Option<O>>>> createQueue = switch (strategy) {
    _DropNewest(:final bufferSize) => Queue.dropping(bufferSize),
    _DropOldest(:final bufferSize) => Queue.circularBuffer(bufferSize),
    _Unbounded() => Queue.unbounded(),
  };

  return Rill.eval(createQueue).flatMap((queue) {
    Rill<O> consumeStreamQueue() {
      return Rill.eval(queue.take()).flatMap((event) {
        return event.fold(
          (err) => Rill.raiseError(err),
          (opt) => opt.fold(
            () => Rill.empty(),
            (o) => Rill.emit(o).append(() => consumeStreamQueue()),
          ),
        );
      });
    }

    final acquire = IO.delay(() {
      return stream.listen(
        (data) => queue.offer(Right(Some(data))).unsafeRunAndForget(),
        onError: (Object err) => queue.offer(Left(err)).unsafeRunAndForget(),
        onDone: () => queue.offer(Right(none())).unsafeRunAndForget(),
        cancelOnError: false, // handle manually
      );
    });

    IO<Unit> release(StreamSubscription<O> sub) => IO.fromFutureF(() => sub.cancel()).voided();

    return Rill.bracket(acquire, release).flatMap((_) => consumeStreamQueue());
  });
}
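
The two lossy strategies differ in which event is sacrificed when the buffer is full. The sketch below models them with a ListQueue from dart:collection; offerDropOldest and offerDropNewest are hypothetical helpers that illustrate the policies, not the Queue implementations the library uses.

```dart
import 'dart:collection';

// Bounded-buffer models of the two lossy strategies. Both helpers are
// hypothetical and assume capacity > 0.

// Drop-oldest (circular buffer): evict the head to make room for the new event.
void offerDropOldest<T>(ListQueue<T> buffer, int capacity, T event) {
  if (buffer.length == capacity) buffer.removeFirst();
  buffer.addLast(event);
}

// Drop-newest: discard the incoming event when the buffer is full.
void offerDropNewest<T>(ListQueue<T> buffer, int capacity, T event) {
  if (buffer.length < capacity) buffer.addLast(event);
}

void main() {
  final oldest = ListQueue<int>();
  final newest = ListQueue<int>();
  for (var i = 1; i <= 5; i++) {
    offerDropOldest(oldest, 3, i);
    offerDropNewest(newest, 3, i);
  }
  print(oldest.toList()); // [3, 4, 5]
  print(newest.toList()); // [1, 2, 3]
}
```

Drop-oldest keeps the consumer close to the most recent events, whereas drop-newest preserves the earliest ones; an unbounded buffer loses nothing but can grow without limit if the consumer falls behind.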

iterate()

Rill<O> iterate<O>(O start, O Function(O) f, {int chunkSize = 64})

Infinite stream starting at start and applying f to produce each subsequent element, emitting chunkSize elements per chunk.

Implementation
dart
static Rill<O> iterate<O>(
  O start,
  Function1<O, O> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.iterate: chunkSize must be positive.');

  Pull<O, Unit> loop(O current) {
    final elements = <O>[];
    O nextVal = current;

    for (var i = 0; i < chunkSize; i++) {
      elements.add(nextVal);
      nextVal = f(nextVal);
    }

    return Pull.output(Chunk.fromList(elements)).append(() => loop(nextVal));
  }

  return loop(start).rillNoScope;
}
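
The chunking loop above can be modelled with an ordinary `sync*` generator. This is a rough sketch rather than the library's Pull-based code, and `iterateChunks` is a hypothetical name. It shows that `f` is applied `chunkSize` times while each chunk is built, so the seed for the next chunk is computed before that chunk is requested.

```dart
// Plain-Dart model: yields successive chunks of an infinite iteration.
Iterable<List<O>> iterateChunks<O>(O start, O Function(O) f,
    {int chunkSize = 64}) sync* {
  var next = start;
  while (true) {
    final elements = <O>[];
    for (var i = 0; i < chunkSize; i++) {
      elements.add(next);
      next = f(next); // also computes the first element of the next chunk
    }
    yield elements;
  }
}

void main() {
  // Powers of two, three per chunk; only the first two chunks are forced.
  final chunks = iterateChunks<int>(1, (n) => n * 2, chunkSize: 3).take(2);
  print(chunks.toList()); // [[1, 2, 4], [8, 16, 32]]
}
```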

iterateEval()

Rill<O> iterateEval<O>(O start, IO<O> Function(O) f)

Like iterate but f returns an IO evaluated for each step.

Implementation
dart
static Rill<O> iterateEval<O>(O start, Function1<O, IO<O>> f) =>
    Rill.emit(start).append(() => Rill.eval(f(start)).flatMap((o) => iterateEval(o, f)));

pure()

Rill<O> pure<O>(O value)

Emits a single value without introducing a new scope. Alias for emit.

Implementation
dart
static Rill<O> pure<O>(O value) => Rill._noScope(Pull.output1(value));

raiseError()

Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace])

A stream that immediately fails with error.

Implementation
dart
static Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace]) =>
    Pull.raiseError(error, stackTrace).rillNoScope;

range()

Rill<int> range(int start, int stopExclusive, {int step = 1, int chunkSize = 64})

Emits integers from start (inclusive) to stopExclusive (exclusive), advancing by step (which may be negative) and emitting up to chunkSize elements per chunk. A step of 0 yields an empty stream.

Implementation
dart
static Rill<int> range(
  int start,
  int stopExclusive, {
  int step = 1,
  int chunkSize = 64,
}) {
  assert(chunkSize > 0, 'chunkSize must be positive');
  if (step == 0) {
    return Rill.empty();
  } else {
    Pull<int, Unit> loop(int current) {
      // Termination check (direction is constant across iterations)
      if (step > 0 && current >= stopExclusive) {
        return Pull.done;
      } else if (step < 0 && current <= stopExclusive) {
        return Pull.done;
      } else {
        // Compute exact count for this chunk
        final available =
            step > 0
                ? (stopExclusive - current + step - 1) ~/ step
                : (current - stopExclusive + (-step) - 1) ~/ (-step);

        final count = available < chunkSize ? available : chunkSize;

        return Pull.output(
          Chunk.tabulate(count, (j) => current + j * step),
        ).append(() => loop(current + count * step));
      }
    }

    return loop(start).rillNoScope;
  }
}
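
The per-chunk count is a ceiling division of the remaining distance by the step size. The sketch below reproduces that arithmetic with plain lists so the chunk boundaries can be inspected directly. `rangeChunks` is a hypothetical helper, not part of the library.

```dart
// Plain-Dart model of the chunking arithmetic used by range.
List<List<int>> rangeChunks(int start, int stopExclusive,
    {int step = 1, int chunkSize = 64}) {
  final chunks = <List<int>>[];
  if (step == 0) return chunks; // mirrors the empty stream for step == 0

  var current = start;
  while (step > 0 ? current < stopExclusive : current > stopExclusive) {
    // Ceiling division: number of elements left before reaching the bound.
    final available = step > 0
        ? (stopExclusive - current + step - 1) ~/ step
        : (current - stopExclusive + (-step) - 1) ~/ (-step);
    final count = available < chunkSize ? available : chunkSize;
    chunks.add(List.generate(count, (j) => current + j * step));
    current += count * step;
  }
  return chunks;
}

void main() {
  print(rangeChunks(0, 10, step: 3, chunkSize: 2)); // [[0, 3], [6, 9]]
  print(rangeChunks(5, 0, step: -2)); // [[5, 3, 1]]
}
```

In the first call 4 elements are available (12 ~/ 3), so they split into two chunks of 2. In the second, 3 elements are available (6 ~/ 2) and fit in a single chunk.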

repeatEval()

Rill<O> repeatEval<O>(IO<O> fo)

Evaluates fo repeatedly and emits each result, running forever.

Implementation
dart
static Rill<O> repeatEval<O>(IO<O> fo) => Rill.eval(fo).repeat();

resource()

Rill<O> resource<O>(Resource<O> r)

Runs a Resource as a single-element stream.

The resource is acquired when the stream starts and released when the stream's scope closes.

Implementation
dart
static Rill<O> resource<O>(Resource<O> r) {
  return switch (r) {
    Pure(:final value) => Rill.emit(value),
    Eval(:final task) => Rill.eval(task),
    Bind(:final source, :final f) => Rill.resource(source).flatMap((o) => Rill.resource(f(o))),
    Allocate(:final resource) => Rill.bracketFull(
      (poll) => resource(poll),
      (resAndRelease, exitCase) => resAndRelease.$2(exitCase),
    )._mapNoScope((x) => x.$1),
    _ => throw StateError('Unhandled Resource ADT: $r'),
  }.scope;
}

retry()

Rill<O> retry<O>(IO<O> fo, Duration delay, Duration Function(Duration) nextDelay, int maxAttempts, {bool Function(Object)? retriable})

Retries fo up to maxAttempts times, waiting between attempts.

delay is the initial wait, and nextDelay maps the current delay to the next one (for example, doubling it gives exponential backoff). An optional retriable predicate determines which errors should trigger a retry (defaults to all). Raises the last error if all attempts are exhausted.

Implementation
dart
static Rill<O> retry<O>(
  IO<O> fo,
  Duration delay,
  Function1<Duration, Duration> nextDelay,
  int maxAttempts, {
  Function1<Object, bool>? retriable,
}) {
  final delays = Rill.unfold(delay, (d) => Some((d, nextDelay(d))));
  final canRetry = retriable ?? (_) => true;

  return Rill.eval(fo)
      .attempts(delays)
      .take(maxAttempts)
      .takeThrough((r) => r.fold((err) => canRetry(err), (_) => false))
      .last
      .unNone
      .rethrowError;
}
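
The delays stream above is an unfold that emits the current delay and feeds `nextDelay(d)` back in as the next state. A small plain-Dart sketch of that schedule follows. `delaySchedule` is a hypothetical helper, and the doubling function is only one possible choice of `nextDelay`.

```dart
// Plain-Dart model of the delay sequence: delay, nextDelay(delay), ...
List<Duration> delaySchedule(
    Duration delay, Duration Function(Duration) nextDelay, int count) {
  final out = <Duration>[];
  var d = delay;
  for (var i = 0; i < count; i++) {
    out.add(d);
    d = nextDelay(d);
  }
  return out;
}

void main() {
  final schedule =
      delaySchedule(const Duration(milliseconds: 100), (d) => d * 2, 4);
  print(schedule.map((d) => d.inMilliseconds).toList()); // [100, 200, 400, 800]
}
```

Passing `(d) => d` instead would give a fixed delay between attempts.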

sleep()

Rill<Unit> sleep(Duration duration)

Emits Unit after duration has elapsed.

Implementation
dart
static Rill<Unit> sleep(Duration duration) => Rill.eval(IO.sleep(duration));

sleep_()

Rill<O> sleep_<O>(Duration duration)

Waits duration and emits no elements (useful for delaying concatenated streams).

Implementation
dart
static Rill<O> sleep_<O>(Duration duration) => Rill.exec(IO.sleep(duration));

supervise()

Rill<IOFiber<O>> supervise<O>(IO<O> fo)

Starts fo as a background fiber and emits the resulting IOFiber.

The fiber is canceled when the stream's scope closes.

Implementation
dart
static Rill<IOFiber<O>> supervise<O>(IO<O> fo) =>
    Rill.bracket(fo.start(), (fiber) => fiber.cancel());

suspend()

Rill<O> suspend<O>(Rill<O> Function() f)

Defers construction of the stream until it is first run.

Implementation
dart
static Rill<O> suspend<O>(Function0<Rill<O>> f) => Pull.suspend(() => f().underlying).rillNoScope;

unfold()

Rill<O> unfold<S, O>(S s, Option<(O, S)> Function(S) f, {int chunkSize = 64})

Generates a stream by repeatedly applying f to a state s.

f returns Some((element, nextState)) to continue or none to end the stream. Elements are batched into chunks of chunkSize.

Implementation
dart
static Rill<O> unfold<S, O>(
  S s,
  Function1<S, Option<(O, S)>> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.unfold: chunkSize must be positive.');

  Pull<O, Unit> loop(S currentState) {
    final elements = <O>[];
    S nextState = currentState;
    bool isDone = false;

    for (int i = 0; i < chunkSize; i++) {
      final result = f(nextState);

      result.foldN(
        () => isDone = true,
        (elem, newState) {
          elements.add(elem);
          nextState = newState;
        },
      );

      if (isDone) break;
    }

    if (elements.isEmpty) {
      return Pull.done;
    } else if (isDone) {
      return Pull.output(Chunk.fromList(elements));
    } else {
      return Pull.output(Chunk.fromList(elements)).append(() => loop(nextState));
    }
  }

  return loop(s).rillNoScope;
}
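
The batching behaviour can be modelled without the library by letting `null` stand in for none and a record for `Some((element, nextState))`. This is a sketch under those assumptions, and `unfoldChunks` is a hypothetical name. It shows that a final, partially filled chunk is still emitted when `f` signals the end mid-batch.

```dart
// Plain-Dart model of unfold's chunking; null plays the role of none.
Iterable<List<O>> unfoldChunks<S, O>(S s, (O, S)? Function(S) f,
    {int chunkSize = 64}) sync* {
  var state = s;
  var done = false;
  while (!done) {
    final elements = <O>[];
    for (var i = 0; i < chunkSize; i++) {
      final result = f(state);
      if (result == null) {
        done = true;
        break;
      }
      elements.add(result.$1);
      state = result.$2;
    }
    // An empty batch means the stream ended exactly on a chunk boundary.
    if (elements.isNotEmpty) yield elements;
  }
}

void main() {
  // Count down from 5, stopping before zero.
  final chunks = unfoldChunks<int, int>(
      5, (n) => n > 0 ? (n, n - 1) : null,
      chunkSize: 2);
  print(chunks.toList()); // [[5, 4], [3, 2], [1]]
}
```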

unfoldChunk()

Rill<O> unfoldChunk<S, O>(S s, Option<(Chunk<O>, S)> Function(S) f, {int chunkSize = 64})

Like unfold but f returns a whole Chunk per step rather than a single element.

Implementation
dart
static Rill<O> unfoldChunk<S, O>(
  S s,
  Function1<S, Option<(Chunk<O>, S)>> f, {
  int chunkSize = 64,
}) => unfold(s, f, chunkSize: chunkSize).flatMap(Rill.chunk);

unfoldChunkEval()

Rill<O> unfoldChunkEval<S, O>(S s, IO<Option<(Chunk<O>, S)>> Function(S) f)

Like unfoldEval but each step produces a Chunk.

Implementation
dart
static Rill<O> unfoldChunkEval<S, O>(S s, Function1<S, IO<Option<(Chunk<O>, S)>>> f) {
  Pull<O, Unit> loop(S currentState) {
    return Pull.eval(f(currentState)).flatMap((opt) {
      return opt.foldN(
        () => Pull.done,
        (elems, nextState) => Pull.output(elems).append(() => loop(nextState)),
      );
    });
  }

  return loop(s).rillNoScope;
}

unfoldEval()

Rill<O> unfoldEval<S, O>(S s, IO<Option<(O, S)>> Function(S) f)

Like unfold but each step evaluates an IO for the next state and element.

Implementation
dart
static Rill<O> unfoldEval<S, O>(S s, Function1<S, IO<Option<(O, S)>>> f) {
  Pull<O, Unit> loop(S currentState) {
    return Pull.eval(f(currentState)).flatMap((opt) {
      return opt.foldN(
        () => Pull.done,
        (elem, nextState) => Pull.output1(elem).append(() => loop(nextState)),
      );
    });
  }

  return loop(s).rillNoScope;
}
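
For readers more familiar with Dart's own async primitives, the loop above behaves much like the `async*` generator below. It is an analogy, not the library's implementation: `Future` stands in for `IO`, `null` for none, and `unfoldEvalModel` is a hypothetical name. Unlike unfold, no batching is involved, so each step yields exactly one element.

```dart
// Plain-Dart model of unfoldEval using Future and Stream.
Stream<O> unfoldEvalModel<S, O>(S s, Future<(O, S)?> Function(S) f) async* {
  var state = s;
  while (true) {
    final result = await f(state); // one effect per step
    if (result == null) return; // end of stream
    yield result.$1;
    state = result.$2;
  }
}

Future<void> main() async {
  final out = await unfoldEvalModel<int, int>(
    0,
    (n) async => n < 3 ? (n, n + 1) : null,
  ).toList();
  print(out); // [0, 1, 2]
}
```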