Skip to content

Rill<O>

class Rill<O>

Properties

compile no setter

RillCompile<O> get compile
Implementation
dart
RillCompile<O> get compile => RillCompile(underlying);

distinct no setter

Rill<O> get distinct

WARN: For long streams and/or large elements, this can be a memory hog. Use with caution;

Implementation
dart
Rill<O> get distinct {
  return scanChunksOpt(ISet.empty<O>(), (seen) {
    return Some((chunk) {
      return (seen.concat(chunk), chunk);
    });
  });
}

dropLast no setter

Rill<O> get dropLast
Implementation
dart
Rill<O> get dropLast => dropLastIf((_) => true);

hashCode no setter inherited

int get hashCode

The hash code for this object.

A hash code is a single integer which represents the state of the object that affects operator == comparisons.

All objects have hash codes. The default hash code implemented by Object represents only the identity of the object, the same way as the default operator == implementation only considers objects equal if they are identical (see identityHashCode).

If operator == is overridden to use the object state instead, the hash code must also be changed to represent that state, otherwise the object cannot be used in hash based data structures like the default Set and Map implementations.

Hash codes must be the same for objects that are equal to each other according to operator ==. The hash code of an object should only change if the object changes in a way that affects equality. There are no further requirements for the hash codes. They need not be consistent between executions of the same program and there are no distribution guarantees.

Objects that are not equal are allowed to have the same hash code. It is even technically allowed that all instances have the same hash code, but if clashes happen too often, it may reduce the efficiency of hash-based data structures like HashSet or HashMap.

If a subclass overrides hashCode, it should override the operator == operator as well to maintain consistency.

Inherited from Object.

Implementation
dart
external int get hashCode;

head no setter

Rill<O> get head
Implementation
dart
Rill<O> get head => take(1);

last no setter

Rill<Option<O>> get last
Implementation
dart
Rill<Option<O>> get last => pull.last.flatMap(Pull.output1).rillNoScope;

mask no setter

Rill<O> get mask
Implementation
dart
Rill<O> get mask => handleErrorWith((_) => Rill.empty());

pull no setter

ToPull<O> get pull

Access the Pull API for this Rill.

Implementation
dart
ToPull<O> get pull => ToPull(this);

runtimeType no setter inherited

Type get runtimeType

A representation of the runtime type of the object.

Inherited from Object.

Implementation
dart
external Type get runtimeType;

scope no setter

Rill<O> get scope
Implementation
dart
Rill<O> get scope => Pull.scope(underlying).rillNoScope;

tail no setter

Rill<O> get tail
Implementation
dart
Rill<O> get tail => drop(1);

underlying final

final Pull<O, Unit> underlying
Implementation
dart
final Pull<O, Unit> underlying;

Extension Properties

rethrowError extension no setter

Rill<A> get rethrowError

Available on Rill<O>, provided by the RethrowOps<A> extension

Implementation
dart
Rill<A> get rethrowError {
  return chunks().flatMap((c) {
    Option<Object> errOpt = none();
    final size = c.size;
    var i = 0;

    final bldr = <A>[];

    while (i < size && errOpt.isEmpty) {
      c[i].fold(
        (ex) => errOpt = Some(ex),
        (o) {
          bldr.add(o);
          i++;
        },
      );
    }

    final chunk = Chunk.fromList(bldr);

    return Rill.chunk(chunk).append(
      () => errOpt.fold(
        () => Rill.empty(),
        (err) => Rill.raiseError(err),
      ),
    );
  });
}

unchunks extension no setter

Rill<A> get unchunks

Available on Rill<O>, provided by the RillChunkOps<A> extension

Implementation
dart
Rill<A> get unchunks => underlying.flatMapOutput((c) => Pull.output(c)).rillNoScope;

unNone extension no setter

Rill<A> get unNone

Available on Rill<O>, provided by the RillOptionOps<A> extension

Implementation
dart
Rill<A> get unNone => collect(identity);

unNoneTerminate extension no setter

Rill<A> get unNoneTerminate

Available on Rill<O>, provided by the RillOptionOps<A> extension

Implementation
dart
Rill<A> get unNoneTerminate {
  Pull<A, Unit> loop(Pull<Option<A>, Unit> p) {
    return p.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => hd
            .indexWhere((opt) => opt.isEmpty)
            .fold(
              () => Pull.output(hd.map(_getopt)).append(() => loop(tl)),
              (idx) => idx == 0 ? Pull.done : Pull.output(hd.take(idx).map(_getopt)),
            ),
      );
    });
  }

  return loop(underlying).rillNoScope;
}

Methods

andWait()

Rill<O> andWait(Duration duration)
Implementation
dart
Rill<O> andWait(Duration duration) => append(() => Rill.sleep_<O>(duration));

append()

Rill<O> append(Rill<O> Function() s2)
Implementation
dart
Rill<O> append(Function0<Rill<O>> s2) => underlying.append(() => s2().underlying).rillNoScope;

as()

Rill<O2> as<O2>(O2 o2)
Implementation
dart
Rill<O2> as<O2>(O2 o2) => map((_) => o2);

attempt()

Rill<Either<Object, O>> attempt()
Implementation
dart
Rill<Either<Object, O>> attempt() =>
    map((o) => o.asRight<Object>()).handleErrorWith((err) => Rill.emit(Left<Object, O>(err)));

attempts()

Rill<Either<Object, O>> attempts(Rill<Duration> delays)
Implementation
dart
Rill<Either<Object, O>> attempts(Rill<Duration> delays) => attempt().append(
  () => delays.flatMap((delay) => Rill.sleep(delay).flatMap((_) => attempt())),
);

broadcastThrough()

Rill<O2> broadcastThrough<O2>(IList<Rill<O2> Function(Rill<O>)> pipes)
Implementation
dart
Rill<O2> broadcastThrough<O2>(IList<Pipe<O, O2>> pipes) {
  final rillF = (
    Topic.create<Chunk<O>>(),
    CountDownLatch.create(pipes.size),
  ).mapN((topic, allReady) {
    final checkIn = allReady.release().productR(allReady.await());

    Rill<O2> dump(Pipe<O, O2> pipe) {
      return Rill.resource(topic.subscribeAwait(1)).flatMap((sub) {
        return Rill.exec<O2>(checkIn).append(() => pipe(sub.unchunks));
      });
    }

    final dumpAll = Rill.emits(pipes.toList()).map(dump).parJoinUnbounded();

    final pump = Rill.exec<Never>(allReady.await()).append(() => topic.publish(chunks()));

    return dumpAll.concurrently(pump);
  });

  return Rill.force(rillF);
}

buffer()

Rill<O> buffer(int n)
Implementation
dart
Rill<O> buffer(int n) {
  if (n <= 0) {
    return this;
  } else {
    return repeatPull((tp) {
      return tp.unconsN(n, allowFewer: true).flatMap((hdtl) {
        return hdtl.foldN(
          () => Pull.pure(none()),
          (hd, tl) => Pull.output(hd).as(Some(tl)),
        );
      });
    });
  }
}

changes()

Rill<O> changes({(bool Function(O, O))? eq})
Implementation
dart
Rill<O> changes({Function2<O, O, bool>? eq}) => filterWithPrevious(eq ?? (a, b) => a != b);

changesBy()

Rill<O> changesBy<O2>(O2 Function(O) f)
Implementation
dart
Rill<O> changesBy<O2>(Function1<O, O2> f) => filterWithPrevious((a, b) => f(a) != f(b));

changesWith()

Rill<O> changesWith(bool Function(O, O) f)
Implementation
dart
Rill<O> changesWith(Function2<O, O, bool> f) => filterWithPrevious((a, b) => f(a, b));

chunkAll()

Rill<Chunk<O>> chunkAll()
Implementation
dart
Rill<Chunk<O>> chunkAll() {
  Pull<Chunk<O>, Unit> loop(Rill<O> s, Chunk<O> acc) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1(acc),
        (hd, tl) => loop(tl, acc.concat(hd)),
      );
    });
  }

  return loop(this, Chunk.empty()).rillNoScope;
}

chunkLimit()

Rill<Chunk<O>> chunkLimit(int n)
Implementation
dart
Rill<Chunk<O>> chunkLimit(int n) {
  Pull<Chunk<O>, Unit> breakup(Chunk<O> ch) {
    if (ch.size <= n) {
      return Pull.output1(ch);
    } else {
      final (pre, rest) = ch.splitAt(n);
      return Pull.output1(pre).append(() => breakup(rest));
    }
  }

  return underlying.unconsFlatMap(breakup).rillNoScope;
}

chunkMin()

Rill<Chunk<O>> chunkMin(int n, {bool allowFewerTotal = true})
Implementation
dart
Rill<Chunk<O>> chunkMin(int n, {bool allowFewerTotal = true}) => repeatPull((p) {
  return p.unconsMin(n, allowFewerTotal: allowFewerTotal).flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.pure(none()),
      (hd, tl) => Pull.output1(hd).as(Some(tl)),
    );
  });
});

chunkN()

Rill<Chunk<O>> chunkN(int n, {bool allowFewer = true})
Implementation
dart
Rill<Chunk<O>> chunkN(int n, {bool allowFewer = true}) {
  assert(n > 0, 'n must be positive');
  return repeatPull((tp) {
    return tp.unconsN(n, allowFewer: allowFewer).flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.pure(none()),
        (hd, tl) => Pull.output1(hd).as(Some(tl)),
      );
    });
  });
}

chunks()

Rill<Chunk<O>> chunks()
Implementation
dart
Rill<Chunk<O>> chunks() => underlying.unconsFlatMap(Pull.output1).rillNoScope;

collect()

Rill<O2> collect<O2>(Option<O2> Function(O) f)
Implementation
dart
Rill<O2> collect<O2>(Function1<O, Option<O2>> f) => mapChunks((c) => c.collect(f));

collectFirst()

Rill<O2> collectFirst<O2>(Option<O2> Function(O) f)
Implementation
dart
Rill<O2> collectFirst<O2>(Function1<O, Option<O2>> f) => collect(f).take(1);

collectWhile()

Rill<O2> collectWhile<O2>(Option<O2> Function(O) f)
Implementation
dart
Rill<O2> collectWhile<O2>(Function1<O, Option<O2>> f) =>
    map(f).takeWhile((b) => b.isDefined).unNone;

concurrently()

Rill<O> concurrently<O2>(Rill<O2> that)
Implementation
dart
Rill<O> concurrently<O2>(Rill<O2> that) {
  return _concurrentlyAux(that).flatMapN((startBack, fore) => startBack.flatMap((_) => fore));
}

cons()

Rill<O> cons(Chunk<O> c)
Implementation
dart
Rill<O> cons(Chunk<O> c) => c.isEmpty ? this : Rill.chunk(c).append(() => this);

cons1()

Rill<O> cons1(O o)
Implementation
dart
Rill<O> cons1(O o) => Rill.emit(o).append(() => this);

debounce()

Rill<O> debounce(Duration d)
Implementation
dart
Rill<O> debounce(Duration d) {
  final rillF = Channel.bounded<O>(1).flatMap((chan) {
    return IO.ref(none<O>()).map((ref) {
      final sendLatest = ref.getAndSet(const None()).flatMap((opt) => opt.traverseIO_(chan.send));

      IO<Unit> sendItem(O o) => ref.getAndSet(Some(o)).flatMap((prev) {
        if (prev.isEmpty) {
          return IO.sleep(d).productR(sendLatest).start().voided();
        } else {
          return IO.unit;
        }
      });

      Pull<Never, Unit> go(Pull<O, Unit> pull) {
        return pull.uncons.flatMap((hdtl) {
          return hdtl.foldN(
            () => Pull.eval(sendLatest.productR(chan.close()).voided()),
            (hd, tl) => Pull.eval(sendItem(hd.last)).append(() => go(tl)),
          );
        });
      }

      final debouncedSend = go(underlying).rillNoScope;

      return chan.rill.concurrently(debouncedSend);
    });
  });

  return Rill.force(rillF);
}

delayBy()

Rill<O> delayBy(Duration duration)
Implementation
dart
Rill<O> delayBy(Duration duration) => Rill.sleep_<O>(duration).append(() => this);

delete()

Rill<O> delete(bool Function(O) p)
Implementation
dart
Rill<O> delete(Function1<O, bool> p) =>
    pull
        .takeWhile((o) => !p(o))
        .flatMap((r) => r.fold(() => Pull.done, (s) => s.drop(1).pull.echo))
        .rillNoScope;

drain()

Rill<Never> drain()
Implementation
dart
Rill<Never> drain() => underlying.unconsFlatMap((_) => Pull.done).rillNoScope;

drop()

Rill<O> drop(int n)
Implementation
dart
Rill<O> drop(int n) =>
    pull
        .drop(n)
        .flatMap((opt) => opt.fold(() => Pull.done, (rest) => rest.pull.echo))
        .rillNoScope;

dropLastIf()

Rill<O> dropLastIf(bool Function(O) p)
Implementation
dart
Rill<O> dropLastIf(Function1<O, bool> p) {
  Pull<O, Unit> go(Chunk<O> last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () {
          final o = last[last.size - 1];

          if (p(o)) {
            final (prefix, _) = last.splitAt(last.size - 1);
            return Pull.output(prefix);
          } else {
            return Pull.output(last);
          }
        },
        (hd, tl) => Pull.output(last).append(() => go(hd, tl)),
      );
    });
  }

  return pull.uncons.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => go(hd, tl),
    );
  }).rillNoScope;
}

dropRight()

Rill<O> dropRight(int n)
Implementation
dart
Rill<O> dropRight(int n) {
  if (n <= 0) {
    return this;
  } else {
    Pull<O, Unit> go(Chunk<O> acc, Rill<O> s) {
      return s.pull.uncons.flatMap((hdtl) {
        return hdtl.foldN(
          () => Pull.done,
          (hd, tl) {
            final all = acc.concat(hd);
            return Pull.output(all.dropRight(n)).append(() => go(all.takeRight(n), tl));
          },
        );
      });
    }

    return go(Chunk.empty(), this).rillNoScope;
  }
}

dropThrough()

Rill<O> dropThrough(bool Function(O) p)
Implementation
dart
Rill<O> dropThrough(Function1<O, bool> p) =>
    pull
        .dropThrough(p)
        .flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
        .rillNoScope;

dropWhile()

Rill<O> dropWhile(bool Function(O) p)
Implementation
dart
Rill<O> dropWhile(Function1<O, bool> p) =>
    pull
        .dropWhile(p)
        .flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
        .rillNoScope;

either()

Rill<Either<O, O2>> either<O2>(Rill<O2> that)
Implementation
dart
Rill<Either<O, O2>> either<O2>(Rill<O2> that) =>
    map((o) => o.asLeft<O2>()).merge(that.map((o2) => o2.asRight<O>()));

evalFilter()

Rill<O> evalFilter(IO<bool> Function(O) p)
Implementation
dart
Rill<O> evalFilter(Function1<O, IO<bool>> p) =>
    underlying
        .flatMapOutput(
          (o) => Pull.eval(p(o)).flatMap((pass) => pass ? Pull.output1(o) : Pull.done),
        )
        .rillNoScope;

evalFilterNot()

Rill<O> evalFilterNot(IO<bool> Function(O) p)
Implementation
dart
Rill<O> evalFilterNot(Function1<O, IO<bool>> p) =>
    flatMap((o) => Rill.eval(p(o)).ifM(() => Rill.empty(), () => Rill.emit(o)));

evalFold()

Rill<O2> evalFold<O2>(O2 z, IO<O2> Function(O2, O) f)
Implementation
dart
Rill<O2> evalFold<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
  Pull<O2, Unit> go(O2 z, Rill<O> r) {
    return r.pull.uncons1.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1(z),
        (hd, tl) => Pull.eval(f(z, hd)).flatMap((ns) => go(ns, tl)),
      );
    });
  }

  return go(z, this).rillNoScope;
}

evalMap()

Rill<O2> evalMap<O2>(IO<O2> Function(O) f)
Implementation
dart
Rill<O2> evalMap<O2>(Function1<O, IO<O2>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap(Pull.output1)).rillNoScope;

evalMapFilter()

Rill<O> evalMapFilter(IO<Option<O>> Function(O) f)
Implementation
dart
Rill<O> evalMapFilter(Function1<O, IO<Option<O>>> f) =>
    underlying
        .flatMapOutput((o) => Pull.eval(f(o)).flatMap((opt) => Pull.outputOption1(opt)))
        .rillNoScope;

evalScan()

Rill<O2> evalScan<O2>(O2 z, IO<O2> Function(O2, O) f)
Implementation
dart
Rill<O2> evalScan<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
  Pull<O2, Unit> go(O2 z, Rill<O> s) {
    return s.pull.uncons1.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => Pull.eval(f(z, hd)).flatMap((o) => Pull.output1(o).append(() => go(o, tl))),
      );
    });
  }

  return Pull.output1(z).append(() => go(z, this)).rillNoScope;
}

evalTap()

Rill<O> evalTap<O2>(IO<O2> Function(O) f)
Implementation
dart
Rill<O> evalTap<O2>(Function1<O, IO<O2>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap((_) => Pull.output1(o))).rillNoScope;

exists()

Rill<bool> exists(bool Function(O) p)
Implementation
dart
Rill<bool> exists(Function1<O, bool> p) =>
    pull.forall((o) => !p(o)).flatMap((r) => Pull.output1(!r)).rillNoScope;

filter()

Rill<O> filter(bool Function(O) p)
Implementation
dart
Rill<O> filter(Function1<O, bool> p) => mapChunks((c) => c.filter(p));

filterNot()

Rill<O> filterNot(bool Function(O) p)
Implementation
dart
Rill<O> filterNot(Function1<O, bool> p) => mapChunks((c) => c.filterNot(p));

filterWithPrevious()

Rill<O> filterWithPrevious(bool Function(O, O) p)
Implementation
dart
Rill<O> filterWithPrevious(Function2<O, O, bool> p) {
  Pull<O, Unit> go(O last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) {
          &#47;&#47; can it be emitted unmodified?
          final (allPass, newLast) = hd.foldLeft((
            true,
            last,
          ), (acc, a) => (acc.$1 && p(acc.$2, a), a));

          if (allPass) {
            return Pull.output(hd).append(() => go(newLast, tl));
          } else {
            final (acc, newLast) = hd.foldLeft((IVector.empty<O>(), last), (acc, a) {
              if (p(acc.$2, a)) {
                return (acc.$1.appended(a), a);
              } else {
                return (acc.$1, acc.$2);
              }
            });

            return Pull.output(Chunk.from(acc)).append(() => go(newLast, tl));
          }
        },
      );
    });
  }

  return pull.uncons1.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => Pull.output1(hd).append(() => go(hd, tl)),
    );
  }).rillNoScope;
}

flatMap()

Rill<O2> flatMap<O2>(Rill<O2> Function(O) f)
Implementation
dart
Rill<O2> flatMap<O2>(Function1<O, Rill<O2>> f) =>
    underlying.flatMapOutput((o) => f(o).underlying).rillNoScope;

fold()

Rill<O2> fold<O2>(O2 z, O2 Function(O2, O) f)
Implementation
dart
Rill<O2> fold<O2>(O2 z, Function2<O2, O, O2> f) =>
    pull.fold(z, f).flatMap(Pull.output1).rillNoScope;

fold1()

Rill<O> fold1(O Function(O, O) f)
Implementation
dart
Rill<O> fold1(Function2<O, O, O> f) =>
    pull.fold1(f).flatMap((opt) => opt.map(Pull.output1).getOrElse(() => Pull.done)).rillNoScope;

forall()

Rill<bool> forall(bool Function(O) p)
Implementation
dart
Rill<bool> forall(Function1<O, bool> p) =>
    pull.forall(p).flatMap((res) => Pull.output1(res)).rillNoScope;

foreach()

Rill<Never> foreach(IO<Unit> Function(O) f)
Implementation
dart
Rill<Never> foreach(Function1<O, IO<Unit>> f) =>
    underlying.flatMapOutput((o) => Pull.eval(f(o))).rillNoScope;

groupAdjacentBy()

Rill<Record> groupAdjacentBy<O2>(O2 Function(O) f)
Implementation
dart
Rill<(O2, Chunk<O>)> groupAdjacentBy<O2>(Function1<O, O2> f) =>
    groupAdjacentByLimit(Integer.maxValue, f);

groupAdjacentByLimit()

Rill<Record> groupAdjacentByLimit<O2>(int limit, O2 Function(O) f)
Implementation
dart
Rill<(O2, Chunk<O>)> groupAdjacentByLimit<O2>(int limit, Function1<O, O2> f) {
  Pull<(O2, Chunk<O>), Unit> go(Option<(O2, Chunk<O>)> current, Rill<O> s) {
    Pull<(O2, Chunk<O>), Unit> doChunk(
      Chunk<O> chunk,
      Rill<O> s,
      O2 k1,
      Chunk<O> out,
      IQueue<(O2, Chunk<O>)> acc,
    ) {
      final differsAt = chunk.indexWhere((v) => f(v) != k1).getOrElse(() => -1);

      if (differsAt == -1) {
        &#47;&#47; whole chunk matches the current key, add this chunk to the accumulated output
        if (out.size + chunk.size < limit) {
          final newCurrent = Some((k1, out.concat(chunk)));
          return Pull.output(Chunk.from(acc)).append(() => go(newCurrent, s));
        } else {
          final (prefix, suffix) = chunk.splitAt(limit - out.size);
          return Pull.output(
            Chunk.from(acc.appended((k1, out.concat(prefix)))),
          ).append(() => go(Some((k1, suffix)), s));
        }
      } else {
        &#47;&#47; at least part of this chunk does not match the current key, need to group and retain chunkiness
        &#47;&#47; split the chunk into the bit where the keys match and the bit where they don't
        final matching = chunk.take(differsAt);

        late IQueue<(O2, Chunk<O>)> newAcc;

        final newOutSize = out.size + matching.size;

        if (newOutSize == 0) {
          newAcc = acc;
        } else if (newOutSize > limit) {
          final (prefix, suffix) = matching.splitAt(limit - out.size);
          newAcc = acc.appended((k1, out.concat(prefix))).appended((k1, suffix));
        } else {
          newAcc = acc.appended((k1, out.concat(matching)));
        }

        final nonMatching = chunk.drop(differsAt);

        &#47;&#47; nonMatching is guaranteed to be non-empty here, because we know the last element of the chunk doesn't have
        &#47;&#47; the same key as the first
        final k2 = f(nonMatching[0]);

        return doChunk(nonMatching, s, k2, Chunk.empty(), newAcc);
      }
    }

    return s.pull.unconsLimit(limit).flatMap((hdtl) {
      return hdtl.foldN(
        () => current
            .mapN(
              (k1, out) => out.size == 0 ? Pull.done : Pull.output1((k1, out)),
            )
            .getOrElse(() => Pull.done),
        (hd, tl) {
          final (k1, out) = current.getOrElse(() => (f(hd[0]), Chunk.empty()));
          return doChunk(hd, tl, k1, out, IQueue.empty());
        },
      );
    });
  }

  return go(none(), this).rillNoScope;
}

groupWithin()

Rill<Chunk<O>> groupWithin(int chunkSize, Duration timeout)
Implementation
dart
Rill<Chunk<O>> groupWithin(int chunkSize, Duration timeout) {
  if (chunkSize <= 0) throw ArgumentError('Rill.groupWithin: chunkSize must be > 0');

  return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
    &#47;&#47; Producer: runs in background pushing chunks to the queue
    final producer = chunks()
        .map((c) => Some(c))
        .evalMap(queue.offer)
        .compile
        .drain
        .guarantee(queue.offer(const None()));

    &#47;&#47; Consumer: buffers data and races agaist timeout
    &#47;&#47;
    &#47;&#47; [deadline]: Monotonic time (micros) the current buffer MUST be emitted or null for empty buffer
    Pull<Chunk<O>, Unit> consumeLoop(Chunk<O> buffer, int? deadline) {
      if (buffer.size >= chunkSize) {
        final (toEmit, remainder) = buffer.splitAt(chunkSize);

        if (remainder.isEmpty) {
          return Pull.output1(toEmit).append(() => consumeLoop(Chunk.empty(), null));
        } else {
          &#47;&#47; set new deadline
          return Pull.eval(IO.now).flatMap((now) {
            return Pull.output1(toEmit).append(() {
              return consumeLoop(remainder, now.microsecondsSinceEpoch + timeout.inMicroseconds);
            });
          });
        }
      } else {
        &#47;&#47; Buffer isn't full so wait for data or timeout
        if (buffer.isEmpty) {
          &#47;&#47; no buffered data, don't care about time. wait for next chunk
          return Pull.eval(queue.take()).flatMap((opt) {
            return opt.fold(
              () => Pull.done,
              (chunk) {
                &#47;&#47; data has arrived, start the clock
                return Pull.eval(IO.now).flatMap((now) {
                  return consumeLoop(chunk, now.microsecondsSinceEpoch + timeout.inMicroseconds);
                });
              },
            );
          });
        } else {
          &#47;&#47; buffer has data, race queue againt the clock
          return Pull.eval(IO.now).flatMap((now) {
            final remainingMicros = deadline! - now.microsecondsSinceEpoch;

            if (remainingMicros <= 0) {
              &#47;&#47; timeout reached, emit whatever is buffered
              return Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null));
            } else {
              final waitOp = IO.sleep(Duration(microseconds: remainingMicros));
              final takeOp = queue.take();

              return Pull.eval(IO.race(takeOp, waitOp)).flatMap((raceResult) {
                return raceResult.fold(
                  (takeResult) {
                    return takeResult.fold(
                      () => Pull.output1(buffer), &#47;&#47; producer finished
                      (newChunk) => consumeLoop(buffer.concat(newChunk), deadline),
                    );
                  },
                  (_) => Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null)),
                );
              });
            }
          });
        }
      }
    }

    return Rill.bracket(
      producer.start(),
      (fiber) => fiber.cancel(),
    ).flatMap((_) => consumeLoop(Chunk.empty(), null).rill);
  });
}

handleErrorWith()

Rill<O> handleErrorWith(Rill<O> Function(Object) f)
Implementation
dart
Rill<O> handleErrorWith(Function1<Object, Rill<O>> f) =>
    underlying.handleErrorWith((err) => f(err).underlying).rillNoScope;

holdResource()

Resource<Signal<O>> holdResource(O initial)
Implementation
dart
Resource<Signal<O>> holdResource(O initial) => Resource.eval(
  SignallingRef.of(initial),
).flatTap((sig) => foreach((n) => sig.setValue(n)).compile.drain.background());

ifEmpty()

Rill<O> ifEmpty(Rill<O> Function() fallback)
Implementation
dart
Rill<O> ifEmpty(Function0<Rill<O>> fallback) =>
    pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => fallback().underlying,
        (hd, tl) => Pull.output(hd).append(() => tl.underlying),
      );
    }).rillNoScope;

ifEmptyEmit()

Rill<O> ifEmptyEmit(O Function() o)
Implementation
dart
Rill<O> ifEmptyEmit(Function0<O> o) => ifEmpty(() => Rill.emit(o()));

interleave()

Rill<O> interleave(Rill<O> that)
Implementation
dart
Rill<O> interleave(Rill<O> that) => zip(that).flatMap((t) => Rill.emits([t.$1, t.$2]));

interleaveAll()

Rill<O> interleaveAll(Rill<O> that)
Implementation
dart
Rill<O> interleaveAll(Rill<O> that) => map(
  (o) => Option(o),
).zipAll<Option<O>>(that.map((o) => Option(o)), none(), none()).flatMap((t) {
  final (thisOpt, thatOpt) = t;
  return Rill.chunk(Chunk.from(thisOpt)).append(() => Rill.chunk(Chunk.from(thatOpt)));
});

interruptAfter()

Rill<O> interruptAfter(Duration duration)
Implementation
dart
Rill<O> interruptAfter(Duration duration) => interruptWhen(IO.sleep(duration));

interruptWhen()

Rill<O> interruptWhen<B>(IO<B> signal)
Implementation
dart
Rill<O> interruptWhen<B>(IO<B> signal) {
  return Rill.eval(IO.deferred<Unit>()).flatMap((stopEvent) {
    final startSignalFiber = signal.attempt().flatMap((_) => stopEvent.complete(Unit())).start();

    return Rill.eval(startSignalFiber).flatMap((fiber) {
      final haltWhen = stopEvent.value().as(Unit().asRight<Object>());
      return Pull.interruptWhen(underlying, haltWhen).rillNoScope.onFinalize(fiber.cancel());
    });
  });
}

interruptWhenSignaled()

Rill<O> interruptWhenSignaled(Signal<bool> signal)
Implementation
dart
Rill<O> interruptWhenSignaled(Signal<bool> signal) => interruptWhenTrue(signal.discrete);

interruptWhenTrue()

Rill<O> interruptWhenTrue(Rill<bool> haltWhenTrue)
Implementation
dart
Rill<O> interruptWhenTrue(Rill<bool> haltWhenTrue) {
  final rillF = (
    IO.deferred<Unit>(),
    IO.deferred<Unit>(),
    IO.deferred<Either<Object, Unit>>(),
  ).mapN((
    interruptL,
    interruptR,
    backResult,
  ) {
    final watch =
        haltWhenTrue.exists((x) => x).interruptWhen(interruptR.value().attempt()).compile.drain;

    final wakeWatch =
        watch.guaranteeCase((oc) {
          final Either<Object, Unit> r = oc.fold(
            () => Unit().asRight(),
            (err, _) => err.asLeft(),
            (_) => Unit().asRight(),
          );

          return backResult.complete(r).productR(interruptL.complete(Unit()).voided());
        }).voidError();

    final stopWatch = interruptR
        .complete(Unit())
        .productR(backResult.value().flatMap(IO.fromEither));

    final backWatch = Rill.bracket(wakeWatch.start(), (_) => stopWatch);

    return backWatch.flatMap((_) => interruptWhen(interruptL.value().attempt()));
  });

  return Rill.force(rillF);
}

intersperse()

Rill<O> intersperse(O separator)
Implementation
dart
Rill<O> intersperse(O separator) {
  Chunk<O> doChunk(Chunk<O> hd, bool isFirst) {
    final bldr = <O>[];

    final iter = hd.iterator;

    if (isFirst) bldr.add(iter.next());

    iter.foreach((o) {
      bldr.add(separator);
      bldr.add(o);
    });

    return Chunk.fromList(bldr);
  }

  Pull<O, Unit> go(Rill<O> str) {
    return str.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) => Pull.output(doChunk(hd, false)).append(() => go(tl)),
      );
    });
  }

  return pull.uncons.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => Pull.output(doChunk(hd, true)).append(() => go(tl)),
    );
  }).rillNoScope;
}

keepAlive()

Rill<O> keepAlive(Duration maxIdle, IO<O> heartbeat)
Implementation
dart
Rill<O> keepAlive(Duration maxIdle, IO<O> heartbeat) {
  return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
    final producer = chunks()
        .map((c) => Some(c))
        .evalMap(queue.offer)
        .compile
        .drain
        .guarantee(queue.offer(none()));

    Pull<O, Unit> consumeLoop() {
      final takeOp = queue.take();
      final timerOp = IO.sleep(maxIdle);

      return Pull.eval(IO.race(takeOp, timerOp)).flatMap((raceResult) {
        return raceResult.fold(
          (queueResult) => queueResult.fold(
            () => Pull.done,
            (chunk) => Pull.output(chunk).append(consumeLoop),
          ),
          (_) => Pull.eval(heartbeat).flatMap((o) => Pull.output1(o)).append(consumeLoop),
        );
      });
    }

    return Rill.bracket(
      producer.start(),
      (fiber) => fiber.cancel(),
    ).flatMap((_) => consumeLoop().rillNoScope);
  });
}

lastOr()

Rill<O> lastOr(O Function() fallback)
Implementation
dart
Rill<O> lastOr(Function0<O> fallback) =>
    pull.last
        .flatMap((o) => o.fold(() => Pull.output1(fallback()), (o) => Pull.output1(o)))
        .rillNoScope;

map()

Rill<O2> map<O2>(O2 Function(O) f)
Implementation
dart
Rill<O2> map<O2>(Function1<O, O2> f) =>
    pull.echo.unconsFlatMap((hd) => Pull.output(hd.map(f))).rillNoScope;

mapAccumulate()

Rill<Record> mapAccumulate<S, O2>(S initial, Record Function(S, O) f)
Implementation
dart
Rill<(S, O2)> mapAccumulate<S, O2>(S initial, Function2<S, O, (S, O2)> f) {
  (S, (S, O2)) go(S s, O a) {
    final (newS, newO) = f(s, a);
    return (newS, (newS, newO));
  }

  return scanChunks(initial, (acc, c) => c.mapAccumulate(acc, go));
}

mapAsync()

Rill<O2> mapAsync<O2>(int maxConcurrent, IO<O2> Function(O) f)

Alias for parEvalMap.

Implementation
dart
Rill<O2> mapAsync<O2>(int maxConcurrent, Function1<O, IO<O2>> f) => parEvalMap(maxConcurrent, f);

mapAsyncUnordered()

Rill<O2> mapAsyncUnordered<O2>(int maxConcurrent, IO<O2> Function(O) f)

Alias for parEvalMapUnordered.

Implementation
dart
Rill<O2> mapAsyncUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) =>
    parEvalMapUnordered(maxConcurrent, f);

mapChunks()

Rill<O2> mapChunks<O2>(Chunk<O2> Function(Chunk<O>) f)
Implementation
dart
Rill<O2> mapChunks<O2>(Function1<Chunk<O>, Chunk<O2>> f) =>
    underlying.unconsFlatMap((hd) => Pull.output(f(hd))).rillNoScope;

merge()

Rill<O> merge(Rill<O> that)
Implementation
dart
Rill<O> merge(Rill<O> that) => _merge(that, (s, fin) => Rill.exec<O>(fin).append(() => s));

mergeAndAwaitDownstream()

Rill<O> mergeAndAwaitDownstream(Rill<O> that)
Implementation
dart
Rill<O> mergeAndAwaitDownstream(Rill<O> that) => _merge(that, (s, fin) => s.onFinalize(fin));

mergeHaltBoth()

Rill<O> mergeHaltBoth(Rill<O> that)
Implementation
dart
Rill<O> mergeHaltBoth(Rill<O> that) =>
    noneTerminate().merge(that.noneTerminate()).unNoneTerminate;

mergeHaltL()

Rill<O> mergeHaltL(Rill<O> that)
Implementation
dart
Rill<O> mergeHaltL(Rill<O> that) =>
    noneTerminate().merge(that.map((o) => Option(o))).unNoneTerminate;

mergeHaltR()

Rill<O> mergeHaltR(Rill<O> that)
Implementation
dart
Rill<O> mergeHaltR(Rill<O> that) => that.mergeHaltL(this);

metered()

Rill<O> metered(Duration rate)
Implementation
dart
Rill<O> metered(Duration rate) => Rill.fixedRate(rate).zipRight(this);

meteredStartImmediately()

Rill<O> meteredStartImmediately(Duration rate)
Implementation
dart
Rill<O> meteredStartImmediately(Duration rate) =>
    Rill.fixedRateStartImmediately(rate).zipRight(this);

noneTerminate()

Rill<Option<O>> noneTerminate()
Implementation
dart
Rill<Option<O>> noneTerminate() => map((o) => Option(o)).append(() => Rill.emit(none()));

noSuchMethod() inherited

dynamic noSuchMethod(Invocation invocation)

Invoked when a nonexistent method or property is accessed.

A dynamic member invocation can attempt to call a member which doesn't exist on the receiving object. Example:

dart
dynamic object = 1;
object.add(42); // Statically allowed, run-time error

This invalid code will invoke the noSuchMethod method of the integer 1 with an Invocation representing the .add(42) call and arguments (which then throws).

Classes can override noSuchMethod to provide custom behavior for such invalid dynamic invocations.

A class with a non-default noSuchMethod invocation can also omit implementations for members of its interface. Example:

dart
class MockList<T> implements List<T> {
  noSuchMethod(Invocation invocation) {
    log(invocation);
    super.noSuchMethod(invocation); // Will throw.
  }
}
void main() {
  MockList().add(42);
}

This code has no compile-time warnings or errors even though the MockList class has no concrete implementation of any of the List interface methods. Calls to List methods are forwarded to noSuchMethod, so this code will log an invocation similar to Invocation.method(#add, [42]) and then throw.

If a value is returned from noSuchMethod, it becomes the result of the original invocation. If the value is not of a type that can be returned by the original invocation, a type error occurs at the invocation.

The default behavior is to throw a NoSuchMethodError.

Inherited from Object.

Implementation
dart
@pragma("vm:entry-point")
@pragma("wasm:entry-point")
external dynamic noSuchMethod(Invocation invocation);

onComplete()

Rill<O> onComplete(Rill<O> Function() s2)
Implementation
dart
Rill<O> onComplete(Function0<Rill<O>> s2) =>
    handleErrorWith((e) => s2().append(() => Pull.fail(e).rillNoScope)).append(() => s2());

onFinalize()

Rill<O> onFinalize(IO<Unit> finalizer)
Implementation
dart
Rill<O> onFinalize(IO<Unit> finalizer) => onFinalizeCase((_) => finalizer);

onFinalizeCase()

Rill<O> onFinalizeCase(IO<Unit> Function(ExitCase) finalizer)
Implementation
dart
Rill<O> onFinalizeCase(Function1<ExitCase, IO<Unit>> finalizer) =>
    Rill.bracketCase(IO.unit, (_, ec) => finalizer(ec)).flatMap((_) => this);

parEvalMap()

Rill<O2> parEvalMap<O2>(int maxConcurrent, IO<O2> Function(O) f)
Implementation
dart
Rill<O2> parEvalMap<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
  if (maxConcurrent == 1) {
    return evalMap(f);
  } else {
    assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');

    &#47;&#47; One is taken by inner stream read.
    final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
    final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);

    return _parEvalMapImpl(concurrency, channelF, true, f);
  }
}

parEvalMapUnbounded()

Rill<O2> parEvalMapUnbounded<O2>(IO<O2> Function(O) f)
Implementation
dart
Rill<O2> parEvalMapUnbounded<O2>(Function1<O, IO<O2>> f) =>
    _parEvalMapImpl(Integer.maxValue, Channel.unbounded(), true, f);

parEvalMapUnordered()

Rill<O2> parEvalMapUnordered<O2>(int maxConcurrent, IO<O2> Function(O) f)
Implementation
dart
Rill<O2> parEvalMapUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
  if (maxConcurrent == 1) {
    return evalMap(f);
  } else {
    assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');

    &#47;&#47; One is taken by inner stream read.
    final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
    final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);

    return _parEvalMapImpl(concurrency, channelF, false, f);
  }
}

parEvalMapUnorderedUnbounded()

Rill<O2> parEvalMapUnorderedUnbounded<O2>(IO<O2> Function(O) f)
Implementation
dart
Rill<O2> parEvalMapUnorderedUnbounded<O2>(Function1<O, IO<O2>> f) =>
    _parEvalMapImpl(Integer.maxValue, Channel.unbounded(), false, f);

pauseWhen()

Rill<O> pauseWhen(Rill<bool> pauseWhenTrue)
Implementation
dart
Rill<O> pauseWhen(Rill<bool> pauseWhenTrue) {
  return Rill.eval(SignallingRef.of(false)).flatMap((pauseSignal) {
    return pauseWhenSignal(
      pauseSignal,
    ).mergeHaltBoth(pauseWhenTrue.foreach(pauseSignal.setValue));
  });
}

pauseWhenSignal()

Rill<O> pauseWhenSignal(Signal<bool> pauseWhneTrue)
Implementation
dart
Rill<O> pauseWhenSignal(Signal<bool> pauseWhneTrue) {
  final waitToResume = pauseWhneTrue.waitUntil((x) => !x);
  final pauseIfNeeded = Rill.exec<O>(
    pauseWhneTrue.value().flatMap((paused) => waitToResume.whenA(paused)),
  );

  return pauseIfNeeded.append(
    () => chunks().flatMap((chunk) {
      return Rill.chunk(chunk).append(() => pauseIfNeeded);
    }),
  );
}

rechunkRandomly()

Rill<O> rechunkRandomly({
  double minFactor = 0.1,
  double maxFactor = 2.0,
  int? seed,
})
Implementation
dart
Rill<O> rechunkRandomly({double minFactor = 0.1, double maxFactor = 2.0, int? seed}) {
  if (minFactor <= 0 || maxFactor < minFactor) {
    throw ArgumentError('Invalid rechunk factors. Ensure 0 < minFactor <= maxFactor');
  }

  return Rill.suspend(() {
    final rng = math.Random(seed);

    Pull<O, Unit> loop(Chunk<O> acc, Rill<O> s) {
      return s.pull.uncons.flatMap((hdtl) {
        return hdtl.foldN(
          () => acc.isEmpty ? Pull.done : Pull.output(acc),
          (hd, tl) {
            final newSize =
                math
                    .max(
                      1.0,
                      (acc.size + hd.size) *
                          (minFactor + (maxFactor - minFactor) * rng.nextDouble()),
                    )
                    .toInt();
            final newAcc = acc.concat(hd);

            if (newAcc.size < newSize) {
              return loop(newAcc, tl);
            } else {
              final (toEmit, rest) = newAcc.splitAt(newSize);
              return Pull.output(toEmit).append(() => loop(rest, tl));
            }
          },
        );
      });
    }

    return loop(Chunk.empty<O>(), this).rillNoScope;
  });
}

reduce()

Rill<O> reduce(O Function(O, O) f)
Implementation
dart
Rill<O> reduce(Function2<O, O, O> f) => fold1(f);

repeat()

Rill<O> repeat()
Implementation
dart
Rill<O> repeat() => append(repeat);

repeatN()

Rill<O> repeatN(int n)
Implementation
dart
Rill<O> repeatN(int n) => n > 0 ? append(() => repeatN(n - 1)) : Rill.empty();

repeatPull()

Rill<O2> repeatPull<O2>(Pull<O2, Option<Rill<O>>> Function(ToPull<O>) f)
Implementation
dart
Rill<O2> repeatPull<O2>(Function1<ToPull<O>, Pull<O2, Option<Rill<O>>>> f) {
  Pull<O2, Unit> go(ToPull<O> tp) {
    return f(tp).flatMap((tail) {
      return tail.fold(() => Pull.done, (tail) => go(tail.pull));
    });
  }

  return go(pull).rillNoScope;
}

scan()

Rill<O2> scan<O2>(O2 z, O2 Function(O2, O) f)
Implementation
dart
Rill<O2> scan<O2>(O2 z, Function2<O2, O, O2> f) =>
    Pull.output1(z).append(() => _scan(z, f)).rillNoScope;

scan1()

Rill<O> scan1(O Function(O, O) f)
Implementation
dart
Rill<O> scan1(Function2<O, O, O> f) =>
    pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.done,
        (hd, tl) {
          final (pre, post) = hd.splitAt(1);
          return Pull.output(pre).append(() => tl.cons(post)._scan(pre[0], f));
        },
      );
    }).rillNoScope;

scanChunks()

Rill<O2> scanChunks<S, O2>(S initial, Record Function(S, Chunk<O>) f)
Implementation
dart
Rill<O2> scanChunks<S, O2>(S initial, Function2<S, Chunk<O>, (S, Chunk<O2>)> f) =>
    scanChunksOpt(initial, (s) => Some((c) => f(s, c)));

scanChunksOpt()

Rill<O2> scanChunksOpt<S, O2>(
  S initial,
  Option<Record Function(Chunk<O>)> Function(S) f,
)
Implementation
dart
Rill<O2> scanChunksOpt<S, O2>(
  S initial,
  Function1<S, Option<Function1<Chunk<O>, (S, Chunk<O2>)>>> f,
) => pull.scanChunksOpt(initial, f).voided.rillNoScope;

sliding()

Rill<Chunk<O>> sliding(int size, {int step = 1})
Implementation
dart
Rill<Chunk<O>> sliding(int size, {int step = 1}) {
  assert(size > 0 && step > 0, 'size and step must be positive');
  Pull<Chunk<O>, Unit> stepNotSmallerThanSize(Rill<O> s, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(prev.take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];

          var current = prev.concat(hd);

          while (current.size >= step) {
            final (nHeads, nTails) = current.splitAt(step);
            bldr.add(nHeads.take(size));
            current = nTails;
          }

          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepNotSmallerThanSize(tl, current));
        },
      ),
    );
  }

  Pull<Chunk<O>, Unit> stepSmallerThanSize(Rill<O> s, Chunk<O> window, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(window.concat(prev).take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];

          var w = window;
          var current = prev.concat(hd);

          while (current.size >= step) {
            final (head, tail) = current.splitAt(step);
            final wind = w.concat(head);

            bldr.add(wind);
            w = wind.drop(step);

            current = tail;
          }

          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepSmallerThanSize(tl, w, current));
        },
      ),
    );
  }

  if (step < size) {
    return pull
        .unconsN(size, allowFewer: true)
        .flatMap(
          (hdtl) => hdtl.foldN(
            () => Pull.done,
            (hd, tl) => Pull.output1(
              hd,
            ).append(() => stepSmallerThanSize(tl, hd.drop(step), Chunk.empty())),
          ),
        )
        .rillNoScope;
  } else {
    return stepNotSmallerThanSize(this, Chunk.empty()).rillNoScope;
  }
}

spaced()

Rill<O> spaced(Duration delay, {bool startImmediately = true})
Implementation
dart
Rill<O> spaced(Duration delay, {bool startImmediately = true}) {
  final start = startImmediately ? Rill.unit : Rill.empty<Unit>();
  return start.append(() => Rill.fixedDelay(delay)).zipRight(this);
}

split()

Rill<Chunk<O>> split(bool Function(O) p)
Implementation
dart
Rill<Chunk<O>> split(Function1<O, bool> p) {
  Pull<Chunk<O>, Unit> go(Chunk<O> buffer, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => buffer.nonEmpty ? Pull.output1(buffer) : Pull.done,
        (hd, tl) {
          return hd.indexWhere(p).fold(
            () => go(buffer.concat(hd), tl),
            (idx) {
              final pfx = hd.take(idx);
              final b2 = buffer.concat(pfx);

              return Pull.output1(b2).append(() => go(Chunk.empty(), tl.cons(hd.drop(idx + 1))));
            },
          );
        },
      );
    });
  }

  return go(Chunk.empty(), this).rillNoScope;
}

switchMap()

Rill<O2> switchMap<O2>(Rill<O2> Function(O) f)
Implementation
dart
Rill<O2> switchMap<O2>(Function1<O, Rill<O2>> f) {
  final IO<Rill<O2>> rillF = Semaphore.permits(1).flatMap((guard) {
    return IO.ref(none<Deferred<Unit>>()).map((haltRef) {
      Rill<O2> runInner(O o, Deferred<Unit> halt) {
        return Rill.bracketFull(
          (poll) => poll(guard.acquire()), &#47;&#47; guard against simultaneously running rills
          (_, ec) {
            return ec.fold(
              () => guard.release(),
              (_, _) => IO.unit, &#47;&#47; on error don't start next stream
              () => guard.release(),
            );
          },
        ).flatMap((_) => f(o).interruptWhen(halt.value().attempt()));
      }

      IO<Rill<O2>> haltedF(O o) {
        return IO.deferred<Unit>().flatMap((halt) {
          return haltRef.getAndSet(halt.some).flatMap((prev) {
            return prev
                .fold(() => IO.unit, (prev) => prev.complete(Unit()))
                .map((_) => runInner(o, halt));
          });
        });
      }

      return evalMap(haltedF).parJoin(2);
    });
  });

  return Rill.force(rillF);
}

take()

Rill<O> take(int n)
Implementation
dart
Rill<O> take(int n) => pull.take(n).voided.rillNoScope;

takeRight()

Rill<O> takeRight(int n)
Implementation
dart
Rill<O> takeRight(int n) => pull.takeRight(n).flatMap(Pull.output).rillNoScope;

takeThrough()

Rill<O> takeThrough(bool Function(O) p)
Implementation
dart
Rill<O> takeThrough(Function1<O, bool> p) => pull.takeThrough(p).voided.rillNoScope;

takeWhile()

Rill<O> takeWhile(bool Function(O) p, {bool takeFailure = false})
Implementation
dart
Rill<O> takeWhile(Function1<O, bool> p, {bool takeFailure = false}) =>
    pull.takeWhile(p, takeFailure: takeFailure).voided.rillNoScope;

through()

Rill<O2> through<O2>(Rill<O2> Function(Rill<O>) f)
Implementation
dart
Rill<O2> through<O2>(Pipe<O, O2> f) => f(this);

timeout()

Rill<O> timeout(Duration timeout)
Implementation
dart
Rill<O> timeout(Duration timeout) => interruptWhen(IO.sleep(timeout));

toString() inherited

String toString()

A string representation of this object.

Some classes have a default textual representation, often paired with a static parse function (like int.parse). These classes will provide the textual representation as their string representation.

Other classes have no meaningful textual representation that a program will care about. Such classes will typically override toString to provide useful information when inspecting the object, mainly for debugging or logging.

Inherited from Object.

Implementation
dart
external String toString();

zip()

Rill<Record> zip<O2>(Rill<O2> that)
Implementation
dart
Rill<(O, O2)> zip<O2>(Rill<O2> that) => zipWith(that, (o, o2) => (o, o2));

zipAll()

Rill<Record> zipAll<O2>(Rill<O2> that, O padLeft, O2 padRight)
Implementation
dart
Rill<(O, O2)> zipAll<O2>(Rill<O2> that, O padLeft, O2 padRight) =>
    zipAllWith(that, padLeft, padRight, (o, o2) => (o, o2));

zipAllWith()

Rill<O3> zipAllWith<O2, O3>(
  Rill<O2> that,
  O padLeft,
  O2 padRight,
  O3 Function(O, O2) f,
)
Implementation
dart
Rill<O3> zipAllWith<O2, O3>(Rill<O2> that, O padLeft, O2 padRight, Function2<O, O2, O3> f) {
  Pull<O3, Unit> loop(Rill<O> s1, Rill<O2> s2) {
    return s1.pull.uncons.flatMap((hdtl1) {
      return hdtl1.foldN(
        () => s2.map((o2) => f(padLeft, o2)).pull.echo,
        (hd1, tl1) {
          return s2.pull.uncons.flatMap((hdtl2) {
            return hdtl2.foldN(
              () {
                final zippedChunk = hd1.map((o) => f(o, padRight));
                return Pull.output(zippedChunk).flatMap((_) {
                  return tl1.map((o) => f(o, padRight)).pull.echo;
                });
              },
              (hd2, tl2) {
                final len = math.min(hd1.size, hd2.size);

                final nextS1 = tl1.cons(hd1.drop(len));
                final nextS2 = tl2.cons(hd2.drop(len));

                return Pull.output(
                  hd1.zip(hd2).map((t) => f(t.$1, t.$2)),
                ).flatMap((_) => loop(nextS1, nextS2));
              },
            );
          });
        },
      );
    });
  }

  return loop(this, that).rillNoScope;
}

zipLatest()

Rill<Record> zipLatest<O2>(Rill<O2> that)
Implementation
dart
Rill<(O, O2)> zipLatest<O2>(Rill<O2> that) => zipLatestWith(that, (o, o2) => (o, o2));

zipLatestWith()

Rill<O3> zipLatestWith<O2, O3>(Rill<O2> that, O3 Function(O, O2) f)
Implementation
dart
Rill<O3> zipLatestWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
  return Rill.eval(Queue.unbounded<Either<Object, Option<O3>>>()).flatMap((queue) {
    return Rill.eval(Ref.of<Option<O>>(none())).flatMap((refLeft) {
      return Rill.eval(Ref.of<Option<O2>>(none())).flatMap((refRight) {
        IO<Unit> tryEmit() {
          return refLeft.value().flatMap((optL) {
            return refRight.value().flatMap((optR) {
              return optL.fold(
                () => IO.unit,
                (l) => optR.fold(
                  () => IO.unit,
                  (r) => queue.offer(Right(Some(f(l, r)))),
                ),
              );
            });
          });
        }

        IO<Unit> runLeft() => evalMap(
          (o) => refLeft.setValue(Some(o)).productL(tryEmit()),
        ).compile.drain.handleErrorWith((err) => queue.offer(Left(err)));

        IO<Unit> runRight() => that
            .evalMap((o2) => refRight.setValue(Some(o2)).productL(tryEmit()))
            .compile
            .drain
            .handleErrorWith((err) => queue.offer(Left(err)));

        Rill<O3> consumeQueue() {
          return Rill.eval(queue.take()).flatMap((event) {
            return event.fold(
              (err) => Rill.raiseError(err),
              (opt) => opt.fold(
                () => Rill.empty(),
                (o3) => Rill.emit(o3).append(() => consumeQueue()),
              ),
            );
          });
        }

        final driver = IO.both(runLeft(), runRight()).guarantee(queue.offer(Right(none())));

        return Rill.bracket(
          driver.start(),
          (fiber) => fiber.cancel(),
        ).flatMap((_) => consumeQueue());
      });
    });
  });
}

zipLeft()

Rill<O> zipLeft<O2>(Rill<O2> other)
Implementation
dart
Rill<O> zipLeft<O2>(Rill<O2> other) => zipWith(other, (a, _) => a);

zipRight()

Rill<O2> zipRight<O2>(Rill<O2> other)
Implementation
dart
Rill<O2> zipRight<O2>(Rill<O2> other) => zipWith(other, (_, b) => b);

zipWith()

Rill<O3> zipWith<O2, O3>(Rill<O2> that, O3 Function(O, O2) f)
Implementation
dart
Rill<O3> zipWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
  Pull<O3, Unit> loop(Rill<O> s1, Rill<O2> s2) {
    return s1.pull.uncons.flatMap((hdtl1) {
      return hdtl1.foldN(
        () => Pull.done,
        (hd1, tl1) {
          return s2.pull.uncons.flatMap((hdtl2) {
            return hdtl2.foldN(
              () => Pull.done,
              (hd2, tl2) {
                final len = math.min(hd1.size, hd2.size);

                final nextS1 = tl1.cons(hd1.drop(len));
                final nextS2 = tl2.cons(hd2.drop(len));

                return Pull.output(
                  hd1.zip(hd2).map((t) => f(t.$1, t.$2)),
                ).flatMap((_) => loop(nextS1, nextS2));
              },
            );
          });
        },
      );
    });
  }

  return loop(this, that).rillNoScope;
}

zipWithIndex()

Rill<Record> zipWithIndex()
Implementation
dart
Rill<(O, int)> zipWithIndex() => scanChunks(0, (index, c) {
  var idx = index;

  final out = c.map((o) {
    final r = (o, idx);
    idx += 1;
    return r;
  });

  return (idx, out);
});

zipWithNext()

Rill<Record> zipWithNext()
Implementation
dart
Rill<(O, Option<O>)> zipWithNext() {
  Pull<(O, Option<O>), Unit> go(O last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        () => Pull.output1((last, none())),
        (hd, tl) {
          final (newLast, out) = hd.mapAccumulate(last, (prev, next) {
            return (next, (prev, Option(next)));
          });

          return Pull.output(out).append(() => go(newLast, tl));
        },
      );
    });
  }

  return pull.uncons1.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => go(hd, tl),
    );
  }).rillNoScope;
}

zipWithPrevious()

Rill<Record> zipWithPrevious()
Implementation
dart
Rill<(Option<O>, O)> zipWithPrevious() =>
    mapAccumulate(none<O>(), (prev, next) => (Option(next), (prev, next))).map((x) => x.$2);

zipWithPreviousAndNext()

Rill<Record> zipWithPreviousAndNext()
Implementation
dart
Rill<(Option<O>, O, Option<O>)> zipWithPreviousAndNext() =>
    zipWithPrevious().zipWithNext().map((tuple) {
      final ((prev, curr), next) = tuple;

      return next.foldN(
        () => (prev, curr, const None()),
        (_, next) => (prev, curr, Some(next)),
      );
    });

zipWithScan()

Rill<Record> zipWithScan<O2>(O2 z, O2 Function(O2, O) f)
Implementation
dart
Rill<(O, O2)> zipWithScan<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
  z,
  (s, o) => (f(s, o), (o, s)),
).mapN((_, v) => v);

zipWithScan1()

Rill<Record> zipWithScan1<O2>(O2 z, O2 Function(O2, O) f)
Implementation
dart
Rill<(O, O2)> zipWithScan1<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
  z,
  (s, o) {
    final s2 = f(s, o);
    return (s2, (o, s2));
  },
).mapN((_, v) => v);

Extension Methods

collectN() extension

Rill<T3> collectN<T3>(Option<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> collectN<T3>(Function2<T1, T2, Option<T3>> f) => collect(f.tupled);

collectN() extension

Rill<T4> collectN<T4>(Option<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> collectN<T4>(Function3<T1, T2, T3, Option<T4>> f) => collect(f.tupled);

collectN() extension

Rill<T5> collectN<T5>(Option<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> collectN<T5>(Function4<T1, T2, T3, T4, Option<T5>> f) => collect(f.tupled);

collectN() extension

Rill<T6> collectN<T6>(Option<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> collectN<T6>(Function5<T1, T2, T3, T4, T5, Option<T6>> f) => collect(f.tupled);

evalMapN() extension

Rill<T3> evalMapN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> evalMapN<T3>(Function2<T1, T2, IO<T3>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T4> evalMapN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> evalMapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T6> evalMapN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> evalMapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) => evalMap(f.tupled);

evalMapN() extension

Rill<T5> evalMapN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> evalMapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalMap(f.tupled);

evalTapN() extension

Rill<Record> evalTapN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> evalTapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    evalTap(f.tupled);

evalTapN() extension

Rill<Record> evalTapN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> evalTapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalTap(f.tupled);

evalTapN() extension

Rill<Record> evalTapN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> evalTapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalTap(f.tupled);

evalTapN() extension

Rill<Record> evalTapN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> evalTapN<T3>(Function2<T1, T2, IO<T3>> f) => evalTap(f.tupled);

filterN() extension

Rill<Record> filterN(bool Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> filterN(Function5<T1, T2, T3, T4, T5, bool> f) => filter(f.tupled);

filterN() extension

Rill<Record> filterN(bool Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> filterN(Function2<T1, T2, bool> f) => filter(f.tupled);

filterN() extension

Rill<Record> filterN(bool Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> filterN(Function4<T1, T2, T3, T4, bool> f) => filter(f.tupled);

filterN() extension

Rill<Record> filterN(bool Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> filterN(Function3<T1, T2, T3, bool> f) => filter(f.tupled);

filterNotN() extension

Rill<Record> filterNotN(bool Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<(T1, T2, T3, T4, T5)> filterNotN(Function5<T1, T2, T3, T4, T5, bool> f) =>
    filterNot(f.tupled);

filterNotN() extension

Rill<Record> filterNotN(bool Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<(T1, T2, T3, T4)> filterNotN(Function4<T1, T2, T3, T4, bool> f) => filterNot(f.tupled);

filterNotN() extension

Rill<Record> filterNotN(bool Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<(T1, T2, T3)> filterNotN(Function3<T1, T2, T3, bool> f) => filterNot(f.tupled);

filterNotN() extension

Rill<Record> filterNotN(bool Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<(T1, T2)> filterNotN(Function2<T1, T2, bool> f) => filterNot(f.tupled);

flatMapN() extension

Rill<T4> flatMapN<T4>(Rill<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> flatMapN<T4>(Function3<T1, T2, T3, Rill<T4>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T3> flatMapN<T3>(Rill<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> flatMapN<T3>(Function2<T1, T2, Rill<T3>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T6> flatMapN<T6>(Rill<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> flatMapN<T6>(Function5<T1, T2, T3, T4, T5, Rill<T6>> f) => flatMap(f.tupled);

flatMapN() extension

Rill<T5> flatMapN<T5>(Rill<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> flatMapN<T5>(Function4<T1, T2, T3, T4, Rill<T5>> f) => flatMap(f.tupled);

flatten() extension

Rill<O> flatten()

Available on Rill<O>, provided by the RillFlattenOps<O> extension

Implementation
dart
Rill<O> flatten() => flatMap((r) => r);

ifM() extension

Rill<O2> ifM<O2>(Rill<O2> Function() ifTrue, Rill<O2> Function() ifFalse)

Available on Rill<O>, provided by the RillBooleanOps extension

Implementation
dart
Rill<O2> ifM<O2>(Function0<Rill<O2>> ifTrue, Function0<Rill<O2>> ifFalse) =>
    flatMap((b) => b ? ifTrue() : ifFalse());

mapN() extension

Rill<T6> mapN<T6>(T6 Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> mapN<T6>(Function5<T1, T2, T3, T4, T5, T6> f) => map(f.tupled);

mapN() extension

Rill<T3> mapN<T3>(T3 Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> mapN<T3>(Function2<T1, T2, T3> f) => map(f.tupled);

mapN() extension

Rill<T5> mapN<T5>(T5 Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> mapN<T5>(Function4<T1, T2, T3, T4, T5> f) => map(f.tupled);

mapN() extension

Rill<T4> mapN<T4>(T4 Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> mapN<T4>(Function3<T1, T2, T3, T4> f) => map(f.tupled);

parEvalMapN() extension

Rill<T6> parEvalMapN<T6>(
  int maxConcurrent,
  IO<T6> Function(T1, T2, T3, T4, T5) f,
)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapN<T6>(
  int maxConcurrent,
  Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T3> parEvalMapN<T3>(int maxConcurrent, IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapN<T3>(
  int maxConcurrent,
  Function2<T1, T2, IO<T3>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T4> parEvalMapN<T4>(int maxConcurrent, IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapN<T4>(
  int maxConcurrent,
  Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapN() extension

Rill<T5> parEvalMapN<T5>(int maxConcurrent, IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapN<T5>(
  int maxConcurrent,
  Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMap(maxConcurrent, f.tupled);

parEvalMapUnboundedN() extension

Rill<T6> parEvalMapUnboundedN<T6>(IO<T6> Function(T1, T2, T3, T4, T5) f)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T3> parEvalMapUnboundedN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) => parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T5> parEvalMapUnboundedN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnboundedN() extension

Rill<T4> parEvalMapUnboundedN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
    parEvalMapUnbounded(f.tupled);

parEvalMapUnorderedN() extension

Rill<T4> parEvalMapUnorderedN<T4>(
  int maxConcurrent,
  IO<T4> Function(T1, T2, T3) f,
)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnorderedN<T4>(
  int maxConcurrent,
  Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T5> parEvalMapUnorderedN<T5>(
  int maxConcurrent,
  IO<T5> Function(T1, T2, T3, T4) f,
)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnorderedN<T5>(
  int maxConcurrent,
  Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T6> parEvalMapUnorderedN<T6>(
  int maxConcurrent,
  IO<T6> Function(T1, T2, T3, T4, T5) f,
)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnorderedN<T6>(
  int maxConcurrent,
  Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedN() extension

Rill<T3> parEvalMapUnorderedN<T3>(int maxConcurrent, IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnorderedN<T3>(
  int maxConcurrent,
  Function2<T1, T2, IO<T3>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T3> parEvalMapUnorderedUnboundedN<T3>(IO<T3> Function(T1, T2) f)

Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension

Implementation
dart
Rill<T3> parEvalMapUnorderedUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T4> parEvalMapUnorderedUnboundedN<T4>(IO<T4> Function(T1, T2, T3) f)

Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension

Implementation
dart
Rill<T4> parEvalMapUnorderedUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T5> parEvalMapUnorderedUnboundedN<T5>(IO<T5> Function(T1, T2, T3, T4) f)

Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension

Implementation
dart
Rill<T5> parEvalMapUnorderedUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parEvalMapUnorderedUnboundedN() extension

Rill<T6> parEvalMapUnorderedUnboundedN<T6>(
  IO<T6> Function(T1, T2, T3, T4, T5) f,
)

Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension

Implementation
dart
Rill<T6> parEvalMapUnorderedUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
    parEvalMapUnorderedUnbounded(f.tupled);

parJoin() extension

Rill<O> parJoin(int maxOpen)

Runs maxOpen inner streams concurrently.

  • Waits for all inner streams to finish.
  • If any stream fails, the error is propagated and all other streams are cancelled.
  • If the output stream is interrupted, all running streams are cancelled.

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoin(int maxOpen) {
  assert(maxOpen > 0, 'maxOpen must be greater than 0, was: $maxOpen');

  if (maxOpen == 1) {
    return flatten();
  } else {
    final rillF = (
      SignallingRef.of(none<Object>()),
      Semaphore.permits(maxOpen),
      SignallingRef.of(1),
      Channel.unbounded<Unit>(),
      Channel.unbounded<Chunk<O>>(),
    ).mapN((done, available, running, outcomes, output) {
      IO<Unit> stop(Option<Object> rslt) {
        return done.update((current) {
          return current.fold(
            () => rslt,
            (prev) => rslt.fold(
              () => current, &#47;&#47; If we're trying to set None, keep existing result (if any)
              (err) => Some(
                current.fold(
                  () => Some(err),
                  (prevErr) => Some(CompositeFailure.of(prevErr, err)),
                ),
              ),
            ),
          );
        });
      }

      final incrementRunning = running.update((n) => n + 1);
      final decrementRunning = running
          .updateAndGet((n) => n - 1)
          .flatMap((now) => now == 0 ? outcomes.close().voided() : IO.unit);

      IO<Unit> onOutcome(
        Outcome<Unit> oc,
        Either<Object, Unit> cancelResult,
      ) {
        return oc.fold(
          () => cancelResult.fold((err) => stop(Some(err)), (_) => IO.unit),
          (err, st) => CompositeFailure.fromResults(
            Left(err),
            cancelResult,
          ).fold((err) => stop(Some(err)), (_) => IO.unit),
          (fu) => cancelResult.fold(
            (err) => stop(Some(err)),
            (_) => outcomes.send(fu).voided(),
          ),
        );
      }

      IO<Unit> runInner(Rill<O> inner, Scope outerScope) {
        return IO.uncancelable((_) {
          return outerScope.lease().flatMap((lease) {
            return available.acquire().productR(incrementRunning).flatMap((_) {
              return inner
                  .chunks()
                  .evalMap((chunk) => output.send(chunk).voided())
                  .interruptWhenSignaled(done.map((x) => x.nonEmpty))
                  .compile
                  .drain
                  .guaranteeCase((oc) {
                    return lease.cancel
                        .flatMap((cancelResult) => onOutcome(oc, cancelResult))
                        .guarantee(
                          available.release().productR(decrementRunning),
                        );
                  })
                  .voidError()
                  .start()
                  .voided();
            });
          });
        });
      }

      IO<Unit> runOuter() {
        return IO.uncancelable((_) {
          return flatMap(
                (inner) =>
                    Pull.getScope
                        .flatMap((outerScope) => Pull.eval(runInner(inner, outerScope)))
                        .rillNoScope,
              )
              .drain()
              .interruptWhenSignaled(done.map((x) => x.nonEmpty))
              .compile
              .drain
              .guaranteeCase(
                (oc) => onOutcome(oc, Right(Unit())).productR(decrementRunning),
              )
              .voidError();
        });
      }

      IO<Unit> outcomeJoiner() {
        return outcomes.rill.compile.drain.guaranteeCase((oc) {
          return oc.fold(
            () => stop(none()).productR(output.close().voided()),
            (err, st) => stop(Some(err)).productR(output.close().voided()),
            (fu) => stop(none()).productR(output.close().voided()),
          );
        }).voidError();
      }

      IO<Unit> signalResult(IOFiber<Unit> fiber) {
        return done.value().flatMap((opt) {
          return opt.fold(
            () => fiber.join().voided(),
            (err) => err is _Interruption ? fiber.join().voided() : IO.raiseError(err),
          );
        });
      }

      return Rill.bracket(
        runOuter().start().productR(outcomeJoiner().start()),
        (fiber) {
          return done
              .update((c) => c.fold(() => const Some(_Interruption()), (prev) => Some(prev)))
              .productR(
                &#47;&#47; in case of short-circuiting, the `fiberJoiner` would not have had a chance
                &#47;&#47; to wait until all fibers have been joined, so we need to do it manually
                &#47;&#47; by waiting on the counter
                running.waitUntil((n) => n == 0).productR(signalResult(fiber)),
              );
        },
      ).flatMap((_) => output.rill.flatMap((o) => Rill.chunk(o)));
    });

    return Rill.eval(rillF).flatten();
  }
}

parJoinUnbounded() extension

Rill<O> parJoinUnbounded()

Available on Rill<O>, provided by the ParJoinOps<O> extension

Implementation
dart
Rill<O> parJoinUnbounded() => parJoin(Integer.maxValue);

toDartStream() extension

Stream<O> toDartStream()

Available on Rill<O>, provided by the ToDartStreamOps<O> extension

Implementation
dart
Stream<O> toDartStream() {
  late StreamController<O> controller;
  IOFiber<Unit>? activeFiber;

  controller = StreamController(
    onListen: () {
      final program = evalMap(
            (o) => IO.exec(() {
              if (!controller.isClosed) controller.add(o);
            }),
          )
          .handleErrorWith((err) {
            return Rill.eval(
              IO.exec(() {
                if (!controller.isClosed) controller.addError(err);
              }),
            );
          })
          .compile
          .drain
          .guarantee(
            IO.exec(() {
              if (!controller.isClosed) controller.close();
            }),
          );

      final startIO = program.start();

      startIO.unsafeRunAsync((outcome) {
        outcome.fold(
          () {
            if (!controller.isClosed) controller.close();
          },
          (err, stackTrace) {
            if (!controller.isClosed) controller.addError(err, stackTrace);
          },
          (fiber) => activeFiber = fiber,
        );
      });
    },
    onCancel: () {
      if (activeFiber != null) {
        final completer = Completer<void>();
        activeFiber!.cancel().unsafeRunAsync((_) => completer.complete());
        return completer.future;
      }
    },
  );

  return controller.stream;
}

widen() extension

Rill<O2> widen<O2>()

Widens this Rill<Never> to Rill<O2>.

Useful when an API expects Rill<O2> and you have a Rill<Never> that produces no elements (e.g. after Rill.drain or Rill.foreach).

Why this is type-safe: the ideal signature would use a lower type bound:

dart
Rill<O2> widen<O2 super O>()

where O2 super O means "O2 is a supertype of O". Dart does not support lower type bounds, so restricting the receiver to Rill<Never> encodes the same constraint at the type level. Never is the bottom type — Never <: O2 for every O2 — so the cast Pull<Never, Unit> as Pull<O2, Unit> always succeeds at runtime.

See also Rill.append for the type-preserving concatenation variant.

Available on Rill<O>, provided by the RillNeverOps extension

Implementation
dart
Rill<O2> widen<O2>() => (underlying as Pull<O2, Unit>).rillNoScope;

Operators

operator +()

Rill<O> operator +(Rill<O> other)
Implementation
dart
Rill<O> operator +(Rill<O> other) => underlying.flatMap((_) => other.underlying).rillNoScope;

operator ==() inherited

bool operator ==(Object other)

The equality operator.

The default behavior for all Objects is to return true if and only if this object and other are the same object.

Override this method to specify a different equality relation on a class. The overriding method must still be an equivalence relation. That is, it must be:

  • Total: It must return a boolean for all arguments. It should never throw.

  • Reflexive: For all objects o, o == o must be true.

  • Symmetric: For all objects o1 and o2, o1 == o2 and o2 == o1 must either both be true, or both be false.

  • Transitive: For all objects o1, o2, and o3, if o1 == o2 and o2 == o3 are true, then o1 == o3 must be true.

The method should also be consistent over time, so whether two objects are equal should only change if at least one of the objects was modified.

If a subclass overrides the equality operator, it should override the hashCode method as well to maintain consistency.

Inherited from Object.

Implementation
dart
external bool operator ==(Object other);

Static Properties

never final

final Rill<Never> never
Implementation
dart
static final Rill<Never> never = Rill.eval(IO.never());

unit final

final Rill<Unit> unit
Implementation
dart
static final Rill<Unit> unit = Pull.outUnit.rillNoScope;

Static Methods

awakeEvery()

Rill<Duration> awakeEvery(Duration period, {bool dampen = true})
Implementation
dart
static Rill<Duration> awakeEvery(Duration period, {bool dampen = true}) {
  return Rill.eval(IO.now).flatMap((start) {
    return _fixedRate(
      period,
      start,
      dampen,
    ).flatMap((_) => Rill.eval(IO.now.map((now) => now.difference(start))));
  });
}

bracket()

Rill<R> bracket<R>(IO<R> acquire, IO<Unit> Function(R) release)
Implementation
dart
static Rill<R> bracket<R>(IO<R> acquire, Function1<R, IO<Unit>> release) =>
    bracketFull((_) => acquire, (r, _) => release(r));

bracketCase()

Rill<R> bracketCase<R>(IO<R> acquire, IO<Unit> Function(R, ExitCase) release)
Implementation
dart
static Rill<R> bracketCase<R>(IO<R> acquire, Function2<R, ExitCase, IO<Unit>> release) =>
    bracketFull((_) => acquire, release);

bracketFull()

Rill<R> bracketFull<R>(
  IO<R> Function(Poll) acquire,
  IO<Unit> Function(R, ExitCase) release,
)
Implementation
dart
static Rill<R> bracketFull<R>(
  Function1<Poll, IO<R>> acquire,
  Function2<R, ExitCase, IO<Unit>> release,
) => Pull.acquireCancelable(acquire, release).flatMap(Pull.output1).rillNoScope;

chunk()

Rill<O> chunk<O>(Chunk<O> values)
Implementation
dart
static Rill<O> chunk<O>(Chunk<O> values) => Pull.output(values).rillNoScope;

constant()

Rill<O> constant<O>(O o, {int chunkSize = 256})
Implementation
dart
static Rill<O> constant<O>(O o, {int chunkSize = 256}) =>
    chunkSize > 0 ? chunk(Chunk.fill(chunkSize, o)).repeat() : Rill.empty();

duration()

Rill<Duration> duration()
Implementation
dart
static Rill<Duration> duration() =>
    Rill.eval(IO.now).flatMap((t0) => Rill.repeatEval(IO.now.map((now) => now.difference(t0))));

emit()

Rill<O2> emit<O>(O value)
Implementation
dart
static Rill<O> emit<O>(O value) => Pull.output1(value).rillNoScope;

emits()

Rill<O> emits<O>(List<O> values)
Implementation
dart
static Rill<O> emits<O>(List<O> values) => Rill.chunk(Chunk.fromList(values));

empty()

Rill<O2> empty<O>()
Implementation
dart
static Rill<O> empty<O>() => Rill._noScope(Pull.done);

eval()

Rill<O> eval<O>(IO<O> io)
Implementation
dart
static Rill<O> eval<O>(IO<O> io) => Pull.eval(io).flatMap(Pull.output1).rillNoScope;

exec()

Rill<O> exec<O>(IO<Unit> io)
Implementation
dart
static Rill<O> exec<O>(IO<Unit> io) => Rill._noScope(Pull.eval(io).flatMap((_) => Pull.done));

fixedDelay()

Rill<Unit> fixedDelay(Duration period)
Implementation
dart
static Rill<Unit> fixedDelay(Duration period) => sleep(period).repeat();

fixedRate()

Rill<Unit> fixedRate(Duration period, {bool dampen = true})
Implementation
dart
static Rill<Unit> fixedRate(Duration period, {bool dampen = true}) =>
    Rill.eval(IO.now).flatMap((t) => _fixedRate(period, t, dampen));

fixedRateStartImmediately()

Rill<Unit> fixedRateStartImmediately(Duration period, {bool dampen = true})
Implementation
dart
static Rill<Unit> fixedRateStartImmediately(Duration period, {bool dampen = true}) =>
    Rill.eval(IO.now).flatMap((t) => Rill.unit.append(() => _fixedRate(period, t, dampen)));

force()

Rill<O> force<O>(IO<Rill<O>> io)
Implementation
dart
static Rill<O> force<O>(IO<Rill<O>> io) => Rill.eval(io).flatMap(identity);

fromEither()

Rill<O> fromEither<E extends Object, O>(Either<E, O> either)
Implementation
dart
static Rill<O> fromEither<E extends Object, O>(Either<E, O> either) =>
    either.fold((err) => Rill.raiseError(err), (o) => Rill.emit(o));

fromIterator()

Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64})
Implementation
dart
static Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64}) =>
    fromRIterator(RIterator.fromDart(it), chunkSize: chunkSize);

fromOption()

Rill<O> fromOption<E extends Object, O>(Option<O> option)
Implementation
dart
static Rill<O> fromOption<E extends Object, O>(Option<O> option) =>
    option.fold(() => Rill.empty(), (o) => Rill.emit(o));

fromQueueNoneUnterminated()

Rill<O> fromQueueNoneUnterminated<O>(Queue<Option<O>> queue, {int? limit})
Implementation
dart
static Rill<O> fromQueueNoneUnterminated<O>(
  Queue<Option<O>> queue, {
  int? limit,
}) => _awaitNoneTerminated(queue, limit ?? Integer.maxValue);

fromQueueNoneUnterminatedChunk()

Rill<O> fromQueueNoneUnterminatedChunk<O>(
  Queue<Option<Chunk<O>>> queue, {
  int? limit,
})
Implementation
dart
static Rill<O> fromQueueNoneUnterminatedChunk<O>(
  Queue<Option<Chunk<O>>> queue, {
  int? limit,
}) => _fromQueueNoneUnterminatedChunk(queue.take(), queue.tryTake(), limit: limit);

fromQueueUnterminated()

Rill<O> fromQueueUnterminated<O>(Queue<O> queue, {int? limit})
Implementation
dart
static Rill<O> fromQueueUnterminated<O>(
  Queue<O> queue, {
  int? limit,
}) {
  final lim = limit ?? Integer.maxValue;

  if (lim > 1) {
    &#47;&#47; First, try non-blocking batch dequeue.
    &#47;&#47; Only if the result is an empty list, semantically block to get one element,
    &#47;&#47; then attempt 2nd tryTakeN to get any other elements that are immediately available.

    final osf = queue
        .tryTakeN(Some(lim))
        .flatMap((os) {
          if (os.isEmpty) {
            return (
              queue.take(),
              queue.tryTakeN(Some(lim - 1)),
            ).mapN((elem, elems) => elems.prepended(elem));
          } else {
            return IO.pure(os);
          }
        })
        .map((l) => Chunk.from(l));

    return Rill.eval(osf).flatMap(Rill.chunk).repeat();
  } else {
    return Rill.repeatEval(queue.take());
  }
}

fromQueueUnterminatedChunk()

Rill<O> fromQueueUnterminatedChunk<O>(Queue<Chunk<O>> queue, {int? limit})
Implementation
dart
static Rill<O> fromQueueUnterminatedChunk<O>(
  Queue<Chunk<O>> queue, {
  int? limit,
}) => _fromQueueNoneUnterminatedChunk(
  queue.take().map((chunk) => Some(chunk)),
  queue.tryTake().map((chunkOpt) => chunkOpt.map((chunk) => Some(chunk))),
  limit: limit,
);

fromRIterator()

Rill<O> fromRIterator<O>(RIterator<O> it, {int chunkSize = 64})
Implementation
dart
static Rill<O> fromRIterator<O>(RIterator<O> it, {int chunkSize = 64}) {
  assert(chunkSize > 0, 'Rill.fromRIterator: chunkSize must be positive');
  IO<Option<(Chunk<O>, RIterator<O>)>> getNextChunk(RIterator<O> i) {
    return IO.delay(() {
      final bldr = <O>[];
      int cnt = 0;

      while (cnt < chunkSize && i.hasNext) {
        bldr.add(i.next());
        cnt += 1;
      }

      return cnt == 0 ? const None() : Some((Chunk.fromList(bldr), i));
    });
  }

  return Rill.unfoldChunkEval(it, getNextChunk);
}

fromStream()

Rill<O> fromStream<O>(
  Stream<O> stream, {
  OverflowStrategy strategy = const _DropOldest(100),
})
Implementation
dart
static Rill<O> fromStream<O>(
  Stream<O> stream, {
  OverflowStrategy strategy = const _DropOldest(100),
}) {
  final IO<Queue<Either<Object, Option<O>>>> createQueue = switch (strategy) {
    _DropNewest(:final bufferSize) => Queue.dropping(bufferSize),
    _DropOldest(:final bufferSize) => Queue.circularBuffer(bufferSize),
    _Unbounded() => Queue.unbounded(),
  };

  return Rill.eval(createQueue).flatMap((queue) {
    Rill<O> consumeStreamQueue() {
      return Rill.eval(queue.take()).flatMap((event) {
        return event.fold(
          (err) => Rill.raiseError(err),
          (opt) => opt.fold(
            () => Rill.empty(),
            (o) => Rill.emit(o).append(() => consumeStreamQueue()),
          ),
        );
      });
    }

    final acquire = IO.delay(() {
      return stream.listen(
        (data) => queue.offer(Right(Some(data))).unsafeRunAndForget(),
        onError: (Object err) => queue.offer(Left(err)).unsafeRunAndForget(),
        onDone: () => queue.offer(Right(none())).unsafeRunAndForget(),
        cancelOnError: false, &#47;&#47; handle manually
      );
    });

    IO<Unit> release(StreamSubscription<O> sub) => IO.fromFutureF(() => sub.cancel()).voided();

    return Rill.bracket(acquire, release).flatMap((_) => consumeStreamQueue());
  });
}

iterate()

Rill<O> iterate<O>(O start, O Function(O) f, {int chunkSize = 64})
Implementation
dart
static Rill<O> iterate<O>(
  O start,
  Function1<O, O> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.iterate: chunkSize must be positive.');

  Pull<O, Unit> loop(O current) {
    final elements = <O>[];
    O nextVal = current;

    for (var i = 0; i < chunkSize; i++) {
      elements.add(nextVal);
      nextVal = f(nextVal);
    }

    return Pull.output(Chunk.fromList(elements)).append(() => loop(nextVal));
  }

  return loop(start).rillNoScope;
}

iterateEval()

Rill<O> iterateEval<O>(O start, IO<O> Function(O) f)
Implementation
dart
static Rill<O> iterateEval<O>(O start, Function1<O, IO<O>> f) =>
    Rill.emit(start).append(() => Rill.eval(f(start)).flatMap((o) => iterateEval(o, f)));

pure()

Rill<O2> pure<O>(O value)
Implementation
dart
static Rill<O> pure<O>(O value) => Rill._noScope(Pull.output1(value));

raiseError()

Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace])
Implementation
dart
static Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace]) =>
    Pull.raiseError(error, stackTrace).rillNoScope;

range()

Rill<int> range(
  int start,
  int stopExclusive, {
  int step = 1,
  int chunkSize = 64,
})
Implementation
dart
static Rill<int> range(
  int start,
  int stopExclusive, {
  int step = 1,
  int chunkSize = 64,
}) {
  assert(chunkSize > 0, 'chunkSize must be positive');
  if (step == 0) {
    return Rill.empty();
  } else {
    Pull<int, Unit> loop(int current) {
      &#47;&#47; Termination check (direction is constant across iterations)
      if (step > 0 && current >= stopExclusive) {
        return Pull.done;
      } else if (step < 0 && current <= stopExclusive) {
        return Pull.done;
      } else {
        &#47;&#47; Compute exact count for this chunk
        final available =
            step > 0
                ? (stopExclusive - current + step - 1) ~&#47; step
                : (current - stopExclusive + (-step) - 1) ~&#47; (-step);

        final count = available < chunkSize ? available : chunkSize;

        return Pull.output(
          Chunk.tabulate(count, (j) => current + j * step),
        ).append(() => loop(current + count * step));
      }
    }

    return loop(start).rillNoScope;
  }
}

repeatEval()

Rill<O> repeatEval<O>(IO<O> fo)
Implementation
dart
static Rill<O> repeatEval<O>(IO<O> fo) => Rill.eval(fo).repeat();

resource()

Rill<O> resource<O>(Resource<O> r)
Implementation
dart
static Rill<O> resource<O>(Resource<O> r) {
  return switch (r) {
    Pure(:final value) => Rill.emit(value),
    Eval(:final task) => Rill.eval(task),
    Bind(:final source, :final f) => Rill.resource(source).flatMap((o) => Rill.resource(f(o))),
    Allocate(:final resource) => Rill.bracketFull(
      (poll) => resource(poll),
      (resAndRelease, exitCase) => resAndRelease.$2(exitCase),
    )._mapNoScope((x) => x.$1),
    _ => throw StateError('Unhandled Resource ADT: $r'),
  }.scope;
}

retry()

Rill<O> retry<O>(
  IO<O> fo,
  Duration delay,
  Duration Function(Duration) nextDelay,
  int maxAttempts, {
  (bool Function(Object))? retriable,
})
Implementation
dart
static Rill<O> retry<O>(
  IO<O> fo,
  Duration delay,
  Function1<Duration, Duration> nextDelay,
  int maxAttempts, {
  Function1<Object, bool>? retriable,
}) {
  final delays = Rill.unfold(delay, (d) => Some((d, nextDelay(d))));
  final canRetry = retriable ?? (_) => true;

  return Rill.eval(fo)
      .attempts(delays)
      .take(maxAttempts)
      .takeThrough((r) => r.fold((err) => canRetry(err), (_) => false))
      .last
      .unNone
      .rethrowError;
}

sleep()

Rill<Unit> sleep(Duration duration)
Implementation
dart
static Rill<Unit> sleep(Duration duration) => Rill.eval(IO.sleep(duration));

sleep_()

Rill<O> sleep_<O>(Duration duration)
Implementation
dart
static Rill<O> sleep_<O>(Duration duration) => Rill.exec(IO.sleep(duration));

supervise()

Rill<IOFiber<O>> supervise<O>(IO<O> fo)
Implementation
dart
static Rill<IOFiber<O>> supervise<O>(IO<O> fo) =>
    Rill.bracket(fo.start(), (fiber) => fiber.cancel());

suspend()

Rill<O> suspend<O>(Rill<O> Function() f)
Implementation
dart
static Rill<O> suspend<O>(Function0<Rill<O>> f) => Pull.suspend(() => f().underlying).rillNoScope;

unfold()

Rill<O> unfold<S, O>(S s, Option<Record> Function(S) f, {int chunkSize = 64})
Implementation
dart
static Rill<O> unfold<S, O>(
  S s,
  Function1<S, Option<(O, S)>> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.unfold: chunkSize must be positive.');

  Pull<O, Unit> loop(S currentState) {
    final elements = <O>[];
    S nextState = currentState;
    bool isDone = false;

    for (int i = 0; i < chunkSize; i++) {
      final result = f(nextState);

      result.foldN(
        () => isDone = true,
        (elem, newState) {
          elements.add(elem);
          nextState = newState;
        },
      );

      if (isDone) break;
    }

    if (elements.isEmpty) {
      return Pull.done;
    } else if (isDone) {
      return Pull.output(Chunk.fromList(elements));
    } else {
      return Pull.output(Chunk.fromList(elements)).append(() => loop(nextState));
    }
  }

  return loop(s).rillNoScope;
}

unfoldChunk()

Rill<O> unfoldChunk<S, O>(
  S s,
  Option<Record> Function(S) f, {
  int chunkSize = 64,
})
Implementation
dart
static Rill<O> unfoldChunk<S, O>(
  S s,
  Function1<S, Option<(Chunk<O>, S)>> f, {
  int chunkSize = 64,
}) => unfold(s, f, chunkSize: chunkSize).flatMap(Rill.chunk);

unfoldChunkEval()

Rill<O> unfoldChunkEval<S, O>(S s, IO<Option<Record>> Function(S) f)
Implementation
dart
static Rill<O> unfoldChunkEval<S, O>(S s, Function1<S, IO<Option<(Chunk<O>, S)>>> f) {
  Pull<O, Unit> loop(S currentState) {
    return Pull.eval(f(currentState)).flatMap((opt) {
      return opt.foldN(
        () => Pull.done,
        (elems, nextState) => Pull.output(elems).append(() => loop(nextState)),
      );
    });
  }

  return loop(s).rillNoScope;
}

unfoldEval()

Rill<O> unfoldEval<S, O>(S s, IO<Option<Record>> Function(S) f)
Implementation
dart
static Rill<O> unfoldEval<S, O>(S s, Function1<S, IO<Option<(O, S)>>> f) {
  Pull<O, Unit> loop(S currentState) {
    return Pull.eval(f(currentState)).flatMap((opt) {
      return opt.foldN(
        () => Pull.done,
        (elem, nextState) => Pull.output1(elem).append(() => loop(nextState)),
      );
    });
  }

  return loop(s).rillNoScope;
}