# Rill

`Rill<O>` is Ribs' purely functional, effectful stream type. It lazily describes a sequence of zero or more values of type `O`, where producing each value may involve arbitrary IO effects.
## Motivation: Why not Dart's Stream?

Dart's built-in `Stream` is fine for simple event handling, but its properties make it unsuitable for purely functional programs.

**Eagerness.** Subscribing to a `Stream` starts effects immediately — you cannot pass a stream around as a value without the work already being in flight.
```dart
// The HTTP request starts the moment this line runs
final stream = Stream.fromFuture(fetchUserFromDatabase(id));
```

**Single subscription.** Most Dart streams can only be listened to once. Reusing or composing them requires broadcasting boilerplate.

**Manual resource management.** There is no built-in way to tie a resource's lifetime — a file, a socket, a database connection — to a stream's lifecycle. Forgetting to close a `StreamController` or cancel a subscription silently leaks resources.

**Awkward IO integration.** Mixing async/await with stream combinators requires leaving the pure model, resulting in scattered callbacks and `async*` generators.
Rill addresses all of these:
| | Dart Stream | Rill |
|---|---|---|
| Evaluation | Eager — effects start on subscribe | Lazy — just a description |
| Referential transparency | No | Yes |
| Resource safety | Manual | Built-in `bracket` |
| Effect model | async/await + callbacks | IO throughout |
| Reuse | Single-subscription by default | Compile as many times as needed |
| Concurrency | Limited | Fine-grained `parEvalMap` and `merge` |
## What is a Rill?

A `Rill<O>` is a lazy, pure description of a stream of `O` values. Nothing runs until you call `.compile` and then execute the resulting `IO`:

```dart
// A Rill is inert — no effects have run
final rill = Rill.range(1, 6).map((int n) => n * 2);

// compile produces an IO that, when run, collects the elements
final io = rill.compile.toIList;

// The actual work only happens when the IO is run (e.g. unsafeRunFuture())
```

Because a `Rill` is referentially transparent, you can pass it to functions, store it in a variable, and compile it more than once — each compilation produces an independent `IO` that runs the full pipeline from scratch.

Internally, `Rill<O>` wraps `Pull<O, Unit>`, a trampolined computation that emits `Chunk<O>` values and is stack-safe regardless of rill length. You rarely work with `Pull` directly; `Rill` exposes a high-level API on top of it.
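As a small sketch of that reuse property (the import paths here are assumptions; adjust them to your project), the same `Rill` value can be compiled twice, yielding two independent `IO`s that each run the full pipeline:

```dart
// Import paths are assumptions; the API used below is from this page.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Compile one Rill value twice; each compilation is a fresh description
/// of the whole pipeline, so the two IOs run independently.
IO<Unit> compileTwice() {
  final rill = Rill.range(1, 4).map((int n) => n * 10);

  final first = rill.compile.toIList;
  final second = rill.compile.toIList;

  return first
      .flatMap((IList<int> a) => second.map((IList<int> b) => (a, b)))
      .flatMap((r) => IO.print('run 1: ${r.$1}, run 2: ${r.$2}'));
}
```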
## Relationship to IO

Every terminal operation on a Rill returns an `IO`:

| Terminal | Returns | Description |
|---|---|---|
| `compile.toIList` | `IO<IList<O>>` | Collect all elements |
| `compile.drain` | `IO<Unit>` | Run for effects, discard results |
| `compile.fold(init, f)` | `IO<B>` | Reduce to a single value |
| `compile.count` | `IO<int>` | Count elements |
| `compile.last` | `IO<Option<O>>` | Get the last element |

This means a Rill pipeline always runs inside IO's fiber system, inheriting IO's cancelation, error handling, and concurrency semantics. It composes naturally with every other IO primitive: `Ref`, `Deferred`, `Semaphore`, etc.

Effects within the rill are expressed with `evalMap` and `evalTap`, which lift IO actions into the pipeline without breaking the pure model.
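A brief sketch of the terminal operations from the table (imports are assumptions): `compile.fold` reduces the rill to one value, while `compile.count` tallies elements, and both stay inside `IO`:

```dart
// Import paths are assumptions; the terminals used are listed above.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Sum and count the elements of a rill using two terminal operations.
IO<Unit> terminalOps() {
  final rill = Rill.range(1, 6); // 1 2 3 4 5

  final sum = rill.compile.fold(0, (int acc, int n) => acc + n); // IO<int>
  final count = rill.compile.count; // IO<int>

  return sum.flatMap(
    (int s) => count.flatMap((int c) => IO.print('sum=$s count=$c')),
  );
}
```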
Creating a Rill
IO<Unit> rillCreate() {
// Rill.emits: emit from a fixed Dart list
final fromList = Rill.emits([1, 2, 3, 4, 5]);
// Rill.range: lazily generate integers in [start, stopExclusive)
final range = Rill.range(1, 11);
// Rill.eval: embed a single IO as a one-element stream
final fromIO = Rill.eval(IO.delay(() => DateTime.now().millisecondsSinceEpoch));
// Rill.repeatEval: repeat an IO forever — combine with .take or .interruptAfter
final ticks = Rill.repeatEval(IO.delay(() => DateTime.now()));
// compile.toIList runs the Rill and collects all elements: IO<IList<O>>
// compile.drain runs the Rill and discards results: IO<Unit>
return fromList
.map((int n) => n * 2)
.compile
.toIList
.flatMap((IList<int> xs) => IO.print('doubled: $xs'));
}Other useful constructors:
| Constructor | Description |
|---|---|
| `Rill.chunk(chunk)` | Emit an entire `Chunk<O>` |
| `Rill.unfold(s, f)` | Stateful generation: `S → Option<(O, S)>` |
| `Rill.unfoldEval(s, f)` | Same but `f` returns `IO<Option<(O, S)>>` |
| `Rill.fromStream(stream)` | Wrap a Dart `Stream` |
| `Rill.resource(resource)` | Emit the value produced by a `Resource` |
| `Rill.fixedRate(period)` | Emit `Unit` at a fixed interval |
| `Rill.never` | A rill that never emits or terminates |
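To illustrate stateful generation, here is a sketch of `Rill.unfold` counting down from a seed. The import paths and the `Some` / `none` constructors are assumptions based on ribs_core's conventions; per the signature above, `f` returns `Option<(O, S)>` and `None` ends the rill:

```dart
// Import paths and Option constructors (Some / none) are assumptions.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Count down from 5 with Rill.unfold. Each step emits the current state
/// and threads the next state; returning None terminates the rill.
IO<Unit> unfoldExample() => Rill.unfold(
      5,
      (int s) => s > 0 ? Some((s, s - 1)) : none<(int, int)>(),
    ).compile.toIList.flatMap((IList<int> xs) => IO.print('countdown: $xs'));
```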
Transforming a Rill
IO<Unit> rillTransform() => Rill.range(1, 11) // 1 2 3 4 5 6 7 8 9 10
.filter((int n) => n.isOdd) // 1 3 5 7 9
.map((int n) => n * n) // 1 9 25 49 81
.take(3) // 1 9 25
.scan(0, (int acc, int n) => acc + n) // running sum: 0 1 10 35
.compile
.toIList
.flatMap((IList<int> xs) => IO.print('sums: $xs')); // [0, 1, 10, 35]scan is an inclusive prefix-fold: it emits the initial accumulator followed by each intermediate result, making the running state visible as a rill element.
A selection of the most commonly used combinators:

| Combinator | Description |
|---|---|
| `map(f)` | Transform each element |
| `flatMap(f)` | Map then flatten (each element expands into a sub-rill) |
| `filter(pred)` | Keep elements satisfying `pred` |
| `take(n)` / `drop(n)` | First / skip `n` elements |
| `takeWhile(pred)` | Take while predicate holds |
| `scan(init, f)` | Emit running accumulated state |
| `zipWithIndex()` | Pair each element with its index |
| `changes()` | Emit only when the value differs from the previous |
| `concat` | Append one rill after another |
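As a sketch combining two entries from the table (imports are assumptions, and pairing via Dart records is assumed to match the tuple style used elsewhere on this page), `changes()` drops consecutive duplicates and `zipWithIndex()` then indexes the survivors:

```dart
// Import paths are assumptions; combinators are from the table above.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Deduplicate consecutive values with changes(), then index the result.
IO<Unit> dedupeAndIndex() => Rill.emits([1, 1, 2, 2, 2, 3, 1])
    .changes() // only *consecutive* duplicates are dropped
    .zipWithIndex() // pair each surviving element with its position
    .compile
    .toIList
    .flatMap((xs) => IO.print('indexed: $xs'));
```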
INFO
These are only the most common combinators Rill provides; see the API docs for the complete list.
IO effects in a Rill
IO<Unit> rillEffects() =>
Rill.emits([1, 2, 3, 4, 5])
// evalTap: execute an IO per element and pass the element through unchanged
.evalTap((int n) => IO.print('input: $n'))
// evalMap: replace each element with the result of an IO
.evalMap((int n) => IO.pure(n * n))
.evalTap((int n) => IO.print('squared: $n'))
.compile
.drain;evalTap is the rill equivalent of flatTap on IO — it executes a side-effect per element and passes the original element downstream unchanged. Use it for logging, metrics, or state updates that should not alter the rill type.
## Resource safety

`Rill.bracket(acquire, release)` ties a resource's lifetime to the rill's lifetime. The release action runs unconditionally when the rill ends — whether it completes normally, raises an error, or is canceled by the fiber runtime.

```dart
IO<Unit> rillResource() => Rill.bracket(
      // acquire: runs when the stream starts
      IO.print('opened connection').productR(IO.pure('conn')),
      // release: always runs when the stream ends — success, error, or cancelation
      (String conn) => IO.print('closed: $conn'),
    )
    .flatMap(
      (String conn) => Rill.emits([
        'query-1',
        'query-2',
        'query-3',
      ]).evalMap((String q) => IO.pure('[$conn] $q result')),
    )
    .compile
    .toIList
    .flatMap((IList<String> results) => IO.print('results: $results'));
```

TIP

For attaching a finalizer at an arbitrary point in the pipeline rather than at the source, use `rill.onFinalize(io)`. For access to the `ExitCase` (succeeded / errored / canceled), use `rill.onFinalizeCase(f)`.
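A minimal sketch of `onFinalize` (imports are assumptions): the finalizer is attached mid-pipeline rather than to an acquired resource, and runs when the rill ends for any reason:

```dart
// Import paths are assumptions; onFinalize is described in the tip above.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Attach a finalizer at an arbitrary point in the pipeline; it runs
/// when the rill ends, whether normally, by error, or by cancelation.
IO<Unit> finalizeExample() => Rill.emits(['a', 'b', 'c'])
    .evalTap((String s) => IO.print('saw: $s'))
    .onFinalize(IO.print('pipeline finished'))
    .compile
    .drain;
```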
## Interruption

Any Rill can be stopped early — cleanly, without leaking resources — using one of the interrupt combinators. When a rill is interrupted, all `bracket` finalizers and `onFinalize` hooks still run, exactly as they do on normal completion.

| Combinator | Stops when |
|---|---|
| `interruptAfter(duration)` | A fixed duration elapses |
| `interruptWhen(io)` | An `IO<B>` completes (success or error) |
| `interruptWhenTrue(rill)` | A `Rill<bool>` emits `true` |
| `interruptWhenSignaled(signal)` | A `Signal<bool>` becomes `true` |
```dart
// interruptAfter: stop the stream after a fixed wall-clock duration.
// Any elements already emitted are kept; the stream ends cleanly.
IO<IList<int>> interruptAfterExample() =>
    Rill.repeatEval(IO.delay(() => DateTime.now().millisecond))
        .metered(20.milliseconds) // emit one value every 20 ms
        .interruptAfter(110.milliseconds) // stop after ~110 ms ≈ 5 elements
        .compile
        .toIList;

// interruptWhen: stop the stream when an external IO completes.
// The IO can be a Deferred completed by another fiber — a clean shutdown signal.
IO<IList<int>> interruptWhenExample() => IO.deferred<Unit>().flatMap((stop) {
  final source = Rill.range(0, 1000)
      .evalMap((int n) => IO.sleep(20.milliseconds).as(n))
      .interruptWhen(stop.value()); // halts as soon as stop is completed

  // Complete `stop` from outside after 3 elements have had time to arrive
  final trigger = IO.sleep(70.milliseconds).productR(stop.complete(Unit()));

  return IO.both(source.compile.toIList, trigger).map((t) => t.$1);
});
```

`interruptWhen(io)` is the most flexible form. Passing a `Deferred.value()` as the signal gives any other fiber a handle to stop the rill on demand — a clean alternative to canceling the whole fiber. The rill drains whatever it has already produced before halting.
## Concurrency with parJoin

`parEvalMap` covers the common case of applying one IO-valued function to each element concurrently. For more general concurrency — where each element expands into a multi-element sub-rill — use `parJoin`.

`parJoin` lives on `Rill<Rill<O>>`. It subscribes to up to `maxOpen` inner rills simultaneously and merges their outputs into a single rill. Results arrive in completion order: whichever inner rill emits next appears next, regardless of its position in the outer rill.
### Fixed set of concurrent rills

```dart
/// Run [inner] streams concurrently and merge their outputs.
///
/// [parJoin(n)] subscribes to at most [n] inner streams simultaneously.
/// Results arrive in **completion order** — faster inner streams
/// appear first regardless of their position in the outer stream.
IO<IList<String>> parJoinExample() {
  Rill<String> source(String name, int delayMs, int count) => Rill.range(
    1,
    count + 1,
  ).evalMap((int n) => IO.sleep(Duration(milliseconds: delayMs)).as('$name-$n'));

  return Rill.emits([
    source('slow', 40, 3), // emits at 40 ms intervals
    source('medium', 20, 4), // emits at 20 ms intervals
    source('fast', 10, 2), // emits at 10 ms intervals
  ])
      .parJoin(3) // run all three concurrently
      .compile
      .toIList; // elements arrive interleaved by speed
}
```

All three sources run concurrently from the moment `parJoin` starts. Fast sources do not wait for slow ones; the merged rill ends only after every inner rill has completed.
### Dynamic concurrency with flatMap + parJoin

Mapping each element to a sub-rill and merging with `parJoin` is the general pattern when the number of inner rills is not known up front or each outer element should fan out into multiple values.

```dart
/// [map] produces one inner stream per element.
/// [parJoin] then runs those inner streams concurrently.
///
/// This is the general pattern behind [parEvalMap] — use it directly
/// when each element should expand into a multi-element sub-stream.
IO<IList<String>> dynamicParJoin() =>
    Rill.range(1, 6)
        .map(
          (int id) =>
              // Each element fans out into its own sub-stream of events
              Rill.range(1, 4).evalMap(
                (int ev) => IO.sleep(Duration(milliseconds: (6 - id) * 15)).as('source-$id/event-$ev'),
              ),
        )
        .parJoin(3) // at most 3 sub-streams active at once
        .compile
        .toIList;
```

INFO

`rill.parEvalMap(n, f)` is exactly `rill.map((o) => Rill.eval(f(o))).parJoin(n)`. Use `parEvalMap` for the one-IO-per-element case; reach for `map` into sub-rills plus `parJoin` when each element should produce a full sub-rill.
### Error propagation and cancelation

If any inner rill raises an error, `parJoin` immediately cancels all remaining inner rills and propagates the error to the outer rill. If the outer rill itself is interrupted (e.g. via `interruptAfter`), all running inner rills are canceled before the combined rill terminates.

`parJoinUnbounded` is a convenience for `parJoin` with no cap on concurrently open inner rills — use it when you know the number of inner rills is bounded by the outer rill and you do not need an explicit limit.
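A sketch of inner-rill error propagation (imports are assumptions, and the error is raised by throwing inside `IO.delay` rather than via a dedicated error constructor): one inner rill fails after 30 ms, so `parJoin` cancels the still-running sibling and the compiled `IO` fails with that error.

```dart
// Import paths are assumptions; raising the error by throwing inside
// IO.delay is a choice made to stay within the API shown on this page.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// The failing inner rill errors first; parJoin cancels the (infinite)
/// healthy sibling and propagates the error to the compiled IO.
IO<Unit> innerErrorExample() {
  final healthy = Rill.repeatEval(IO.sleep(20.milliseconds).as('tick'));
  final failing = Rill.eval(
    IO.sleep(30.milliseconds)
        .productR(IO.delay<String>(() => throw Exception('boom'))),
  );

  // The resulting IO fails with the inner error; any bracket finalizers
  // on either rill would still run before it surfaces.
  return Rill.emits([healthy, failing]).parJoin(2).compile.drain;
}
```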
## Real-world example: parallel fetch pipeline

A common pattern is to fan out work across many concurrent operations while keeping the number of simultaneous in-flight requests bounded — preventing resource exhaustion without sacrificing throughput.

`parEvalMap(maxConcurrent, f)` runs `f` on up to `maxConcurrent` elements simultaneously and emits results in request order, so the downstream pipeline sees a consistent ordering even though the work runs out of order.
```dart
/// Simulate an async lookup — in practice an HTTP call or database query.
IO<String> fetchRecord(int id) =>
    IO.sleep(Duration(milliseconds: 20 + (id * 7) % 50)).productR(IO.pure('record_$id'));

/// Fetch [count] records with at most [maxConcurrent] in-flight at once.
///
/// Results are emitted in **request order** regardless of which fetch
/// finishes first. The running tally is tracked with a [Ref].
IO<IList<String>> parallelFetchPipeline({int count = 10, int maxConcurrent = 3}) => IO
    .ref(0)
    .flatMap(
      (Ref<int> completed) =>
          Rill.range(1, count + 1)
              // Dispatch up to [maxConcurrent] fetches concurrently.
              // Back-pressure prevents unbounded in-flight work.
              .parEvalMap(maxConcurrent, fetchRecord)
              // evalTap runs sequentially on the ordered output.
              .evalTap(
                (String record) => completed
                    .modify((int n) => (n + 1, n + 1))
                    .flatMap((int n) => IO.print('[$n/$count] $record')),
              )
              .compile
              .toIList,
    );
```

Because every step is an `IO`, the progress counter in the `Ref` is updated atomically and the log output is correctly sequenced. The same code that counts, logs, and collects results would be awkward to write correctly with `Stream` and async/await — with Rill it follows directly from the compositionality of `IO`.
TIP

When ordering does not matter and throughput is the priority, swap `parEvalMap` for `parEvalMapUnordered`. To run a background rill concurrently with the main pipeline, see `rill.concurrently(other)`.
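A sketch of `concurrently` from the tip above (imports are assumptions, and the background-cancelation behavior described in the comment follows the combinator's stated purpose): a heartbeat logger runs alongside the main pipeline and stops when the foreground finishes.

```dart
// Import paths are assumptions; concurrently and metered are named above.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

/// Run a heartbeat logger in the background while the main pipeline works.
/// The background rill is canceled when the foreground rill completes.
IO<IList<int>> withHeartbeat() {
  final heartbeat =
      Rill.repeatEval(IO.print('still working...')).metered(50.milliseconds);

  return Rill.range(1, 6)
      .evalMap((int n) => IO.sleep(30.milliseconds).as(n * n))
      .concurrently(heartbeat)
      .compile
      .toIList;
}
```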