Rill<O>
class Rill<O>Available Extensions
Properties
compile no setter
RillCompile<O> get compileImplementation
RillCompile<O> get compile => RillCompile(underlying);distinct no setter
Rill<O> get distinct WARN: For long streams and/or large elements, this can be a memory hog. Use with caution.
Implementation
/// Emits only the first occurrence of each element, dropping every element
/// that is equal to one that was already emitted.
///
/// WARN: every distinct element is retained in a set for as long as the
/// stream runs, so long streams and/or large elements can use a lot of memory.
Rill<O> get distinct {
  return scanChunksOpt(ISet.empty<O>(), (seen) {
    return Some((chunk) {
      // `seen` holds the elements emitted by earlier chunks; `freshSet` catches
      // duplicates that occur inside the current chunk.
      final fresh = <O>[];
      final freshSet = <O>{};
      for (var i = 0; i < chunk.size; i++) {
        final o = chunk[i];
        if (!seen.contains(o) && freshSet.add(o)) fresh.add(o);
      }
      final out = Chunk.fromList(fresh);
      // Carry the enlarged set forward and emit only the unseen elements.
      return (seen.concat(out), out);
    });
  });
}dropLast no setter
Rill<O> get dropLastImplementation
Rill<O> get dropLast => dropLastIf((_) => true);hashCode no setter inherited
int get hashCodeThe hash code for this object.
A hash code is a single integer which represents the state of the object that affects operator == comparisons.
All objects have hash codes. The default hash code implemented by Object represents only the identity of the object, the same way as the default operator == implementation only considers objects equal if they are identical (see identityHashCode).
If operator == is overridden to use the object state instead, the hash code must also be changed to represent that state, otherwise the object cannot be used in hash based data structures like the default Set and Map implementations.
Hash codes must be the same for objects that are equal to each other according to operator ==. The hash code of an object should only change if the object changes in a way that affects equality. There are no further requirements for the hash codes. They need not be consistent between executions of the same program and there are no distribution guarantees.
Objects that are not equal are allowed to have the same hash code. It is even technically allowed that all instances have the same hash code, but if clashes happen too often, it may reduce the efficiency of hash-based data structures like HashSet or HashMap.
If a subclass overrides hashCode, it should override the operator == operator as well to maintain consistency.
Inherited from Object.
Implementation
external int get hashCode;head no setter
Rill<O> get headImplementation
Rill<O> get head => take(1);last no setter
Implementation
Rill<Option<O>> get last => pull.last.flatMap(Pull.output1).rillNoScope;mask no setter
Rill<O> get maskImplementation
Rill<O> get mask => handleErrorWith((_) => Rill.empty());pull no setter
ToPull<O> get pullAccess the Pull API for this Rill.
Implementation
ToPull<O> get pull => ToPull(this);runtimeType no setter inherited
Type get runtimeTypeA representation of the runtime type of the object.
Inherited from Object.
Implementation
external Type get runtimeType;scope no setter
Rill<O> get scopeImplementation
Rill<O> get scope => Pull.scope(underlying).rillNoScope;tail no setter
Rill<O> get tailImplementation
Rill<O> get tail => drop(1);underlying final
Implementation
final Pull<O, Unit> underlying;Extension Properties
rethrowError extension no setter
Rill<A> get rethrowErrorAvailable on Rill<O>, provided by the RethrowOps<A> extension
Implementation
Rill<A> get rethrowError {
return chunks().flatMap((c) {
Option<Object> errOpt = none();
final size = c.size;
var i = 0;
final bldr = <A>[];
while (i < size && errOpt.isEmpty) {
c[i].fold(
(ex) => errOpt = Some(ex),
(o) {
bldr.add(o);
i++;
},
);
}
final chunk = Chunk.fromList(bldr);
return Rill.chunk(chunk).append(
() => errOpt.fold(
() => Rill.empty(),
(err) => Rill.raiseError(err),
),
);
});
}unchunks extension no setter
Rill<A> get unchunksAvailable on Rill<O>, provided by the RillChunkOps<A> extension
Implementation
Rill<A> get unchunks => underlying.flatMapOutput((c) => Pull.output(c)).rillNoScope;unNone extension no setter
Rill<A> get unNoneAvailable on Rill<O>, provided by the RillOptionOps<A> extension
Implementation
Rill<A> get unNone => collect(identity);unNoneTerminate extension no setter
Rill<A> get unNoneTerminateAvailable on Rill<O>, provided by the RillOptionOps<A> extension
Implementation
Rill<A> get unNoneTerminate {
Pull<A, Unit> loop(Pull<Option<A>, Unit> p) {
return p.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => hd
.indexWhere((opt) => opt.isEmpty)
.fold(
() => Pull.output(hd.map(_getopt)).append(() => loop(tl)),
(idx) => idx == 0 ? Pull.done : Pull.output(hd.take(idx).map(_getopt)),
),
);
});
}
return loop(underlying).rillNoScope;
}Methods
andWait()
Rill<O> andWait(Duration duration)Implementation
Rill<O> andWait(Duration duration) => append(() => Rill.sleep_<O>(duration));append()
Implementation
Rill<O> append(Function0<Rill<O>> s2) => underlying.append(() => s2().underlying).rillNoScope;as()
Rill<O2> as<O2>(O2 o2)Implementation
Rill<O2> as<O2>(O2 o2) => map((_) => o2);attempt()
Implementation
Rill<Either<Object, O>> attempt() =>
map((o) => o.asRight<Object>()).handleErrorWith((err) => Rill.emit(Left<Object, O>(err)));attempts()
Implementation
Rill<Either<Object, O>> attempts(Rill<Duration> delays) => attempt().append(
() => delays.flatMap((delay) => Rill.sleep(delay).flatMap((_) => attempt())),
);broadcastThrough()
Implementation
Rill<O2> broadcastThrough<O2>(IList<Pipe<O, O2>> pipes) {
final rillF = (
Topic.create<Chunk<O>>(),
CountDownLatch.create(pipes.size),
).mapN((topic, allReady) {
final checkIn = allReady.release().productR(allReady.await());
Rill<O2> dump(Pipe<O, O2> pipe) {
return Rill.resource(topic.subscribeAwait(1)).flatMap((sub) {
return Rill.exec<O2>(checkIn).append(() => pipe(sub.unchunks));
});
}
final dumpAll = Rill.emits(pipes.toList()).map(dump).parJoinUnbounded();
final pump = Rill.exec<Never>(allReady.await()).append(() => topic.publish(chunks()));
return dumpAll.concurrently(pump);
});
return Rill.force(rillF);
}buffer()
Rill<O> buffer(int n)Implementation
/// Re-emits this stream by repeatedly pulling with `unconsN(n, allowFewer: true)`
/// and outputting each pulled chunk's elements.
///
/// Returns this stream unchanged when [n] is not positive.
Rill<O> buffer(int n) {
  // Guard clause: nothing to buffer for a non-positive size.
  if (n <= 0) return this;
  return repeatPull(
    (tp) => tp
        .unconsN(n, allowFewer: true)
        .flatMap(
          (hdtl) => hdtl.foldN(
            // Input exhausted: stop repeating.
            () => Pull.pure(none()),
            // Emit what was pulled and continue with the remainder.
            (hd, tl) => Pull.output(hd).as(Some(tl)),
          ),
        ),
  );
}changes()
Rill<O> changes({(bool Function(O, O))? eq})Implementation
/// Emits the first element and, after that, only elements that differ from
/// the element emitted immediately before them.
///
/// [eq] decides whether two elements are equal; when omitted, `==` is used.
Rill<O> changes({Function2<O, O, bool>? eq}) {
  // filterWithPrevious keeps an element when its predicate returns true, so
  // the equality test must be negated: keep only elements that are NOT equal
  // to the previously emitted one.
  final equal = eq ?? (O a, O b) => a == b;
  return filterWithPrevious((a, b) => !equal(a, b));
}changesBy()
Rill<O> changesBy<O2>(O2 Function(O) f)Implementation
Rill<O> changesBy<O2>(Function1<O, O2> f) => filterWithPrevious((a, b) => f(a) != f(b));changesWith()
Rill<O> changesWith(bool Function(O, O) f)Implementation
/// Delegates to [filterWithPrevious] with [f] as the predicate: an element is
/// kept when `f(previouslyEmitted, element)` returns true.
Rill<O> changesWith(Function2<O, O, bool> f) {
  return filterWithPrevious(f);
}chunkAll()
Implementation
Rill<Chunk<O>> chunkAll() {
Pull<Chunk<O>, Unit> loop(Rill<O> s, Chunk<O> acc) {
return s.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.output1(acc),
(hd, tl) => loop(tl, acc.concat(hd)),
);
});
}
return loop(this, Chunk.empty()).rillNoScope;
}chunkLimit()
Implementation
Rill<Chunk<O>> chunkLimit(int n) {
Pull<Chunk<O>, Unit> breakup(Chunk<O> ch) {
if (ch.size <= n) {
return Pull.output1(ch);
} else {
final (pre, rest) = ch.splitAt(n);
return Pull.output1(pre).append(() => breakup(rest));
}
}
return underlying.unconsFlatMap(breakup).rillNoScope;
}chunkMin()
Implementation
Rill<Chunk<O>> chunkMin(int n, {bool allowFewerTotal = true}) => repeatPull((p) {
return p.unconsMin(n, allowFewerTotal: allowFewerTotal).flatMap((hdtl) {
return hdtl.foldN(
() => Pull.pure(none()),
(hd, tl) => Pull.output1(hd).as(Some(tl)),
);
});
});chunkN()
Implementation
/// Re-chunks this stream by repeatedly pulling with
/// `unconsN(n, allowFewer: allowFewer)` and emitting each pulled chunk as a
/// single element.
///
/// NOTE(review): presumably every emitted chunk has exactly [n] elements,
/// except possibly the last one when [allowFewer] is true — confirm against
/// `ToPull.unconsN`.
///
/// Throws an [ArgumentError] if [n] is not positive.
Rill<Chunk<O>> chunkN(int n, {bool allowFewer = true}) {
  // Validate unconditionally: `assert` is compiled out of release builds, so
  // it cannot be relied on to reject bad arguments.
  if (n <= 0) throw ArgumentError('Rill.chunkN: n must be positive');
  return repeatPull((tp) {
    return tp.unconsN(n, allowFewer: allowFewer).flatMap((hdtl) {
      return hdtl.foldN(
        // Input exhausted: stop repeating.
        () => Pull.pure(none()),
        // Emit the pulled chunk and continue with the remainder.
        (hd, tl) => Pull.output1(hd).as(Some(tl)),
      );
    });
  });
}chunks()
Implementation
Rill<Chunk<O>> chunks() => underlying.unconsFlatMap(Pull.output1).rillNoScope;collect()
Implementation
Rill<O2> collect<O2>(Function1<O, Option<O2>> f) => mapChunks((c) => c.collect(f));collectFirst()
Implementation
Rill<O2> collectFirst<O2>(Function1<O, Option<O2>> f) => collect(f).take(1);collectWhile()
Implementation
Rill<O2> collectWhile<O2>(Function1<O, Option<O2>> f) =>
map(f).takeWhile((b) => b.isDefined).unNone;concurrently()
Implementation
Rill<O> concurrently<O2>(Rill<O2> that) {
return _concurrentlyAux(that).flatMapN((startBack, fore) => startBack.flatMap((_) => fore));
}cons()
Implementation
Rill<O> cons(Chunk<O> c) => c.isEmpty ? this : Rill.chunk(c).append(() => this);cons1()
Rill<O> cons1(O o)Implementation
Rill<O> cons1(O o) => Rill.emit(o).append(() => this);debounce()
Rill<O> debounce(Duration d)Implementation
Rill<O> debounce(Duration d) {
final rillF = Channel.bounded<O>(1).flatMap((chan) {
return IO.ref(none<O>()).map((ref) {
final sendLatest = ref.getAndSet(const None()).flatMap((opt) => opt.traverseIO_(chan.send));
IO<Unit> sendItem(O o) => ref.getAndSet(Some(o)).flatMap((prev) {
if (prev.isEmpty) {
return IO.sleep(d).productR(sendLatest).start().voided();
} else {
return IO.unit;
}
});
Pull<Never, Unit> go(Pull<O, Unit> pull) {
return pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.eval(sendLatest.productR(chan.close()).voided()),
(hd, tl) => Pull.eval(sendItem(hd.last)).append(() => go(tl)),
);
});
}
final debouncedSend = go(underlying).rillNoScope;
return chan.rill.concurrently(debouncedSend);
});
});
return Rill.force(rillF);
}delayBy()
Rill<O> delayBy(Duration duration)Implementation
Rill<O> delayBy(Duration duration) => Rill.sleep_<O>(duration).append(() => this);delete()
Rill<O> delete(bool Function(O) p)Implementation
Rill<O> delete(Function1<O, bool> p) =>
pull
.takeWhile((o) => !p(o))
.flatMap((r) => r.fold(() => Pull.done, (s) => s.drop(1).pull.echo))
.rillNoScope;drain()
Rill<Never> drain()Implementation
Rill<Never> drain() => underlying.unconsFlatMap((_) => Pull.done).rillNoScope;drop()
Rill<O> drop(int n)Implementation
Rill<O> drop(int n) =>
pull
.drop(n)
.flatMap((opt) => opt.fold(() => Pull.done, (rest) => rest.pull.echo))
.rillNoScope;dropLastIf()
Rill<O> dropLastIf(bool Function(O) p)Implementation
Rill<O> dropLastIf(Function1<O, bool> p) {
Pull<O, Unit> go(Chunk<O> last, Rill<O> s) {
return s.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() {
final o = last[last.size - 1];
if (p(o)) {
final (prefix, _) = last.splitAt(last.size - 1);
return Pull.output(prefix);
} else {
return Pull.output(last);
}
},
(hd, tl) => Pull.output(last).append(() => go(hd, tl)),
);
});
}
return pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => go(hd, tl),
);
}).rillNoScope;
}dropRight()
Rill<O> dropRight(int n)Implementation
Rill<O> dropRight(int n) {
if (n <= 0) {
return this;
} else {
Pull<O, Unit> go(Chunk<O> acc, Rill<O> s) {
return s.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) {
final all = acc.concat(hd);
return Pull.output(all.dropRight(n)).append(() => go(all.takeRight(n), tl));
},
);
});
}
return go(Chunk.empty(), this).rillNoScope;
}
}dropThrough()
Rill<O> dropThrough(bool Function(O) p)Implementation
Rill<O> dropThrough(Function1<O, bool> p) =>
pull
.dropThrough(p)
.flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
.rillNoScope;dropWhile()
Rill<O> dropWhile(bool Function(O) p)Implementation
Rill<O> dropWhile(Function1<O, bool> p) =>
pull
.dropWhile(p)
.flatMap((tl) => tl.map((tl) => tl.pull.echo).getOrElse(() => Pull.done))
.rillNoScope;either()
Implementation
Rill<Either<O, O2>> either<O2>(Rill<O2> that) =>
map((o) => o.asLeft<O2>()).merge(that.map((o2) => o2.asRight<O>()));evalFilter()
Implementation
Rill<O> evalFilter(Function1<O, IO<bool>> p) =>
underlying
.flatMapOutput(
(o) => Pull.eval(p(o)).flatMap((pass) => pass ? Pull.output1(o) : Pull.done),
)
.rillNoScope;evalFilterNot()
Implementation
Rill<O> evalFilterNot(Function1<O, IO<bool>> p) =>
flatMap((o) => Rill.eval(p(o)).ifM(() => Rill.empty(), () => Rill.emit(o)));evalFold()
Implementation
Rill<O2> evalFold<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
Pull<O2, Unit> go(O2 z, Rill<O> r) {
return r.pull.uncons1.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.output1(z),
(hd, tl) => Pull.eval(f(z, hd)).flatMap((ns) => go(ns, tl)),
);
});
}
return go(z, this).rillNoScope;
}evalMap()
Implementation
Rill<O2> evalMap<O2>(Function1<O, IO<O2>> f) =>
underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap(Pull.output1)).rillNoScope;evalMapFilter()
Implementation
Rill<O> evalMapFilter(Function1<O, IO<Option<O>>> f) =>
underlying
.flatMapOutput((o) => Pull.eval(f(o)).flatMap((opt) => Pull.outputOption1(opt)))
.rillNoScope;evalScan()
Implementation
Rill<O2> evalScan<O2>(O2 z, Function2<O2, O, IO<O2>> f) {
Pull<O2, Unit> go(O2 z, Rill<O> s) {
return s.pull.uncons1.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => Pull.eval(f(z, hd)).flatMap((o) => Pull.output1(o).append(() => go(o, tl))),
);
});
}
return Pull.output1(z).append(() => go(z, this)).rillNoScope;
}evalTap()
Implementation
Rill<O> evalTap<O2>(Function1<O, IO<O2>> f) =>
underlying.flatMapOutput((o) => Pull.eval(f(o)).flatMap((_) => Pull.output1(o))).rillNoScope;exists()
Rill<bool> exists(bool Function(O) p)Implementation
/// Emits a single boolean: the negation of `pull.forall` applied to the
/// negated predicate, i.e. whether some element satisfies [p].
Rill<bool> exists(Function1<O, bool> p) {
  // "Some element matches" is expressed as "not every element fails to match".
  final noneMatch = pull.forall((elem) => !p(elem));
  return noneMatch.flatMap((allFail) => Pull.output1(!allFail)).rillNoScope;
}filter()
Rill<O> filter(bool Function(O) p)Implementation
Rill<O> filter(Function1<O, bool> p) => mapChunks((c) => c.filter(p));filterNot()
Rill<O> filterNot(bool Function(O) p)Implementation
Rill<O> filterNot(Function1<O, bool> p) => mapChunks((c) => c.filterNot(p));filterWithPrevious()
Rill<O> filterWithPrevious(bool Function(O, O) p)Implementation
Rill<O> filterWithPrevious(Function2<O, O, bool> p) {
Pull<O, Unit> go(O last, Rill<O> s) {
return s.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) {
// can it be emitted unmodified?
final (allPass, newLast) = hd.foldLeft((
true,
last,
), (acc, a) => (acc.$1 && p(acc.$2, a), a));
if (allPass) {
return Pull.output(hd).append(() => go(newLast, tl));
} else {
final (acc, newLast) = hd.foldLeft((IVector.empty<O>(), last), (acc, a) {
if (p(acc.$2, a)) {
return (acc.$1.appended(a), a);
} else {
return (acc.$1, acc.$2);
}
});
return Pull.output(Chunk.from(acc)).append(() => go(newLast, tl));
}
},
);
});
}
return pull.uncons1.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => Pull.output1(hd).append(() => go(hd, tl)),
);
}).rillNoScope;
}flatMap()
Implementation
Rill<O2> flatMap<O2>(Function1<O, Rill<O2>> f) =>
underlying.flatMapOutput((o) => f(o).underlying).rillNoScope;fold()
Rill<O2> fold<O2>(O2 z, O2 Function(O2, O) f)Implementation
Rill<O2> fold<O2>(O2 z, Function2<O2, O, O2> f) =>
pull.fold(z, f).flatMap(Pull.output1).rillNoScope;fold1()
Rill<O> fold1(O Function(O, O) f)Implementation
Rill<O> fold1(Function2<O, O, O> f) =>
pull.fold1(f).flatMap((opt) => opt.map(Pull.output1).getOrElse(() => Pull.done)).rillNoScope;forall()
Rill<bool> forall(bool Function(O) p)Implementation
Rill<bool> forall(Function1<O, bool> p) =>
pull.forall(p).flatMap((res) => Pull.output1(res)).rillNoScope;foreach()
Implementation
Rill<Never> foreach(Function1<O, IO<Unit>> f) =>
underlying.flatMapOutput((o) => Pull.eval(f(o))).rillNoScope;groupAdjacentBy()
Rill<Record> groupAdjacentBy<O2>(O2 Function(O) f)Implementation
Rill<(O2, Chunk<O>)> groupAdjacentBy<O2>(Function1<O, O2> f) =>
groupAdjacentByLimit(Integer.maxValue, f);groupAdjacentByLimit()
Rill<Record> groupAdjacentByLimit<O2>(int limit, O2 Function(O) f)Implementation
Rill<(O2, Chunk<O>)> groupAdjacentByLimit<O2>(int limit, Function1<O, O2> f) {
Pull<(O2, Chunk<O>), Unit> go(Option<(O2, Chunk<O>)> current, Rill<O> s) {
Pull<(O2, Chunk<O>), Unit> doChunk(
Chunk<O> chunk,
Rill<O> s,
O2 k1,
Chunk<O> out,
IQueue<(O2, Chunk<O>)> acc,
) {
final differsAt = chunk.indexWhere((v) => f(v) != k1).getOrElse(() => -1);
if (differsAt == -1) {
// whole chunk matches the current key, add this chunk to the accumulated output
if (out.size + chunk.size < limit) {
final newCurrent = Some((k1, out.concat(chunk)));
return Pull.output(Chunk.from(acc)).append(() => go(newCurrent, s));
} else {
final (prefix, suffix) = chunk.splitAt(limit - out.size);
return Pull.output(
Chunk.from(acc.appended((k1, out.concat(prefix)))),
).append(() => go(Some((k1, suffix)), s));
}
} else {
// at least part of this chunk does not match the current key, need to group and retain chunkiness
// split the chunk into the bit where the keys match and the bit where they don't
final matching = chunk.take(differsAt);
late IQueue<(O2, Chunk<O>)> newAcc;
final newOutSize = out.size + matching.size;
if (newOutSize == 0) {
newAcc = acc;
} else if (newOutSize > limit) {
final (prefix, suffix) = matching.splitAt(limit - out.size);
newAcc = acc.appended((k1, out.concat(prefix))).appended((k1, suffix));
} else {
newAcc = acc.appended((k1, out.concat(matching)));
}
final nonMatching = chunk.drop(differsAt);
// nonMatching is guaranteed to be non-empty here, because we know the last element of the chunk doesn't have
// the same key as the first
final k2 = f(nonMatching[0]);
return doChunk(nonMatching, s, k2, Chunk.empty(), newAcc);
}
}
return s.pull.unconsLimit(limit).flatMap((hdtl) {
return hdtl.foldN(
() => current
.mapN(
(k1, out) => out.size == 0 ? Pull.done : Pull.output1((k1, out)),
)
.getOrElse(() => Pull.done),
(hd, tl) {
final (k1, out) = current.getOrElse(() => (f(hd[0]), Chunk.empty()));
return doChunk(hd, tl, k1, out, IQueue.empty());
},
);
});
}
return go(none(), this).rillNoScope;
}groupWithin()
Implementation
/// Groups the elements of this stream into chunks.
///
/// A chunk is emitted as soon as [chunkSize] elements are buffered, or when
/// [timeout] has elapsed since the current buffer received its first data,
/// whichever happens first. Any remaining buffered data is emitted when the
/// source finishes.
///
/// Throws an [ArgumentError] if [chunkSize] is not positive.
Rill<Chunk<O>> groupWithin(int chunkSize, Duration timeout) {
if (chunkSize <= 0) throw ArgumentError('Rill.groupWithin: chunkSize must be > 0');
return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
// Producer: runs in the background, pushing chunks to the queue; a final
// None is always offered to signal that the source has finished.
final producer = chunks()
.map((c) => Some(c))
.evalMap(queue.offer)
.compile
.drain
.guarantee(queue.offer(const None()));
// Consumer: buffers data and races against the timeout.
//
// [deadline]: time in microseconds since epoch (taken from IO.now) by which
// the current buffer MUST be emitted, or null for an empty buffer.
// NOTE(review): whether IO.now is monotonic is not visible here — confirm.
Pull<Chunk<O>, Unit> consumeLoop(Chunk<O> buffer, int? deadline) {
if (buffer.size >= chunkSize) {
final (toEmit, remainder) = buffer.splitAt(chunkSize);
if (remainder.isEmpty) {
return Pull.output1(toEmit).append(() => consumeLoop(Chunk.empty(), null));
} else {
// leftover data starts a new buffer, so set a new deadline
return Pull.eval(IO.now).flatMap((now) {
return Pull.output1(toEmit).append(() {
return consumeLoop(remainder, now.microsecondsSinceEpoch + timeout.inMicroseconds);
});
});
}
} else {
// Buffer isn't full, so wait for data or the timeout
if (buffer.isEmpty) {
// no buffered data, so time doesn't matter; wait for the next chunk
return Pull.eval(queue.take()).flatMap((opt) {
return opt.fold(
() => Pull.done,
(chunk) {
// data has arrived, start the clock
return Pull.eval(IO.now).flatMap((now) {
return consumeLoop(chunk, now.microsecondsSinceEpoch + timeout.inMicroseconds);
});
},
);
});
} else {
// buffer has data, race the queue against the clock
return Pull.eval(IO.now).flatMap((now) {
final remainingMicros = deadline! - now.microsecondsSinceEpoch;
if (remainingMicros <= 0) {
// timeout reached, emit whatever is buffered
return Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null));
} else {
final waitOp = IO.sleep(Duration(microseconds: remainingMicros));
final takeOp = queue.take();
return Pull.eval(IO.race(takeOp, waitOp)).flatMap((raceResult) {
return raceResult.fold(
(takeResult) {
return takeResult.fold(
() => Pull.output1(buffer), // producer finished: flush the buffer and stop
(newChunk) => consumeLoop(buffer.concat(newChunk), deadline),
);
},
(_) => Pull.output1(buffer).append(() => consumeLoop(Chunk.empty(), null)),
);
});
}
});
}
}
}
// Start the producer as a fiber and cancel it when the resulting stream ends.
return Rill.bracket(
producer.start(),
(fiber) => fiber.cancel(),
).flatMap((_) => consumeLoop(Chunk.empty(), null).rill);
});
}handleErrorWith()
Implementation
Rill<O> handleErrorWith(Function1<Object, Rill<O>> f) =>
underlying.handleErrorWith((err) => f(err).underlying).rillNoScope;holdResource()
Implementation
Resource<Signal<O>> holdResource(O initial) => Resource.eval(
SignallingRef.of(initial),
).flatTap((sig) => foreach((n) => sig.setValue(n)).compile.drain.background());ifEmpty()
Implementation
Rill<O> ifEmpty(Function0<Rill<O>> fallback) =>
pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => fallback().underlying,
(hd, tl) => Pull.output(hd).append(() => tl.underlying),
);
}).rillNoScope;ifEmptyEmit()
Rill<O> ifEmptyEmit(O Function() o)Implementation
Rill<O> ifEmptyEmit(Function0<O> o) => ifEmpty(() => Rill.emit(o()));interleave()
Implementation
Rill<O> interleave(Rill<O> that) => zip(that).flatMap((t) => Rill.emits([t.$1, t.$2]));interleaveAll()
Implementation
Rill<O> interleaveAll(Rill<O> that) => map(
(o) => Option(o),
).zipAll<Option<O>>(that.map((o) => Option(o)), none(), none()).flatMap((t) {
final (thisOpt, thatOpt) = t;
return Rill.chunk(Chunk.from(thisOpt)).append(() => Rill.chunk(Chunk.from(thatOpt)));
});interruptAfter()
Rill<O> interruptAfter(Duration duration)Implementation
Rill<O> interruptAfter(Duration duration) => interruptWhen(IO.sleep(duration));interruptWhen()
Implementation
Rill<O> interruptWhen<B>(IO<B> signal) {
return Rill.eval(IO.deferred<Unit>()).flatMap((stopEvent) {
final startSignalFiber = signal.attempt().flatMap((_) => stopEvent.complete(Unit())).start();
return Rill.eval(startSignalFiber).flatMap((fiber) {
final haltWhen = stopEvent.value().as(Unit().asRight<Object>());
return Pull.interruptWhen(underlying, haltWhen).rillNoScope.onFinalize(fiber.cancel());
});
});
}interruptWhenSignaled()
Implementation
Rill<O> interruptWhenSignaled(Signal<bool> signal) => interruptWhenTrue(signal.discrete);interruptWhenTrue()
Implementation
Rill<O> interruptWhenTrue(Rill<bool> haltWhenTrue) {
final rillF = (
IO.deferred<Unit>(),
IO.deferred<Unit>(),
IO.deferred<Either<Object, Unit>>(),
).mapN((
interruptL,
interruptR,
backResult,
) {
final watch =
haltWhenTrue.exists((x) => x).interruptWhen(interruptR.value().attempt()).compile.drain;
final wakeWatch =
watch.guaranteeCase((oc) {
final Either<Object, Unit> r = oc.fold(
() => Unit().asRight(),
(err, _) => err.asLeft(),
(_) => Unit().asRight(),
);
return backResult.complete(r).productR(interruptL.complete(Unit()).voided());
}).voidError();
final stopWatch = interruptR
.complete(Unit())
.productR(backResult.value().flatMap(IO.fromEither));
final backWatch = Rill.bracket(wakeWatch.start(), (_) => stopWatch);
return backWatch.flatMap((_) => interruptWhen(interruptL.value().attempt()));
});
return Rill.force(rillF);
}intersperse()
Rill<O> intersperse(O separator)Implementation
Rill<O> intersperse(O separator) {
Chunk<O> doChunk(Chunk<O> hd, bool isFirst) {
final bldr = <O>[];
final iter = hd.iterator;
if (isFirst) bldr.add(iter.next());
iter.foreach((o) {
bldr.add(separator);
bldr.add(o);
});
return Chunk.fromList(bldr);
}
Pull<O, Unit> go(Rill<O> str) {
return str.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => Pull.output(doChunk(hd, false)).append(() => go(tl)),
);
});
}
return pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) => Pull.output(doChunk(hd, true)).append(() => go(tl)),
);
}).rillNoScope;
}keepAlive()
Implementation
Rill<O> keepAlive(Duration maxIdle, IO<O> heartbeat) {
return Rill.eval(Queue.unbounded<Option<Chunk<O>>>()).flatMap((queue) {
final producer = chunks()
.map((c) => Some(c))
.evalMap(queue.offer)
.compile
.drain
.guarantee(queue.offer(none()));
Pull<O, Unit> consumeLoop() {
final takeOp = queue.take();
final timerOp = IO.sleep(maxIdle);
return Pull.eval(IO.race(takeOp, timerOp)).flatMap((raceResult) {
return raceResult.fold(
(queueResult) => queueResult.fold(
() => Pull.done,
(chunk) => Pull.output(chunk).append(consumeLoop),
),
(_) => Pull.eval(heartbeat).flatMap((o) => Pull.output1(o)).append(consumeLoop),
);
});
}
return Rill.bracket(
producer.start(),
(fiber) => fiber.cancel(),
).flatMap((_) => consumeLoop().rillNoScope);
});
}lastOr()
Rill<O> lastOr(O Function() fallback)Implementation
/// Emits exactly one element: the value produced by `pull.last` when present,
/// otherwise the result of calling [fallback].
Rill<O> lastOr(Function0<O> fallback) {
  return pull.last.flatMap((lastOpt) {
    // Resolve the value first, then emit it once; [fallback] is only invoked
    // when no last element exists.
    final value = lastOpt.fold(() => fallback(), (found) => found);
    return Pull.output1(value);
  }).rillNoScope;
}map()
Rill<O2> map<O2>(O2 Function(O) f)Implementation
Rill<O2> map<O2>(Function1<O, O2> f) =>
pull.echo.unconsFlatMap((hd) => Pull.output(hd.map(f))).rillNoScope;mapAccumulate()
Rill<Record> mapAccumulate<S, O2>(S initial, Record Function(S, O) f)Implementation
Rill<(S, O2)> mapAccumulate<S, O2>(S initial, Function2<S, O, (S, O2)> f) {
(S, (S, O2)) go(S s, O a) {
final (newS, newO) = f(s, a);
return (newS, (newS, newO));
}
return scanChunks(initial, (acc, c) => c.mapAccumulate(acc, go));
}mapAsync()
Alias for parEvalMap.
Implementation
Rill<O2> mapAsync<O2>(int maxConcurrent, Function1<O, IO<O2>> f) => parEvalMap(maxConcurrent, f);mapAsyncUnordered()
Alias for parEvalMapUnordered.
Implementation
Rill<O2> mapAsyncUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) =>
parEvalMapUnordered(maxConcurrent, f);mapChunks()
Implementation
Rill<O2> mapChunks<O2>(Function1<Chunk<O>, Chunk<O2>> f) =>
underlying.unconsFlatMap((hd) => Pull.output(f(hd))).rillNoScope;merge()
Implementation
Rill<O> merge(Rill<O> that) => _merge(that, (s, fin) => Rill.exec<O>(fin).append(() => s));mergeAndAwaitDownstream()
Implementation
Rill<O> mergeAndAwaitDownstream(Rill<O> that) => _merge(that, (s, fin) => s.onFinalize(fin));mergeHaltBoth()
Implementation
Rill<O> mergeHaltBoth(Rill<O> that) =>
noneTerminate().merge(that.noneTerminate()).unNoneTerminate;mergeHaltL()
Implementation
Rill<O> mergeHaltL(Rill<O> that) =>
noneTerminate().merge(that.map((o) => Option(o))).unNoneTerminate;mergeHaltR()
Implementation
Rill<O> mergeHaltR(Rill<O> that) => that.mergeHaltL(this);metered()
Rill<O> metered(Duration rate)Implementation
Rill<O> metered(Duration rate) => Rill.fixedRate(rate).zipRight(this);meteredStartImmediately()
Rill<O> meteredStartImmediately(Duration rate)Implementation
Rill<O> meteredStartImmediately(Duration rate) =>
Rill.fixedRateStartImmediately(rate).zipRight(this);noneTerminate()
Implementation
Rill<Option<O>> noneTerminate() => map((o) => Option(o)).append(() => Rill.emit(none()));noSuchMethod() inherited
dynamic noSuchMethod(Invocation invocation)Invoked when a nonexistent method or property is accessed.
A dynamic member invocation can attempt to call a member which doesn't exist on the receiving object. Example:
dynamic object = 1;
object.add(42); // Statically allowed, run-time errorThis invalid code will invoke the noSuchMethod method of the integer 1 with an Invocation representing the .add(42) call and arguments (which then throws).
Classes can override noSuchMethod to provide custom behavior for such invalid dynamic invocations.
A class with a non-default noSuchMethod invocation can also omit implementations for members of its interface. Example:
class MockList<T> implements List<T> {
noSuchMethod(Invocation invocation) {
log(invocation);
super.noSuchMethod(invocation); // Will throw.
}
}
void main() {
MockList().add(42);
}This code has no compile-time warnings or errors even though the MockList class has no concrete implementation of any of the List interface methods. Calls to List methods are forwarded to noSuchMethod, so this code will log an invocation similar to Invocation.method(#add, [42]) and then throw.
If a value is returned from noSuchMethod, it becomes the result of the original invocation. If the value is not of a type that can be returned by the original invocation, a type error occurs at the invocation.
The default behavior is to throw a NoSuchMethodError.
Inherited from Object.
Implementation
@pragma("vm:entry-point")
@pragma("wasm:entry-point")
external dynamic noSuchMethod(Invocation invocation);onComplete()
Implementation
Rill<O> onComplete(Function0<Rill<O>> s2) =>
handleErrorWith((e) => s2().append(() => Pull.fail(e).rillNoScope)).append(() => s2());onFinalize()
Implementation
Rill<O> onFinalize(IO<Unit> finalizer) => onFinalizeCase((_) => finalizer);onFinalizeCase()
Implementation
Rill<O> onFinalizeCase(Function1<ExitCase, IO<Unit>> finalizer) =>
Rill.bracketCase(IO.unit, (_, ec) => finalizer(ec)).flatMap((_) => this);parEvalMap()
Implementation
Rill<O2> parEvalMap<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
if (maxConcurrent == 1) {
return evalMap(f);
} else {
assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');
// One is taken by inner stream read.
final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);
return _parEvalMapImpl(concurrency, channelF, true, f);
}
}parEvalMapUnbounded()
Implementation
Rill<O2> parEvalMapUnbounded<O2>(Function1<O, IO<O2>> f) =>
_parEvalMapImpl(Integer.maxValue, Channel.unbounded(), true, f);parEvalMapUnordered()
Implementation
Rill<O2> parEvalMapUnordered<O2>(int maxConcurrent, Function1<O, IO<O2>> f) {
if (maxConcurrent == 1) {
return evalMap(f);
} else {
assert(maxConcurrent > 0, 'maxConcurrent must be > 0, was: $maxConcurrent');
// One is taken by inner stream read.
final concurrency = maxConcurrent == Integer.maxValue ? Integer.maxValue : maxConcurrent + 1;
final channelF = Channel.bounded<IO<Either<Object, O2>>>(concurrency);
return _parEvalMapImpl(concurrency, channelF, false, f);
}
}parEvalMapUnorderedUnbounded()
Implementation
Rill<O2> parEvalMapUnorderedUnbounded<O2>(Function1<O, IO<O2>> f) =>
_parEvalMapImpl(Integer.maxValue, Channel.unbounded(), false, f);pauseWhen()
Implementation
Rill<O> pauseWhen(Rill<bool> pauseWhenTrue) {
return Rill.eval(SignallingRef.of(false)).flatMap((pauseSignal) {
return pauseWhenSignal(
pauseSignal,
).mergeHaltBoth(pauseWhenTrue.foreach(pauseSignal.setValue));
});
}pauseWhenSignal()
Implementation
Rill<O> pauseWhenSignal(Signal<bool> pauseWhneTrue) {
final waitToResume = pauseWhneTrue.waitUntil((x) => !x);
final pauseIfNeeded = Rill.exec<O>(
pauseWhneTrue.value().flatMap((paused) => waitToResume.whenA(paused)),
);
return pauseIfNeeded.append(
() => chunks().flatMap((chunk) {
return Rill.chunk(chunk).append(() => pauseIfNeeded);
}),
);
}rechunkRandomly()
Rill<O> rechunkRandomly({
double minFactor = 0.1,
double maxFactor = 2.0,
int? seed,
})Implementation
Rill<O> rechunkRandomly({double minFactor = 0.1, double maxFactor = 2.0, int? seed}) {
if (minFactor <= 0 || maxFactor < minFactor) {
throw ArgumentError('Invalid rechunk factors. Ensure 0 < minFactor <= maxFactor');
}
return Rill.suspend(() {
final rng = math.Random(seed);
Pull<O, Unit> loop(Chunk<O> acc, Rill<O> s) {
return s.pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => acc.isEmpty ? Pull.done : Pull.output(acc),
(hd, tl) {
final newSize =
math
.max(
1.0,
(acc.size + hd.size) *
(minFactor + (maxFactor - minFactor) * rng.nextDouble()),
)
.toInt();
final newAcc = acc.concat(hd);
if (newAcc.size < newSize) {
return loop(newAcc, tl);
} else {
final (toEmit, rest) = newAcc.splitAt(newSize);
return Pull.output(toEmit).append(() => loop(rest, tl));
}
},
);
});
}
return loop(Chunk.empty<O>(), this).rillNoScope;
});
}reduce()
Rill<O> reduce(O Function(O, O) f)Implementation
Rill<O> reduce(Function2<O, O, O> f) => fold1(f);repeat()
Rill<O> repeat()Implementation
Rill<O> repeat() => append(repeat);repeatN()
Rill<O> repeatN(int n)Implementation
Rill<O> repeatN(int n) => n > 0 ? append(() => repeatN(n - 1)) : Rill.empty();repeatPull()
Implementation
/// Repeatedly applies [f] to the pull API of the remaining stream.
///
/// Each application may output elements and returns either the stream to
/// continue with (wrapped in `Some`) or `None` to stop.
Rill<O2> repeatPull<O2>(Function1<ToPull<O>, Pull<O2, Option<Rill<O>>>> f) {
  Pull<O2, Unit> step(ToPull<O> current) => f(current).flatMap(
    // No remainder ends the loop; otherwise recurse on the remainder.
    (next) => next.fold(() => Pull.done, (rest) => step(rest.pull)),
  );
  return step(pull).rillNoScope;
}scan()
Rill<O2> scan<O2>(O2 z, O2 Function(O2, O) f)Implementation
Rill<O2> scan<O2>(O2 z, Function2<O2, O, O2> f) =>
Pull.output1(z).append(() => _scan(z, f)).rillNoScope;scan1()
Rill<O> scan1(O Function(O, O) f)Implementation
Rill<O> scan1(Function2<O, O, O> f) =>
pull.uncons.flatMap((hdtl) {
return hdtl.foldN(
() => Pull.done,
(hd, tl) {
final (pre, post) = hd.splitAt(1);
return Pull.output(pre).append(() => tl.cons(post)._scan(pre[0], f));
},
);
}).rillNoScope;scanChunks()
Implementation
Rill<O2> scanChunks<S, O2>(S initial, Function2<S, Chunk<O>, (S, Chunk<O2>)> f) =>
scanChunksOpt(initial, (s) => Some((c) => f(s, c)));scanChunksOpt()
Implementation
Rill<O2> scanChunksOpt<S, O2>(
S initial,
Function1<S, Option<Function1<Chunk<O>, (S, Chunk<O2>)>>> f,
) => pull.scanChunksOpt(initial, f).voided.rillNoScope;sliding()
Implementation
/// Emits windows of up to [size] elements, advancing [step] elements between
/// the starts of consecutive windows.
///
/// When the input ends with leftover elements that do not complete a step, a
/// final (possibly shorter) window containing them is emitted.
///
/// Throws an [ArgumentError] if [size] or [step] is not positive.
Rill<Chunk<O>> sliding(int size, {int step = 1}) {
  // Validate unconditionally: `assert` is compiled out of release builds, and
  // a non-positive step would keep the `while (current.size >= step)` loops
  // below from ever terminating.
  if (size <= 0 || step <= 0) {
    throw ArgumentError('Rill.sliding: size and step must be positive');
  }

  // step >= size: windows do not overlap, so each window is simply the first
  // `size` elements of every run of `step` elements.
  Pull<Chunk<O>, Unit> stepNotSmallerThanSize(Rill<O> s, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(prev.take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];
          var current = prev.concat(hd);
          while (current.size >= step) {
            final (nHeads, nTails) = current.splitAt(step);
            bldr.add(nHeads.take(size));
            current = nTails;
          }
          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepNotSmallerThanSize(tl, current));
        },
      ),
    );
  }

  // step < size: windows overlap. `window` carries the tail of the previous
  // window that the next one reuses; `prev` holds elements not yet amounting
  // to a full step.
  Pull<Chunk<O>, Unit> stepSmallerThanSize(Rill<O> s, Chunk<O> window, Chunk<O> prev) {
    return s.pull.uncons.flatMap(
      (hdtl) => hdtl.foldN(
        () => prev.isEmpty ? Pull.done : Pull.output1(window.concat(prev).take(size)),
        (hd, tl) {
          final bldr = <Chunk<O>>[];
          var w = window;
          var current = prev.concat(hd);
          while (current.size >= step) {
            final (head, tail) = current.splitAt(step);
            final wind = w.concat(head);
            bldr.add(wind);
            w = wind.drop(step);
            current = tail;
          }
          return Pull.output(
            Chunk.fromList(bldr),
          ).append(() => stepSmallerThanSize(tl, w, current));
        },
      ),
    );
  }

  if (step < size) {
    // Emit the first window as-is, then slide forward from it.
    return pull
        .unconsN(size, allowFewer: true)
        .flatMap(
          (hdtl) => hdtl.foldN(
            () => Pull.done,
            (hd, tl) => Pull.output1(
              hd,
            ).append(() => stepSmallerThanSize(tl, hd.drop(step), Chunk.empty())),
          ),
        )
        .rillNoScope;
  } else {
    return stepNotSmallerThanSize(this, Chunk.empty()).rillNoScope;
  }
}spaced()
Rill<O> spaced(Duration delay, {bool startImmediately = true})Implementation
/// Paces this Rill by zipping it with a stream of ticks built from
/// [Rill.fixedDelay] with [delay].
///
/// When [startImmediately] is true, one extra unit tick is placed in front of
/// the delayed ticks; otherwise the tick stream starts with the delayed ticks.
Rill<O> spaced(Duration delay, {bool startImmediately = true}) {
  final Rill<Unit> lead;
  if (startImmediately) {
    lead = Rill.unit;
  } else {
    lead = Rill.empty<Unit>();
  }

  final ticks = lead.append(() => Rill.fixedDelay(delay));
  return ticks.zipRight(this);
}
split()
Implementation
/// Splits this Rill into chunks delimited by the elements matching [p].
///
/// Matching elements act as separators and are not emitted. Elements seen
/// since the last separator are accumulated and emitted as one chunk when the
/// next separator is found; a non-empty remainder is emitted when the source
/// ends.
Rill<Chunk<O>> split(Function1<O, bool> p) {
  Pull<Chunk<O>, Unit> collect(Chunk<O> pending, Rill<O> remaining) {
    return remaining.pull.uncons.flatMap((step) {
      return step.foldN(
        // Source exhausted: flush whatever has been gathered so far.
        () => pending.nonEmpty ? Pull.output1(pending) : Pull.done,
        (head, tail) {
          final separatorAt = head.indexWhere(p);

          return separatorAt.fold(
            // No separator in this chunk: keep accumulating.
            () => collect(pending.concat(head), tail),
            (at) {
              final segment = pending.concat(head.take(at));

              // Resume after the separator (hence `at + 1`) with a fresh buffer.
              return Pull.output1(segment).append(
                () => collect(Chunk.empty(), tail.cons(head.drop(at + 1))),
              );
            },
          );
        },
      );
    });
  }

  return collect(Chunk.empty(), this).rillNoScope;
}
switchMap()
Implementation
Rill<O2> switchMap<O2>(Function1<O, Rill<O2>> f) {
final IO<Rill<O2>> rillF = Semaphore.permits(1).flatMap((guard) {
return IO.ref(none<Deferred<Unit>>()).map((haltRef) {
Rill<O2> runInner(O o, Deferred<Unit> halt) {
return Rill.bracketFull(
(poll) => poll(guard.acquire()), // guard against simultaneously running rills
(_, ec) {
return ec.fold(
() => guard.release(),
(_, _) => IO.unit, // on error don't start next stream
() => guard.release(),
);
},
).flatMap((_) => f(o).interruptWhen(halt.value().attempt()));
}
IO<Rill<O2>> haltedF(O o) {
return IO.deferred<Unit>().flatMap((halt) {
return haltRef.getAndSet(halt.some).flatMap((prev) {
return prev
.fold(() => IO.unit, (prev) => prev.complete(Unit()))
.map((_) => runInner(o, halt));
});
});
}
return evalMap(haltedF).parJoin(2);
});
});
return Rill.force(rillF);
}take()
Rill<O> take(int n)Implementation
Rill<O> take(int n) => pull.take(n).voided.rillNoScope;takeRight()
Rill<O> takeRight(int n)Implementation
Rill<O> takeRight(int n) => pull.takeRight(n).flatMap(Pull.output).rillNoScope;takeThrough()
Rill<O> takeThrough(bool Function(O) p)Implementation
Rill<O> takeThrough(Function1<O, bool> p) => pull.takeThrough(p).voided.rillNoScope;takeWhile()
Rill<O> takeWhile(bool Function(O) p, {bool takeFailure = false})Implementation
Rill<O> takeWhile(Function1<O, bool> p, {bool takeFailure = false}) =>
pull.takeWhile(p, takeFailure: takeFailure).voided.rillNoScope;through()
Implementation
Rill<O2> through<O2>(Pipe<O, O2> f) => f(this);timeout()
Rill<O> timeout(Duration timeout)Implementation
Rill<O> timeout(Duration timeout) => interruptWhen(IO.sleep(timeout));toString() inherited
String toString()A string representation of this object.
Some classes have a default textual representation, often paired with a static parse function (like int.parse). These classes will provide the textual representation as their string representation.
Other classes have no meaningful textual representation that a program will care about. Such classes will typically override toString to provide useful information when inspecting the object, mainly for debugging or logging.
Inherited from Object.
Implementation
external String toString();zip()
Implementation
Rill<(O, O2)> zip<O2>(Rill<O2> that) => zipWith(that, (o, o2) => (o, o2));zipAll()
Implementation
Rill<(O, O2)> zipAll<O2>(Rill<O2> that, O padLeft, O2 padRight) =>
zipAllWith(that, padLeft, padRight, (o, o2) => (o, o2));zipAllWith()
Implementation
Rill<O3> zipAllWith<O2, O3>(Rill<O2> that, O padLeft, O2 padRight, Function2<O, O2, O3> f) {
Pull<O3, Unit> loop(Rill<O> s1, Rill<O2> s2) {
return s1.pull.uncons.flatMap((hdtl1) {
return hdtl1.foldN(
() => s2.map((o2) => f(padLeft, o2)).pull.echo,
(hd1, tl1) {
return s2.pull.uncons.flatMap((hdtl2) {
return hdtl2.foldN(
() {
final zippedChunk = hd1.map((o) => f(o, padRight));
return Pull.output(zippedChunk).flatMap((_) {
return tl1.map((o) => f(o, padRight)).pull.echo;
});
},
(hd2, tl2) {
final len = math.min(hd1.size, hd2.size);
final nextS1 = tl1.cons(hd1.drop(len));
final nextS2 = tl2.cons(hd2.drop(len));
return Pull.output(
hd1.zip(hd2).map((t) => f(t.$1, t.$2)),
).flatMap((_) => loop(nextS1, nextS2));
},
);
});
},
);
});
}
return loop(this, that).rillNoScope;
}zipLatest()
Implementation
Rill<(O, O2)> zipLatest<O2>(Rill<O2> that) => zipLatestWith(that, (o, o2) => (o, o2));zipLatestWith()
Implementation
Rill<O3> zipLatestWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
return Rill.eval(Queue.unbounded<Either<Object, Option<O3>>>()).flatMap((queue) {
return Rill.eval(Ref.of<Option<O>>(none())).flatMap((refLeft) {
return Rill.eval(Ref.of<Option<O2>>(none())).flatMap((refRight) {
IO<Unit> tryEmit() {
return refLeft.value().flatMap((optL) {
return refRight.value().flatMap((optR) {
return optL.fold(
() => IO.unit,
(l) => optR.fold(
() => IO.unit,
(r) => queue.offer(Right(Some(f(l, r)))),
),
);
});
});
}
IO<Unit> runLeft() => evalMap(
(o) => refLeft.setValue(Some(o)).productL(tryEmit()),
).compile.drain.handleErrorWith((err) => queue.offer(Left(err)));
IO<Unit> runRight() => that
.evalMap((o2) => refRight.setValue(Some(o2)).productL(tryEmit()))
.compile
.drain
.handleErrorWith((err) => queue.offer(Left(err)));
Rill<O3> consumeQueue() {
return Rill.eval(queue.take()).flatMap((event) {
return event.fold(
(err) => Rill.raiseError(err),
(opt) => opt.fold(
() => Rill.empty(),
(o3) => Rill.emit(o3).append(() => consumeQueue()),
),
);
});
}
final driver = IO.both(runLeft(), runRight()).guarantee(queue.offer(Right(none())));
return Rill.bracket(
driver.start(),
(fiber) => fiber.cancel(),
).flatMap((_) => consumeQueue());
});
});
});
}zipLeft()
Implementation
Rill<O> zipLeft<O2>(Rill<O2> other) => zipWith(other, (a, _) => a);zipRight()
Implementation
Rill<O2> zipRight<O2>(Rill<O2> other) => zipWith(other, (_, b) => b);zipWith()
Implementation
/// Combines this Rill with [that] element by element using [f].
///
/// The result ends as soon as either side has no more chunks; chunks of
/// different sizes are aligned by pushing the unconsumed part of the larger
/// chunk back onto its side.
Rill<O3> zipWith<O2, O3>(Rill<O2> that, Function2<O, O2, O3> f) {
  Pull<O3, Unit> zipRemaining(Rill<O> left, Rill<O2> right) {
    return left.pull.uncons.flatMap((leftStep) {
      return leftStep.foldN(
        () => Pull.done,
        (leftChunk, leftRest) {
          return right.pull.uncons.flatMap((rightStep) {
            return rightStep.foldN(
              () => Pull.done,
              (rightChunk, rightRest) {
                // Only this many elements can be paired from the two chunks.
                final paired = math.min(leftChunk.size, rightChunk.size);

                final leftNext = leftRest.cons(leftChunk.drop(paired));
                final rightNext = rightRest.cons(rightChunk.drop(paired));

                final zipped = leftChunk.zip(rightChunk).map((pair) => f(pair.$1, pair.$2));

                return Pull.output(zipped).flatMap((_) => zipRemaining(leftNext, rightNext));
              },
            );
          });
        },
      );
    });
  }

  return zipRemaining(this, that).rillNoScope;
}
zipWithIndex()
Rill<Record> zipWithIndex()Implementation
Rill<(O, int)> zipWithIndex() => scanChunks(0, (index, c) {
var idx = index;
final out = c.map((o) {
final r = (o, idx);
idx += 1;
return r;
});
return (idx, out);
});zipWithNext()
Rill<Record> zipWithNext()Implementation
/// Pairs every element with the element that follows it.
///
/// The last element is paired with `none()`. An empty Rill produces no output.
Rill<(O, Option<O>)> zipWithNext() {
  // `last` is the element already pulled but not yet emitted, because its
  // successor is not known until more input (or the end) is seen.
  Pull<(O, Option<O>), Unit> go(O last, Rill<O> s) {
    return s.pull.uncons.flatMap((hdtl) {
      return hdtl.foldN(
        // Source exhausted: the held-back element has no successor.
        () => Pull.output1((last, none())),
        (hd, tl) {
          final (newLast, out) = hd.mapAccumulate(last, (prev, next) {
            // Wrap with `Some` rather than the `Option(...)` constructor, in
            // line with zipWithPreviousAndNext.
            // NOTE(review): assumes `Option(x)` turns a null `x` into None,
            // which would make a null element look like "no successor" — confirm.
            final Option<O> following = Some(next);
            return (next, (prev, following));
          });

          return Pull.output(out).append(() => go(newLast, tl));
        },
      );
    });
  }

  return pull.uncons1.flatMap((hdtl) {
    return hdtl.foldN(
      () => Pull.done,
      (hd, tl) => go(hd, tl),
    );
  }).rillNoScope;
}
zipWithPrevious()
Rill<Record> zipWithPrevious()Implementation
/// Pairs every element with the element that preceded it.
///
/// The first element is paired with `none()`.
Rill<(Option<O>, O)> zipWithPrevious() =>
    mapAccumulate(none<O>(), (prev, next) {
      // Wrap with `Some` rather than the `Option(...)` constructor, in line
      // with zipWithPreviousAndNext.
      // NOTE(review): assumes `Option(x)` turns a null `x` into None, which
      // would make a null element look like "no previous element" — confirm.
      final Option<O> current = Some(next);
      return (current, (prev, next));
    }).map((x) => x.$2);
zipWithPreviousAndNext()
Rill<Record> zipWithPreviousAndNext()Implementation
Rill<(Option<O>, O, Option<O>)> zipWithPreviousAndNext() =>
zipWithPrevious().zipWithNext().map((tuple) {
final ((prev, curr), next) = tuple;
return next.foldN(
() => (prev, curr, const None()),
(_, next) => (prev, curr, Some(next)),
);
});zipWithScan()
Rill<Record> zipWithScan<O2>(O2 z, O2 Function(O2, O) f)Implementation
Rill<(O, O2)> zipWithScan<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
z,
(s, o) => (f(s, o), (o, s)),
).mapN((_, v) => v);zipWithScan1()
Rill<Record> zipWithScan1<O2>(O2 z, O2 Function(O2, O) f)Implementation
Rill<(O, O2)> zipWithScan1<O2>(O2 z, Function2<O2, O, O2> f) => mapAccumulate(
z,
(s, o) {
final s2 = f(s, o);
return (s2, (o, s2));
},
).mapN((_, v) => v);Extension Methods
collectN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> collectN<T3>(Function2<T1, T2, Option<T3>> f) => collect(f.tupled);collectN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> collectN<T4>(Function3<T1, T2, T3, Option<T4>> f) => collect(f.tupled);collectN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> collectN<T5>(Function4<T1, T2, T3, T4, Option<T5>> f) => collect(f.tupled);collectN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> collectN<T6>(Function5<T1, T2, T3, T4, T5, Option<T6>> f) => collect(f.tupled);evalMapN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> evalMapN<T3>(Function2<T1, T2, IO<T3>> f) => evalMap(f.tupled);evalMapN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> evalMapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalMap(f.tupled);evalMapN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> evalMapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) => evalMap(f.tupled);evalMapN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> evalMapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalMap(f.tupled);evalTapN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<(T1, T2, T3, T4, T5)> evalTapN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
evalTap(f.tupled);evalTapN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<(T1, T2, T3, T4)> evalTapN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) => evalTap(f.tupled);evalTapN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<(T1, T2, T3)> evalTapN<T4>(Function3<T1, T2, T3, IO<T4>> f) => evalTap(f.tupled);evalTapN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<(T1, T2)> evalTapN<T3>(Function2<T1, T2, IO<T3>> f) => evalTap(f.tupled);filterN() extension
Rill<Record> filterN(bool Function(T1, T2, T3, T4, T5) f)Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<(T1, T2, T3, T4, T5)> filterN(Function5<T1, T2, T3, T4, T5, bool> f) => filter(f.tupled);filterN() extension
Rill<Record> filterN(bool Function(T1, T2) f)Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<(T1, T2)> filterN(Function2<T1, T2, bool> f) => filter(f.tupled);filterN() extension
Rill<Record> filterN(bool Function(T1, T2, T3, T4) f)Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<(T1, T2, T3, T4)> filterN(Function4<T1, T2, T3, T4, bool> f) => filter(f.tupled);filterN() extension
Rill<Record> filterN(bool Function(T1, T2, T3) f)Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<(T1, T2, T3)> filterN(Function3<T1, T2, T3, bool> f) => filter(f.tupled);filterNotN() extension
Rill<Record> filterNotN(bool Function(T1, T2, T3, T4, T5) f)Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<(T1, T2, T3, T4, T5)> filterNotN(Function5<T1, T2, T3, T4, T5, bool> f) =>
filterNot(f.tupled);filterNotN() extension
Rill<Record> filterNotN(bool Function(T1, T2, T3, T4) f)Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<(T1, T2, T3, T4)> filterNotN(Function4<T1, T2, T3, T4, bool> f) => filterNot(f.tupled);filterNotN() extension
Rill<Record> filterNotN(bool Function(T1, T2, T3) f)Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<(T1, T2, T3)> filterNotN(Function3<T1, T2, T3, bool> f) => filterNot(f.tupled);filterNotN() extension
Rill<Record> filterNotN(bool Function(T1, T2) f)Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<(T1, T2)> filterNotN(Function2<T1, T2, bool> f) => filterNot(f.tupled);flatMapN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> flatMapN<T4>(Function3<T1, T2, T3, Rill<T4>> f) => flatMap(f.tupled);flatMapN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> flatMapN<T3>(Function2<T1, T2, Rill<T3>> f) => flatMap(f.tupled);flatMapN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> flatMapN<T6>(Function5<T1, T2, T3, T4, T5, Rill<T6>> f) => flatMap(f.tupled);flatMapN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> flatMapN<T5>(Function4<T1, T2, T3, T4, Rill<T5>> f) => flatMap(f.tupled);flatten() extension
Rill<O> flatten()Available on Rill<O>, provided by the RillFlattenOps<O> extension
Implementation
Rill<O> flatten() => flatMap((r) => r);ifM() extension
Available on Rill<O>, provided by the RillBooleanOps extension
Implementation
Rill<O2> ifM<O2>(Function0<Rill<O2>> ifTrue, Function0<Rill<O2>> ifFalse) =>
flatMap((b) => b ? ifTrue() : ifFalse());mapN() extension
Rill<T6> mapN<T6>(T6 Function(T1, T2, T3, T4, T5) f)Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> mapN<T6>(Function5<T1, T2, T3, T4, T5, T6> f) => map(f.tupled);mapN() extension
Rill<T3> mapN<T3>(T3 Function(T1, T2) f)Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> mapN<T3>(Function2<T1, T2, T3> f) => map(f.tupled);mapN() extension
Rill<T5> mapN<T5>(T5 Function(T1, T2, T3, T4) f)Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> mapN<T5>(Function4<T1, T2, T3, T4, T5> f) => map(f.tupled);mapN() extension
Rill<T4> mapN<T4>(T4 Function(T1, T2, T3) f)Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> mapN<T4>(Function3<T1, T2, T3, T4> f) => map(f.tupled);parEvalMapN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> parEvalMapN<T6>(
int maxConcurrent,
Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMap(maxConcurrent, f.tupled);parEvalMapN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> parEvalMapN<T3>(
int maxConcurrent,
Function2<T1, T2, IO<T3>> f,
) => parEvalMap(maxConcurrent, f.tupled);parEvalMapN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> parEvalMapN<T4>(
int maxConcurrent,
Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMap(maxConcurrent, f.tupled);parEvalMapN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> parEvalMapN<T5>(
int maxConcurrent,
Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMap(maxConcurrent, f.tupled);parEvalMapUnboundedN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> parEvalMapUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
parEvalMapUnbounded(f.tupled);parEvalMapUnboundedN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> parEvalMapUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) => parEvalMapUnbounded(f.tupled);parEvalMapUnboundedN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> parEvalMapUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
parEvalMapUnbounded(f.tupled);parEvalMapUnboundedN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> parEvalMapUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
parEvalMapUnbounded(f.tupled);parEvalMapUnorderedN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> parEvalMapUnorderedN<T4>(
int maxConcurrent,
Function3<T1, T2, T3, IO<T4>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);parEvalMapUnorderedN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> parEvalMapUnorderedN<T5>(
int maxConcurrent,
Function4<T1, T2, T3, T4, IO<T5>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);parEvalMapUnorderedN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> parEvalMapUnorderedN<T6>(
int maxConcurrent,
Function5<T1, T2, T3, T4, T5, IO<T6>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);parEvalMapUnorderedN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> parEvalMapUnorderedN<T3>(
int maxConcurrent,
Function2<T1, T2, IO<T3>> f,
) => parEvalMapUnordered(maxConcurrent, f.tupled);parEvalMapUnorderedUnboundedN() extension
Available on Rill<O>, provided by the RillTuple2Ops<T1, T2> extension
Implementation
Rill<T3> parEvalMapUnorderedUnboundedN<T3>(Function2<T1, T2, IO<T3>> f) =>
parEvalMapUnorderedUnbounded(f.tupled);parEvalMapUnorderedUnboundedN() extension
Available on Rill<O>, provided by the RillTuple3Ops<T1, T2, T3> extension
Implementation
Rill<T4> parEvalMapUnorderedUnboundedN<T4>(Function3<T1, T2, T3, IO<T4>> f) =>
parEvalMapUnorderedUnbounded(f.tupled);parEvalMapUnorderedUnboundedN() extension
Available on Rill<O>, provided by the RillTuple4Ops<T1, T2, T3, T4> extension
Implementation
Rill<T5> parEvalMapUnorderedUnboundedN<T5>(Function4<T1, T2, T3, T4, IO<T5>> f) =>
parEvalMapUnorderedUnbounded(f.tupled);parEvalMapUnorderedUnboundedN() extension
Available on Rill<O>, provided by the RillTuple5Ops<T1, T2, T3, T4, T5> extension
Implementation
Rill<T6> parEvalMapUnorderedUnboundedN<T6>(Function5<T1, T2, T3, T4, T5, IO<T6>> f) =>
parEvalMapUnorderedUnbounded(f.tupled);parJoin() extension
Rill<O> parJoin(int maxOpen)Runs maxOpen inner streams concurrently.
- Waits for all inner streams to finish.
- If any stream fails, the error is propagated and all other streams are cancelled.
- If the output stream is interrupted, all running streams are cancelled.
Available on Rill<O>, provided by the ParJoinOps<O> extension
Implementation
/// Runs up to [maxOpen] inner Rills at the same time and merges their chunks
/// into a single output Rill.
///
/// With `maxOpen == 1` this is just [flatten]. Otherwise the shared state is:
///
/// * `done`      - the first recorded result (an error or the interruption
///                 marker); once non-empty, the outer and inner Rills are
///                 interrupted.
/// * `available` - a semaphore with [maxOpen] permits limiting running inners.
/// * `running`   - number of live fibers, starting at 1 for the outer Rill.
/// * `outcomes`  - closed when `running` drops to 0.
/// * `output`    - chunks produced by the inner Rills.
Rill<O> parJoin(int maxOpen) {
  assert(maxOpen > 0, 'maxOpen must be greater than 0, was: $maxOpen');

  if (maxOpen == 1) {
    return flatten();
  } else {
    final rillF = (
      SignallingRef.of(none<Object>()),
      Semaphore.permits(maxOpen),
      SignallingRef.of(1),
      Channel.unbounded<Unit>(),
      Channel.unbounded<Chunk<O>>(),
    ).mapN((done, available, running, outcomes, output) {
      // Records `rslt` in `done`:
      //  - nothing recorded yet      -> store `rslt` as is;
      //  - `rslt` is empty           -> keep what is already recorded;
      //  - an error on top of a prior result -> combine the two.
      // The stored value is always the error object itself; wrapping it in a
      // second `Some` would make `signalResult` raise an `Option` instead of
      // the failure.
      IO<Unit> stop(Option<Object> rslt) {
        return done.update((current) {
          return current.fold(
            () => rslt,
            (prev) => rslt.fold(
              () => current, // If we're trying to set None, keep existing result (if any)
              (err) => Some(
                // The interruption marker is not a failure: a real error
                // replaces it instead of being combined with it.
                prev is _Interruption ? err : CompositeFailure.of(prev, err),
              ),
            ),
          );
        });
      }

      final incrementRunning = running.update((n) => n + 1);
      // When the last fiber finishes, close `outcomes` so the joiner can end.
      final decrementRunning = running
          .updateAndGet((n) => n - 1)
          .flatMap((now) => now == 0 ? outcomes.close().voided() : IO.unit);

      // Translates a fiber outcome plus the result of releasing its lease
      // into either a recorded error or a message on `outcomes`.
      IO<Unit> onOutcome(
        Outcome<Unit> oc,
        Either<Object, Unit> cancelResult,
      ) {
        return oc.fold(
          () => cancelResult.fold((err) => stop(Some(err)), (_) => IO.unit),
          (err, st) => CompositeFailure.fromResults(
            Left(err),
            cancelResult,
          ).fold((err) => stop(Some(err)), (_) => IO.unit),
          (fu) => cancelResult.fold(
            (err) => stop(Some(err)),
            (_) => outcomes.send(fu).voided(),
          ),
        );
      }

      // Starts one inner Rill on its own fiber. The outer scope is leased so
      // it stays usable while the inner Rill runs; the lease is cancelled
      // and the permit released when the inner Rill finishes.
      IO<Unit> runInner(Rill<O> inner, Scope outerScope) {
        return IO.uncancelable((_) {
          return outerScope.lease().flatMap((lease) {
            return available.acquire().productR(incrementRunning).flatMap((_) {
              return inner
                  .chunks()
                  .evalMap((chunk) => output.send(chunk).voided())
                  .interruptWhenSignaled(done.map((x) => x.nonEmpty))
                  .compile
                  .drain
                  .guaranteeCase((oc) {
                    return lease.cancel
                        .flatMap((cancelResult) => onOutcome(oc, cancelResult))
                        .guarantee(
                          available.release().productR(decrementRunning),
                        );
                  })
                  .voidError()
                  .start()
                  .voided();
            });
          });
        });
      }

      // Drives the outer Rill, launching an inner fiber per element.
      IO<Unit> runOuter() {
        return IO.uncancelable((_) {
          return flatMap(
                (inner) =>
                    Pull.getScope
                        .flatMap((outerScope) => Pull.eval(runInner(inner, outerScope)))
                        .rillNoScope,
              )
              .drain()
              .interruptWhenSignaled(done.map((x) => x.nonEmpty))
              .compile
              .drain
              .guaranteeCase(
                (oc) => onOutcome(oc, Right(Unit())).productR(decrementRunning),
              )
              .voidError();
        });
      }

      // Drains `outcomes`; once it ends, records the result and closes `output`.
      IO<Unit> outcomeJoiner() {
        return outcomes.rill.compile.drain.guaranteeCase((oc) {
          return oc.fold(
            () => stop(none()).productR(output.close().voided()),
            (err, st) => stop(Some(err)).productR(output.close().voided()),
            (fu) => stop(none()).productR(output.close().voided()),
          );
        }).voidError();
      }

      // Joins the joiner fiber, then re-raises the recorded error unless the
      // recorded value is just the interruption marker.
      IO<Unit> signalResult(IOFiber<Unit> fiber) {
        return done.value().flatMap((opt) {
          return opt.fold(
            () => fiber.join().voided(),
            (err) => err is _Interruption ? fiber.join().voided() : IO.raiseError(err),
          );
        });
      }

      return Rill.bracket(
        runOuter().start().productR(outcomeJoiner().start()),
        (fiber) {
          return done
              .update((c) => c.fold(() => const Some(_Interruption()), (prev) => Some(prev)))
              .productR(
                // in case of short-circuiting, the `outcomeJoiner` would not have had a chance
                // to wait until all fibers have been joined, so we need to do it manually
                // by waiting on the counter
                running.waitUntil((n) => n == 0).productR(signalResult(fiber)),
              );
        },
      ).flatMap((_) => output.rill.flatMap((o) => Rill.chunk(o)));
    });

    return Rill.eval(rillF).flatten();
  }
}
parJoinUnbounded() extension
Rill<O> parJoinUnbounded()Available on Rill<O>, provided by the ParJoinOps<O> extension
Implementation
Rill<O> parJoinUnbounded() => parJoin(Integer.maxValue);toDartStream() extension
Stream<O> toDartStream()Available on Rill<O>, provided by the ToDartStreamOps<O> extension
Implementation
/// Exposes this Rill as a Dart [Stream].
///
/// The Rill is only started when the returned stream is listened to. Each
/// element is added to the stream, an error raised by the Rill is added as a
/// stream error, and the stream is closed when the Rill finishes. Cancelling
/// the subscription cancels the fiber running the Rill, if one was recorded.
Stream<O> toDartStream() {
late StreamController<O> controller;
// Fiber running the Rill; set once the program has been started.
IOFiber<Unit>? activeFiber;
controller = StreamController(
onListen: () {
// Every write is guarded by `isClosed` so that nothing is added to a
// controller that has already been closed.
final program = evalMap(
(o) => IO.exec(() {
if (!controller.isClosed) controller.add(o);
}),
)
.handleErrorWith((err) {
return Rill.eval(
IO.exec(() {
if (!controller.isClosed) controller.addError(err);
}),
);
})
.compile
.drain
.guarantee(
IO.exec(() {
if (!controller.isClosed) controller.close();
}),
);
// Start the program on a fiber; the callback below receives the outcome of
// *starting* it (on success, the fiber handle), not of the program itself.
final startIO = program.start();
startIO.unsafeRunAsync((outcome) {
outcome.fold(
// NOTE(review): presumably the "canceled" case, going by the fold order
// used in parJoin — confirm.
() {
if (!controller.isClosed) controller.close();
},
(err, stackTrace) {
if (!controller.isClosed) controller.addError(err, stackTrace);
},
(fiber) => activeFiber = fiber,
);
});
},
onCancel: () {
// NOTE(review): if the subscription is cancelled before `activeFiber` has
// been assigned, nothing is cancelled here — confirm whether
// `unsafeRunAsync` can invoke its callback after `onListen` returns.
if (activeFiber != null) {
// Bridge the fiber cancellation to a Future so the caller of
// `StreamSubscription.cancel` can wait for it to finish.
final completer = Completer<void>();
activeFiber!.cancel().unsafeRunAsync((_) => completer.complete());
return completer.future;
}
},
);
return controller.stream;
}widen() extension
Rill<O2> widen<O2>()Widens this Rill<Never> to Rill<O2>.
Useful when an API expects Rill<O2> and you have a Rill<Never> that produces no elements (e.g. after Rill.drain or Rill.foreach).
Why this is type-safe: the ideal signature would use a lower type bound:
Rill<O2> widen<O2 super O>()
where O2 super O means "O2 is a supertype of O". Dart does not support lower type bounds, so restricting the receiver to Rill<Never> encodes the same constraint at the type level. Never is the bottom type — Never <: O2 for every O2 — so the cast Pull<Never, Unit> as Pull<O2, Unit> always succeeds at runtime.
See also Rill.append for the type-preserving concatenation variant.
Available on Rill<O>, provided by the RillNeverOps extension
Implementation
Rill<O2> widen<O2>() => (underlying as Pull<O2, Unit>).rillNoScope;Operators
operator +()
Implementation
Rill<O> operator +(Rill<O> other) => underlying.flatMap((_) => other.underlying).rillNoScope;operator ==() inherited
bool operator ==(Object other)The equality operator.
The default behavior for all Objects is to return true if and only if this object and other are the same object.
Override this method to specify a different equality relation on a class. The overriding method must still be an equivalence relation. That is, it must be:
Total: It must return a boolean for all arguments. It should never throw.
Reflexive: For all objects o, o == o must be true.
Symmetric: For all objects o1 and o2, o1 == o2 and o2 == o1 must either both be true, or both be false.
Transitive: For all objects o1, o2, and o3, if o1 == o2 and o2 == o3 are true, then o1 == o3 must be true.
The method should also be consistent over time, so whether two objects are equal should only change if at least one of the objects was modified.
If a subclass overrides the equality operator, it should override the hashCode method as well to maintain consistency.
Inherited from Object.
Implementation
external bool operator ==(Object other);Static Properties
never final
final Rill<Never> neverImplementation
static final Rill<Never> never = Rill.eval(IO.never());unit final
Implementation
static final Rill<Unit> unit = Pull.outUnit.rillNoScope;Static Methods
awakeEvery()
Rill<Duration> awakeEvery(Duration period, {bool dampen = true})Implementation
static Rill<Duration> awakeEvery(Duration period, {bool dampen = true}) {
return Rill.eval(IO.now).flatMap((start) {
return _fixedRate(
period,
start,
dampen,
).flatMap((_) => Rill.eval(IO.now.map((now) => now.difference(start))));
});
}bracket()
Implementation
static Rill<R> bracket<R>(IO<R> acquire, Function1<R, IO<Unit>> release) =>
bracketFull((_) => acquire, (r, _) => release(r));bracketCase()
Implementation
static Rill<R> bracketCase<R>(IO<R> acquire, Function2<R, ExitCase, IO<Unit>> release) =>
bracketFull((_) => acquire, release);bracketFull()
Implementation
static Rill<R> bracketFull<R>(
Function1<Poll, IO<R>> acquire,
Function2<R, ExitCase, IO<Unit>> release,
) => Pull.acquireCancelable(acquire, release).flatMap(Pull.output1).rillNoScope;chunk()
Implementation
static Rill<O> chunk<O>(Chunk<O> values) => Pull.output(values).rillNoScope;constant()
Rill<O> constant<O>(O o, {int chunkSize = 256})Implementation
static Rill<O> constant<O>(O o, {int chunkSize = 256}) =>
chunkSize > 0 ? chunk(Chunk.fill(chunkSize, o)).repeat() : Rill.empty();duration()
Rill<Duration> duration()Implementation
static Rill<Duration> duration() =>
Rill.eval(IO.now).flatMap((t0) => Rill.repeatEval(IO.now.map((now) => now.difference(t0))));emit()
Rill<O> emit<O>(O value)Implementation
static Rill<O> emit<O>(O value) => Pull.output1(value).rillNoScope;emits()
Rill<O> emits<O>(List<O> values)Implementation
static Rill<O> emits<O>(List<O> values) => Rill.chunk(Chunk.fromList(values));empty()
Rill<O> empty<O>()Implementation
static Rill<O> empty<O>() => Rill._noScope(Pull.done);eval()
Implementation
static Rill<O> eval<O>(IO<O> io) => Pull.eval(io).flatMap(Pull.output1).rillNoScope;exec()
Implementation
static Rill<O> exec<O>(IO<Unit> io) => Rill._noScope(Pull.eval(io).flatMap((_) => Pull.done));fixedDelay()
Implementation
static Rill<Unit> fixedDelay(Duration period) => sleep(period).repeat();fixedRate()
Implementation
static Rill<Unit> fixedRate(Duration period, {bool dampen = true}) =>
Rill.eval(IO.now).flatMap((t) => _fixedRate(period, t, dampen));fixedRateStartImmediately()
Implementation
static Rill<Unit> fixedRateStartImmediately(Duration period, {bool dampen = true}) =>
Rill.eval(IO.now).flatMap((t) => Rill.unit.append(() => _fixedRate(period, t, dampen)));force()
Implementation
static Rill<O> force<O>(IO<Rill<O>> io) => Rill.eval(io).flatMap(identity);fromEither()
Implementation
static Rill<O> fromEither<E extends Object, O>(Either<E, O> either) =>
either.fold((err) => Rill.raiseError(err), (o) => Rill.emit(o));fromIterator()
Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64})Implementation
static Rill<O> fromIterator<O>(Iterator<O> it, {int chunkSize = 64}) =>
fromRIterator(RIterator.fromDart(it), chunkSize: chunkSize);fromOption()
Implementation
static Rill<O> fromOption<E extends Object, O>(Option<O> option) =>
option.fold(() => Rill.empty(), (o) => Rill.emit(o));fromQueueNoneUnterminated()
Implementation
static Rill<O> fromQueueNoneUnterminated<O>(
Queue<Option<O>> queue, {
int? limit,
}) => _awaitNoneTerminated(queue, limit ?? Integer.maxValue);fromQueueNoneUnterminatedChunk()
Implementation
static Rill<O> fromQueueNoneUnterminatedChunk<O>(
Queue<Option<Chunk<O>>> queue, {
int? limit,
}) => _fromQueueNoneUnterminatedChunk(queue.take(), queue.tryTake(), limit: limit);fromQueueUnterminated()
Implementation
/// Builds a Rill that keeps emitting the elements taken from [queue].
///
/// [limit] caps how many elements are taken from the queue per emitted chunk;
/// when omitted, `Integer.maxValue` is used. With a limit of 1 or less the
/// queue is read one element at a time.
static Rill<O> fromQueueUnterminated<O>(
  Queue<O> queue, {
  int? limit,
}) {
  final maxBatch = limit ?? Integer.maxValue;

  // No batching possible: take elements one by one.
  if (maxBatch <= 1) {
    return Rill.repeatEval(queue.take());
  }

  // Try a non-blocking batch dequeue first. Only when that yields nothing,
  // semantically block for a single element and then grab whatever else is
  // immediately available with a second non-blocking dequeue.
  final nextBatch = queue
      .tryTakeN(Some(maxBatch))
      .flatMap((ready) {
        if (ready.isEmpty) {
          return (
            queue.take(),
            queue.tryTakeN(Some(maxBatch - 1)),
          ).mapN((first, rest) => rest.prepended(first));
        }

        return IO.pure(ready);
      })
      .map((items) => Chunk.from(items));

  return Rill.eval(nextBatch).flatMap(Rill.chunk).repeat();
}
fromQueueUnterminatedChunk()
Implementation
static Rill<O> fromQueueUnterminatedChunk<O>(
Queue<Chunk<O>> queue, {
int? limit,
}) => _fromQueueNoneUnterminatedChunk(
queue.take().map((chunk) => Some(chunk)),
queue.tryTake().map((chunkOpt) => chunkOpt.map((chunk) => Some(chunk))),
limit: limit,
);fromRIterator()
Implementation
/// Builds a Rill from [it], reading up to [chunkSize] elements per chunk.
///
/// The Rill ends when a read attempt finds the iterator has no next element.
static Rill<O> fromRIterator<O>(RIterator<O> it, {int chunkSize = 64}) {
  assert(chunkSize > 0, 'Rill.fromRIterator: chunkSize must be positive');

  // Reads the next batch from `source`; yields None when nothing was read.
  IO<Option<(Chunk<O>, RIterator<O>)>> readBatch(RIterator<O> source) {
    return IO.delay(() {
      final batch = <O>[];

      while (batch.length < chunkSize && source.hasNext) {
        batch.add(source.next());
      }

      if (batch.isEmpty) {
        return const None();
      }

      return Some((Chunk.fromList(batch), source));
    });
  }

  return Rill.unfoldChunkEval(it, readBatch);
}
fromStream()
Rill<O> fromStream<O>(
Stream<O> stream, {
OverflowStrategy strategy = const _DropOldest(100),
})Implementation
/// Adapts a Dart [Stream] into a [Rill] by buffering its events in a queue.
///
/// [strategy] selects the buffer: `_DropNewest` uses `Queue.dropping`,
/// `_DropOldest` uses `Queue.circularBuffer` (the default, with a size of
/// 100), and `_Unbounded` uses `Queue.unbounded`.
///
/// Data events are queued as `Right(Some(data))`, errors as `Left(err)` and
/// completion as `Right(none())`. The consumer emits data, raises on an error
/// and stops on the completion marker. The subscription is acquired through
/// `Rill.bracket` and cancelled on release.
///
/// NOTE(review): the completion marker travels through the same bounded
/// buffer as data. Confirm how `Queue.dropping` treats an offer to a full
/// buffer, since a dropped marker would leave the consumer waiting.
static Rill<O> fromStream<O>(
  Stream<O> stream, {
  OverflowStrategy strategy = const _DropOldest(100),
}) {
  final IO<Queue<Either<Object, Option<O>>>> makeBuffer = switch (strategy) {
    _DropNewest(:final bufferSize) => Queue.dropping(bufferSize),
    _DropOldest(:final bufferSize) => Queue.circularBuffer(bufferSize),
    _Unbounded() => Queue.unbounded(),
  };

  return Rill.eval(makeBuffer).flatMap((buffer) {
    // Stream callbacks are synchronous, so each offer is run and forgotten.
    void enqueue(Either<Object, Option<O>> event) => buffer.offer(event).unsafeRunAndForget();

    // Takes one event at a time, recursing only after a data element.
    Rill<O> drain() => Rill.eval(buffer.take()).flatMap(
      (event) => event.fold(
        (failure) => Rill.raiseError(failure),
        (item) => item.fold(
          () => Rill.empty(),
          (value) => Rill.emit(value).append(() => drain()),
        ),
      ),
    );

    final subscribe = IO.delay(
      () => stream.listen(
        (data) => enqueue(Right(Some(data))),
        onError: (Object err) => enqueue(Left(err)),
        onDone: () => enqueue(Right(none())),
        // Errors are forwarded through the buffer and handled by the
        // consumer rather than by automatic cancellation.
        cancelOnError: false,
      ),
    );

    IO<Unit> unsubscribe(StreamSubscription<O> subscription) =>
        IO.fromFutureF(() => subscription.cancel()).voided();

    return Rill.bracket(subscribe, unsubscribe).flatMap((_) => drain());
  });
}iterate()
Rill<O> iterate<O>(O start, O Function(O) f, {int chunkSize = 64})Implementation
/// Builds an unending [Rill] of `start, f(start), f(f(start)), ...`, emitted
/// in chunks of [chunkSize] elements.
///
/// Throws an [ArgumentError] if [chunkSize] is not positive.
static Rill<O> iterate<O>(
  O start,
  Function1<O, O> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.iterate: chunkSize must be positive.');

  Pull<O, Unit> emitFrom(O seed) {
    final batch = <O>[];
    var cursor = seed;
    // Fill one chunk. After the loop, cursor holds the first value of the
    // following chunk.
    while (batch.length < chunkSize) {
      batch.add(cursor);
      cursor = f(cursor);
    }
    return Pull.output(Chunk.fromList(batch)).append(() => emitFrom(cursor));
  }

  return emitFrom(start).rillNoScope;
}iterateEval()
Implementation
/// Emits [start], then evaluates `f(start)` and continues the same way from
/// its result.
static Rill<O> iterateEval<O>(O start, Function1<O, IO<O>> f) {
  final head = Rill.emit(start);
  return head.append(
    () => Rill.eval(f(start)).flatMap((next) => iterateEval(next, f)),
  );
}pure()
Rill<O> pure<O>(O value)Implementation
/// Wraps a pull that outputs the single [value] in a [Rill] via
/// `Rill._noScope`.
static Rill<O> pure<O>(O value) {
  final single = Pull.output1(value);
  return Rill._noScope(single);
}raiseError()
Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace])Implementation
/// Builds a [Rill] from `Pull.raiseError`, passing through [error] and the
/// optional [stackTrace].
static Rill<O> raiseError<O>(Object error, [StackTrace? stackTrace]) {
  final failing = Pull.raiseError(error, stackTrace);
  return failing.rillNoScope;
}range()
Rill<int> range(
int start,
int stopExclusive, {
int step = 1,
int chunkSize = 64,
})Implementation
/// Builds a [Rill] of integers from [start] towards [stopExclusive],
/// advancing by [step] and emitting chunks of at most [chunkSize] elements.
///
/// A positive [step] counts upwards while the value is below
/// [stopExclusive]; a negative [step] counts downwards while the value is
/// above it. A [step] of zero yields an empty stream.
///
/// Throws an [ArgumentError] if [chunkSize] is not positive. This is checked
/// unconditionally (not via `assert`): with asserts disabled, a zero size
/// would emit empty chunks without ever advancing, and a negative size would
/// move the cursor in the wrong direction.
static Rill<int> range(
  int start,
  int stopExclusive, {
  int step = 1,
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.range: chunkSize must be positive.');
  if (step == 0) return Rill.empty();

  // Direction and stride magnitude are constant across iterations.
  final ascending = step > 0;
  final stride = ascending ? step : -step;

  Pull<int, Unit> loop(int current) {
    final exhausted = ascending ? current >= stopExclusive : current <= stopExclusive;
    if (exhausted) return Pull.done;

    // Number of values left before the bound: ceil(distance / stride).
    final distance = ascending ? stopExclusive - current : current - stopExclusive;
    final available = (distance + stride - 1) ~/ stride;
    final count = available < chunkSize ? available : chunkSize;

    return Pull.output(
      Chunk.tabulate(count, (j) => current + j * step),
    ).append(() => loop(current + count * step));
  }

  return loop(start).rillNoScope;
}repeatEval()
Implementation
/// Evaluates [fo] as a [Rill] and repeats that stream.
static Rill<O> repeatEval<O>(IO<O> fo) {
  final once = Rill.eval(fo);
  return once.repeat();
}resource()
Implementation
/// Interprets the [Resource] description [r] as a [Rill] and applies `.scope`
/// to the result.
///
/// Throws a [StateError] if [r] is not one of the handled `Resource` cases.
static Rill<O> resource<O>(Resource<O> r) {
  final unscoped = switch (r) {
    // A plain value is emitted directly.
    Pure(:final value) => Rill.emit(value),
    // An effect is evaluated for its result.
    Eval(:final task) => Rill.eval(task),
    // A bind interprets its source first, then the resource built from each
    // of its outputs.
    Bind(:final source, :final f) => Rill.resource(source).flatMap((o) => Rill.resource(f(o))),
    // An allocation is bracketed: the release function paired with the value
    // receives the exit case, and only the value itself is emitted.
    Allocate(:final resource) => Rill.bracketFull(
      (poll) => resource(poll),
      (resAndRelease, exitCase) => resAndRelease.$2(exitCase),
    )._mapNoScope((x) => x.$1),
    _ => throw StateError('Unhandled Resource ADT: $r'),
  };
  return unscoped.scope;
}retry()
Rill<O> retry<O>(
IO<O> fo,
Duration delay,
Duration Function(Duration) nextDelay,
int maxAttempts, {
(bool Function(Object))? retriable,
})Implementation
/// Evaluates [fo], retrying on failure, and emits the final outcome.
///
/// The delay sequence starts at [delay] and each following delay is
/// `nextDelay` of the previous one; it is passed to `attempts`. At most
/// [maxAttempts] attempt results are considered. Results are taken through
/// the first one that is either a success or an error for which [retriable]
/// returns `false` (every error is retriable when [retriable] is omitted).
/// The last such result is then rethrown if it is an error.
static Rill<O> retry<O>(
  IO<O> fo,
  Duration delay,
  Function1<Duration, Duration> nextDelay,
  int maxAttempts, {
  Function1<Object, bool>? retriable,
}) {
  final backoff = Rill.unfold(delay, (current) => Some((current, nextDelay(current))));
  final shouldRetry = retriable ?? (_) => true;

  final outcomes = Rill.eval(fo).attempts(backoff).take(maxAttempts);
  // Keep going while the result is a retriable error; include the result that
  // stops the run.
  final untilSettled = outcomes.takeThrough(
    (result) => result.fold((err) => shouldRetry(err), (_) => false),
  );
  return untilSettled.last.unNone.rethrowError;
}sleep()
Implementation
/// Builds a [Rill] that evaluates `IO.sleep` for [duration].
static Rill<Unit> sleep(Duration duration) {
  final pause = IO.sleep(duration);
  return Rill.eval(pause);
}sleep_()
Rill<O> sleep_<O>(Duration duration)Implementation
/// Builds a [Rill] that runs `IO.sleep` for [duration] through `Rill.exec`.
static Rill<O> sleep_<O>(Duration duration) {
  final pause = IO.sleep(duration);
  return Rill.exec(pause);
}supervise()
Implementation
/// Starts [fo] on a fiber inside a bracket whose release step cancels that
/// fiber.
static Rill<IOFiber<O>> supervise<O>(IO<O> fo) {
  final spawn = fo.start();
  return Rill.bracket(spawn, (running) => running.cancel());
}suspend()
Implementation
/// Defers the call to [f] by wrapping it in `Pull.suspend`, using the
/// `underlying` pull of the stream it returns.
static Rill<O> suspend<O>(Function0<Rill<O>> f) {
  final deferred = Pull.suspend(() => f().underlying);
  return deferred.rillNoScope;
}unfold()
Implementation
/// Builds a [Rill] by repeatedly applying [f] to a state, starting from [s].
///
/// Each `Some((element, nextState))` contributes one element and advances the
/// state; `None` ends the stream. Elements are emitted in chunks of at most
/// [chunkSize].
///
/// Throws an [ArgumentError] if [chunkSize] is not positive.
static Rill<O> unfold<S, O>(
  S s,
  Function1<S, Option<(O, S)>> f, {
  int chunkSize = 64,
}) {
  if (chunkSize <= 0) throw ArgumentError('Rill.unfold: chunkSize must be positive.');

  Pull<O, Unit> step(S seed) {
    final batch = <O>[];
    var state = seed;
    var exhausted = false;

    // Every pass either adds one element or marks the source as exhausted,
    // so this runs at most chunkSize times.
    while (!exhausted && batch.length < chunkSize) {
      f(state).foldN(
        () => exhausted = true,
        (value, following) {
          batch.add(value);
          state = following;
        },
      );
    }

    // The source ended before producing anything for this chunk.
    if (batch.isEmpty) return Pull.done;

    final emitted = Pull.output(Chunk.fromList(batch));
    // Only schedule another chunk when the source has not ended.
    return exhausted ? emitted : emitted.append(() => step(state));
  }

  return step(s).rillNoScope;
}unfoldChunk()
Implementation
/// Like `unfold`, but [f] yields whole chunks, which are flattened into the
/// resulting stream.
///
/// [chunkSize] is forwarded to `unfold`, where the unfolded values are the
/// chunks returned by [f].
static Rill<O> unfoldChunk<S, O>(
  S s,
  Function1<S, Option<(Chunk<O>, S)>> f, {
  int chunkSize = 64,
}) {
  final chunks = unfold(s, f, chunkSize: chunkSize);
  return chunks.flatMap(Rill.chunk);
}unfoldChunkEval()
Implementation
/// Builds a [Rill] by repeatedly evaluating [f] on a state, starting from
/// [s].
///
/// Each `Some((chunk, nextState))` outputs the chunk and continues from
/// `nextState`; `None` ends the stream.
static Rill<O> unfoldChunkEval<S, O>(S s, Function1<S, IO<Option<(Chunk<O>, S)>>> f) {
  Pull<O, Unit> step(S state) => Pull.eval(f(state)).flatMap(
    (outcome) => outcome.foldN(
      () => Pull.done,
      (chunk, following) => Pull.output(chunk).append(() => step(following)),
    ),
  );

  return step(s).rillNoScope;
}unfoldEval()
Implementation
/// Builds a [Rill] by repeatedly evaluating [f] on a state, starting from
/// [s].
///
/// Each `Some((element, nextState))` outputs the element and continues from
/// `nextState`; `None` ends the stream.
static Rill<O> unfoldEval<S, O>(S s, Function1<S, IO<Option<(O, S)>>> f) {
  Pull<O, Unit> step(S state) => Pull.eval(f(state)).flatMap(
    (outcome) => outcome.foldN(
      () => Pull.done,
      (value, following) => Pull.output1(value).append(() => step(following)),
    ),
  );

  return step(s).rillNoScope;
}