Skip to content

Rill

Rill<O> is Ribs' purely functional, effectful stream type. It lazily describes a sequence of zero or more values of type O, where producing each value may involve arbitrary IO effects.

Motivation: Why not Dart's Stream?

Dart's built-in Stream is fine for simple event handling, but its properties make it unsuitable for purely functional programs.

Eagerness. Subscribing to a Stream starts effects immediately — you cannot pass a stream around as a value without the work already being in flight.

dart
// The HTTP request starts the moment this line runs
final stream = Stream.fromFuture(fetchUserFromDatabase(id));

Single subscription. Most Dart streams can only be listened to once. Reusing or composing them requires broadcasting boilerplate.

Manual resource management. There is no built-in way to tie a resource's lifetime — a file, a socket, a database connection — to a stream's lifecycle. Forgetting to close a StreamController or cancel a subscription silently leaks resources.

Awkward IO integration. Mixing async/await with stream combinators requires leaving the pure model, resulting in scattered callbacks and async* generators.

Rill addresses all of these:

Dart StreamRill
EvaluationEager — effects start on subscribeLazy — just a description
Referential transparencyNoYes
Resource safetyManualBuilt-in bracket
Effect modelasync/await + callbacksIO throughout
ReuseSingle-subscription by defaultCompile as many times as needed
ConcurrencyLimitedFine-grained parEvalMap and merge

What is a Rill?

A Rill<O> is a lazy, pure description of a stream of O values. Nothing runs until you call .compile and then execute the resulting IO:

dart
// A Rill is inert — no effects have run
final rill = Rill.range(1, 6).map((int n) => n * 2);

// compile produces an IO that, when run, collects the elements
final io = rill.compile.toIList;

// The actual work only happens when the IO is run (e.g. unsafeRunFuture())

Because a Rill is referentially transparent, you can pass it to functions, store it in a variable, and compile it more than once — each compilation produces an independent IO that runs the full pipeline from scratch.

Internally, Rill<O> wraps Pull<O, Unit>, a trampolined computation that emits Chunk<O> values and is stack-safe regardless of rill length. You rarely work with Pull directly; Rill exposes a high-level API on top of it.

Relationship to IO

Every terminal operation on a Rill returns an IO:

TerminalReturnsDescription
compile.toIListIO<IList<O>>Collect all elements
compile.drainIO<Unit>Run for effects, discard results
compile.fold(init, f)IO<B>Reduce to a single value
compile.countIO<int>Count elements
compile.lastIO<Option<O>>Get the last element

This means a Rill pipeline always runs inside IO's fiber system, inheriting IO's cancelation, error handling, and concurrency semantics. It composes naturally with every other IO primitive: Ref, Deferred, Semaphore, etc.

Effects within the rill are expressed with evalMap and evalTap, which lift IO actions into the pipeline without breaking the pure model.


Creating a Rill

dart
IO<Unit> rillCreate() {
  // Rill.emits: emit from a fixed Dart list
  final fromList = Rill.emits([1, 2, 3, 4, 5]);

  // Rill.range: lazily generate integers in [start, stopExclusive)
  final range = Rill.range(1, 11);

  // Rill.eval: embed a single IO as a one-element stream
  final fromIO = Rill.eval(IO.delay(() => DateTime.now().millisecondsSinceEpoch));

  // Rill.repeatEval: repeat an IO forever — combine with .take or .interruptAfter
  final ticks = Rill.repeatEval(IO.delay(() => DateTime.now()));

  // compile.toIList runs the Rill and collects all elements: IO<IList<O>>
  // compile.drain runs the Rill and discards results:        IO<Unit>
  return fromList
      .map((int n) => n * 2)
      .compile
      .toIList
      .flatMap((IList<int> xs) => IO.print('doubled: $xs'));
}

Other useful constructors:

ConstructorDescription
Rill.chunk(chunk)Emit an entire Chunk<O>
Rill.unfold(s, f)Stateful generation: S → Option<(O, S)>
Rill.unfoldEval(s, f)Same but f returns IO<Option<(O, S)>>
Rill.fromStream(stream)Wrap a Dart Stream
Rill.resource(resource)Emit the value produced by a Resource
Rill.fixedRate(period)Emit Unit at a fixed interval
Rill.neverA rill that never emits or terminates

Transforming a Rill

dart
IO<Unit> rillTransform() => Rill.range(1, 11) // 1  2  3  4  5  6  7  8  9  10
    .filter((int n) => n.isOdd) // 1  3  5  7  9
    .map((int n) => n * n) // 1  9  25 49 81
    .take(3) // 1  9  25
    .scan(0, (int acc, int n) => acc + n) // running sum: 0  1  10  35
    .compile
    .toIList
    .flatMap((IList<int> xs) => IO.print('sums: $xs')); // [0, 1, 10, 35]

scan is an inclusive prefix-fold: it emits the initial accumulator followed by each intermediate result, making the running state visible as a rill element.

A selection of the most commonly used combinators:

CombinatorDescription
map(f)Transform each element
flatMap(f)Map then flatten (each element expands into a sub-rill)
filter(pred)Keep elements satisfying pred
take(n) / drop(n)First / skip n elements
takeWhile(pred)Take while predicate holds
scan(init, f)Emit running accumulated state
zipWithIndex()Pair each element with its index
changes()Emit only when the value differs from the previous
concatAppend one rill after another

INFO

These are only the most common combinators Rill provides. Review the API docs to get the full picture of what is provided.

IO effects in a Rill

dart
IO<Unit> rillEffects() =>
    Rill.emits([1, 2, 3, 4, 5])
        // evalTap: execute an IO per element and pass the element through unchanged
        .evalTap((int n) => IO.print('input: $n'))
        // evalMap: replace each element with the result of an IO
        .evalMap((int n) => IO.pure(n * n))
        .evalTap((int n) => IO.print('squared: $n'))
        .compile
        .drain;

evalTap is the rill equivalent of flatTap on IO — it executes a side-effect per element and passes the original element downstream unchanged. Use it for logging, metrics, or state updates that should not alter the rill type.

Resource safety

Rill.bracket(acquire, release) ties a resource's lifetime to the rill's lifetime. The release action runs unconditionally when the rill ends — whether it completes normally, raises an error, or is canceled by the fiber runtime.

dart
IO<Unit> rillResource() => Rill.bracket(
      // acquire: runs when the stream starts
      IO.print('opened connection').productR(IO.pure('conn')),
      // release: always runs when the stream ends — success, error, or cancelation
      (String conn) => IO.print('closed: $conn'),
    )
    .flatMap(
      (String conn) => Rill.emits([
        'query-1',
        'query-2',
        'query-3',
      ]).evalMap((String q) => IO.pure('[$conn] $q result')),
    )
    .compile
    .toIList
    .flatMap((IList<String> results) => IO.print('results: $results'));

TIP

For attaching a finalizer at an arbitrary point in the pipeline rather than at the source, use rill.onFinalize(io). For access to the ExitCase (succeeded / errored / canceled), use rill.onFinalizeCase(f).

Interruption

Any Rill can be stopped early — cleanly, without leaking resources — using one of the interrupt combinators. When a rill is interrupted, all bracket finalizers and onFinalize hooks still run, exactly as they do on normal completion.

CombinatorStops when
interruptAfter(duration)A fixed duration elapses
interruptWhen(io)An IO<B> completes (success or error)
interruptWhenTrue(rill)A Rill<bool> emits true
interruptWhenSignaled(signal)A Signal<bool> becomes true
dart
// interruptAfter: stop the stream after a fixed wall-clock duration.
// Any elements already emitted are kept; the stream ends cleanly.
IO<IList<int>> interruptAfterExample() =>
    Rill.repeatEval(IO.delay(() => DateTime.now().millisecond))
        .metered(20.milliseconds) // emit one value every 20 ms
        .interruptAfter(110.milliseconds) // stop after ~110 ms ≈ 5 elements
        .compile
        .toIList;

// interruptWhen: stop the stream when an external IO completes.
// The IO can be a Deferred completed by another fiber — a clean shutdown signal.
IO<IList<int>> interruptWhenExample() => IO.deferred<Unit>().flatMap((stop) {
  final source = Rill.range(0, 1000)
      .evalMap((int n) => IO.sleep(20.milliseconds).as(n))
      .interruptWhen(stop.value()); // halts as soon as stop is completed

  // Complete `stop` from outside after 3 elements have had time to arrive
  final trigger = IO.sleep(70.milliseconds).productR(stop.complete(Unit()));

  return IO.both(source.compile.toIList, trigger).map((t) => t.$1);
});

interruptWhen(io) is the most flexible form. Passing a Deferred.value() as the signal gives any other fiber a handle to stop the rill on demand — a clean alternative to canceling the whole fiber. The rill drains whatever it has already produced before halting.


Concurrency with parJoin

parEvalMap covers the common case of applying one IO-valued function to each element concurrently. For more general concurrency — where each element expands into a multi-element sub-rill — use parJoin.

parJoin lives on Rill<Rill<O>>. It subscribes to up to maxOpen inner rills simultaneously and merges their outputs into a single rill. Results arrive in completion order: whichever inner rill emits next appears next, regardless of its position in the outer rill.

Fixed set of concurrent rills

dart
/// Run [inner] streams concurrently and merge their outputs.
///
/// [parJoin(n)] subscribes to at most [n] inner streams simultaneously.
/// Results arrive in **completion order** — faster inner streams
/// appear first regardless of their position in the outer stream.
IO<IList<String>> parJoinExample() {
  Rill<String> source(String name, int delayMs, int count) => Rill.range(
    1,
    count + 1,
  ).evalMap((int n) => IO.sleep(Duration(milliseconds: delayMs)).as('$name-$n'));

  return Rill.emits([
        source('slow', 40, 3), // emits at 40 ms intervals
        source('medium', 20, 4), // emits at 20 ms intervals
        source('fast', 10, 2), // emits at 10 ms intervals
      ])
      .parJoin(3) // run all three concurrently
      .compile
      .toIList; // elements arrive interleaved by speed
}

All three sources run concurrently from the moment parJoin starts. Fast sources do not wait for slow ones; the merged rill ends only after every inner rill has completed.

Dynamic concurrency with flatMap + parJoin

flatMap + parJoin is the general pattern when the number of inner rills is not known up front or each outer element should fan out into multiple values.

dart
/// [flatMap] produces one inner stream per element.
/// [parJoin] then runs those inner streams concurrently.
///
/// This is the general pattern behind [parEvalMap] — use it directly
/// when each element should expand into a multi-element sub-stream.
IO<IList<String>> dynamicParJoin() =>
    Rill.range(1, 6)
        .map(
          (int id) =>
          // Each element fans out into its own sub-stream of events
          Rill.range(1, 4).evalMap(
            (int ev) => IO.sleep(Duration(milliseconds: (6 - id) * 15)).as('source-$id/event-$ev'),
          ),
        )
        .parJoin(3) // at most 3 sub-streams active at once
        .compile
        .toIList;

INFO

rill.parEvalMap(n, f) is exactly rill.map((o) => Rill.eval(f(o))).parJoin(n). Use parEvalMap for the one-IO-per-element case; reach for flatMap + parJoin when each element should produce a full sub-rill.

Error propagation and cancelation

If any inner rill raises an error, parJoin immediately cancels all remaining inner rills and propagates the error to the outer rill. If the outer rill itself is interrupted (e.g. via interruptAfter), all running inner rills are canceled before the combined rill terminates.

parJoinUnbounded() is a convenience alias for parJoin(Integer.MaxValue) — use it when you know the number of inner rills is bounded by the outer rill and you do not need an explicit cap.


Real-world example: parallel fetch pipeline

A common pattern is to fan out work across many concurrent operations while keeping the number of simultaneous in-flight requests bounded — preventing resource exhaustion without sacrificing throughput.

parEvalMap(maxConcurrent, f) runs f on up to maxConcurrent elements simultaneously and emits results in request order, so the downstream pipeline sees a consistent ordering even though the work runs out-of-order.

dart
/// Simulate an async lookup — in practice an HTTP call or database query.
IO<String> fetchRecord(int id) =>
    IO.sleep(Duration(milliseconds: 20 + (id * 7) % 50)).productR(IO.pure('record_$id'));

/// Fetch [count] records with at most [maxConcurrent] in-flight at once.
///
/// Results are emitted in **request order** regardless of which fetch
/// finishes first. The running tally is tracked with a [Ref].
IO<IList<String>> parallelFetchPipeline({int count = 10, int maxConcurrent = 3}) => IO
    .ref(0)
    .flatMap(
      (Ref<int> completed) =>
          Rill.range(1, count + 1)
              // Dispatch up to [maxConcurrent] fetches concurrently.
              // Back-pressure prevents unbounded in-flight work.
              .parEvalMap(maxConcurrent, fetchRecord)
              // evalTap runs sequentially on the ordered output.
              .evalTap(
                (String record) => completed
                    .modify((int n) => (n + 1, n + 1))
                    .flatMap((int n) => IO.print('[$n/$count] $record')),
              )
              .compile
              .toIList,
    );

Because every step is an IO, the progress counter in the Ref is updated atomically and the log output is correctly sequenced. The same code that counts, logs, and collects results would be awkward to write correctly with Stream and async/await — with Rill it follows directly from the compositionality of IO.

TIP

For cases where ordering does not matter and throughput is the priority, swap parEvalMap for parEvalMapUnordered. To run a background rill concurrently with the main pipeline, see rill.concurrently(other).