Signal
A Signal<A> is observable state — a value that can be read at any time
and also observed as a stream. Where a plain Ref<A> only lets you read and
write, a Signal additionally lets you subscribe to changes via a Rill.
abstract class Signal<A> {
  IO<A> value(); // read the current value
  Rill<A> get discrete; // stream of changes (deduped)
  Rill<A> get continuous; // stream of snapshots (every poll)
  IO<Unit> waitUntil(Function1<A, bool> p); // block until condition holds
  Signal<B> map<B>(Function1<A, B> f); // derived signal
}
SignallingRef<A> is the concrete read-write implementation. It extends both
Ref<A> and Signal<A>, giving you atomic updates alongside stream
observation.
When to use a Signal
| Use case | Recommended type |
|---|---|
| Cancellation token — stop a rill from outside | Signal<bool> + interruptWhenSignaled |
| Pause / resume a rill | Signal<bool> + pauseWhenSignal |
| Live progress counter observed by multiple fibers | SignallingRef<int> |
| Read-only view of changing state | Signal<A> (from map or holdResource) |
| Convert a Rill into always-readable state | Rill.holdResource |
Creating a SignallingRef
SignallingRef.of(initial) returns IO<SignallingRef<A>>. Because
SignallingRef extends Ref, all the familiar atomic operations are
available — update, modify, getAndUpdate, etc.
IO<Unit> signalBasics() {
  return SignallingRef.of(0).flatMap((SignallingRef<int> counter) {
    // SignallingRef is both a Ref and a Signal: all atomic Ref operations
    // (update, modify, getAndUpdate, etc.) are available alongside stream
    // observation.
    final increment = counter.update((int n) => n + 1);
    // value() reads the current value as a one-shot IO.
    final readOnce = counter.value();
    // map produces a derived Signal without modifying the underlying value.
    // The mapping is lazy: it is applied whenever the signal is observed.
    final Signal<String> label = counter.map((int n) => 'count=$n');
    return increment
        .productR(() => increment)
        .productR(() => readOnce)
        .flatMap((int n) => IO.print('value after two increments: $n'));
    // value after two increments: 2
  });
}
map produces a derived Signal whose observed value is the result of
applying the function to the underlying signal's current value. The mapping is
lazy — it runs on every read, not at construction time.
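The laziness described above can be pictured with a small stand-alone model. This is only a sketch in plain Dart: `ToyCell`, `ToyView`, and `lazyMapDemo` are hypothetical names invented for illustration and are not part of the library. It assumes nothing beyond the statement that the mapping runs on read rather than at construction.

```dart
// Toy model only: ToyCell and ToyView are hypothetical, not library types.

/// A mutable cell standing in for the underlying signal.
class ToyCell<A> {
  ToyCell(this.current);

  A current;

  /// Returns a derived view. The function is stored, not applied, so
  /// nothing is computed at construction time.
  ToyView<B> map<B>(B Function(A) f) => ToyView<B>(() => f(current));
}

/// A read-only view that re-applies the mapping on every read.
class ToyView<B> {
  ToyView(this._read);

  final B Function() _read;

  B value() => _read();
}

/// Returns a log of events so the order of evaluation is visible.
List<String> lazyMapDemo() {
  final log = <String>[];
  var calls = 0;
  final counter = ToyCell<int>(0);
  final label = counter.map((int n) {
    calls++;
    return 'count=$n';
  });
  log.add('calls after map: $calls'); // the mapping has not run yet
  counter.current = 2;
  log.add(label.value()); // the read sees the updated cell
  log.add('calls after one read: $calls');
  return log;
}
```

Calling `print(lazyMapDemo())` from a `main` function shows that the mapping function is not invoked until the first read, and that the read reflects the cell's value at that moment.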
Observing changes: discrete vs continuous
discrete — emit on change
discrete emits the current value immediately, then emits again when the
value changes. If the producer updates the signal faster than the consumer
reads, intermediate values are dropped: the consumer skips ahead to the
latest value and never receives a stale one.
IO<IList<int>> signalDiscrete() {
  return SignallingRef.of(0).flatMap((SignallingRef<int> ref) {
    // discrete emits the current value immediately, then emits again only
    // when the value changes. If updates happen faster than the consumer
    // reads, intermediate values are dropped, so the consumer always sees
    // the latest available value.
    final observer = ref.discrete.take(4).compile.toIList;
    // Push three updates concurrently.
    final updates = Rill.range(1, 4).evalMap((int n) => ref.setValue(n)).compile.drain;
    return IO.both<IList<int>, Unit>(observer, updates).map((t) => t.$1);
    // => IList(0, 1, 2, 3): the initial value plus each update, provided
    // the observer keeps pace with the writer
  });
}
Use discrete for event-driven consumers: UI updates, progress logging,
threshold alerts. The guarantee that you always see the latest value (rather
than every intermediate value) is usually exactly what you want.
continuous — emit on every poll
continuous emits the current value on every element poll without waiting for
a change. Use it for polling-based consumers that need a consistent snapshot
rate regardless of how often the underlying value changes.
IO<IList<int>> signalContinuous() {
  return SignallingRef.of(42).flatMap((SignallingRef<int> ref) {
    // continuous emits the current value on every poll; it does not wait
    // for the value to change. Use it for polling-based consumers that
    // always want the freshest snapshot, regardless of whether it changed.
    final snapshots = ref.continuous.take(3).compile.toIList;
    return snapshots;
    // => IList(42, 42, 42): same value, sampled three times
  });
}
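To contrast the two observation modes side by side, here is a toy model in plain Dart. It is a sketch under stated assumptions, not the library's implementation: `VersionedCell`, `DiscreteReader`, `ContinuousReader`, and `burstDemo` are hypothetical names. The model captures only the skipping of intermediate values after a burst of writes, and it tracks writes with a version counter rather than comparing values.

```dart
// Toy model only: these classes are hypothetical and say nothing about
// how the library is implemented.

/// A value plus a version number that is bumped on every write.
class VersionedCell<A> {
  VersionedCell(this._value);

  A _value;
  int _version = 0;

  A get value => _value;
  int get version => _version;

  void set(A a) {
    _value = a;
    _version++;
  }
}

/// discrete-style reader: yields the latest value only if something was
/// written since the previous poll, so a burst of writes collapses into one.
class DiscreteReader<A> {
  DiscreteReader(this._cell);

  final VersionedCell<A> _cell;
  int _lastSeen = -1; // -1 so the very first poll yields the initial value

  List<A> poll() {
    if (_cell.version == _lastSeen) return <A>[];
    _lastSeen = _cell.version;
    return <A>[_cell.value];
  }
}

/// continuous-style reader: yields the current value on every poll.
class ContinuousReader<A> {
  ContinuousReader(this._cell);

  final VersionedCell<A> _cell;

  List<A> poll() => <A>[_cell.value];
}

/// Polls both readers once, applies three writes, then polls each twice.
Map<String, List<int>> burstDemo() {
  final cell = VersionedCell<int>(0);
  final discrete = DiscreteReader<int>(cell);
  final continuous = ContinuousReader<int>(cell);

  final seenDiscrete = <int>[...discrete.poll()];
  final seenContinuous = <int>[...continuous.poll()];

  // Three writes land before either reader polls again.
  cell.set(1);
  cell.set(2);
  cell.set(3);

  seenDiscrete.addAll(discrete.poll());
  seenDiscrete.addAll(discrete.poll());
  seenContinuous.addAll(continuous.poll());
  seenContinuous.addAll(continuous.poll());

  return {'discrete': seenDiscrete, 'continuous': seenContinuous};
}
```

In this model the discrete-style reader records 0 and then 3, skipping 1 and 2, while the continuous-style reader records 0, 3, 3 because it samples on every poll whether or not anything changed.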
Cancellation: interruptWhenSignaled
Signal<bool> is the standard cancellation-token pattern for Rill. The
rill does not need to know who signals the stop or when — it simply observes
the signal and terminates when it becomes true.
// interruptWhenSignaled is the standard cancellation-token pattern.
// The stream producer does not need to know who signals the stop or when.
IO<Unit> signalInterrupt() {
  return SignallingRef.of(false).flatMap((SignallingRef<bool> stop) {
    // This stream runs indefinitely until stop becomes true.
    final stream = Rill.constant<int>(0)
        .scan(0, (int acc, int _) => acc + 1)
        .interruptWhenSignaled(stop)
        .compile
        .drain;
    // The controller lives in a separate fiber and decides when to stop.
    final controller = stop.setValue(true);
    return IO.both<Unit, Unit>(stream, controller).voided();
  });
}
Related rill methods that accept a Signal<bool>:
| Method | Behaviour |
|---|---|
| rill.interruptWhenSignaled(signal) | Terminate when signal becomes true |
| rill.pauseWhenSignal(signal) | Suspend while signal is true; resume when false |
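
The cancellation-token idea can also be shown without any effect system. The following is a synchronous analogy in plain Dart, offered as a rough sketch: `naturals`, `interruptible`, and `interruptDemo` are hypothetical helpers, and a plain `bool` variable stands in for the signal. It models only the interrupt case, not pausing, and makes no claim about how the library implements either.

```dart
// Synchronous analogy only: these helpers are hypothetical, not library APIs.

/// An endless source: 0, 1, 2, ...
Iterable<int> naturals() sync* {
  var n = 0;
  while (true) {
    yield n++;
  }
}

/// Passes elements through until shouldStop() returns true. The flag is
/// checked before each element, and the source never learns who flipped it.
Iterable<A> interruptible<A>(
  Iterable<A> source,
  bool Function() shouldStop,
) sync* {
  for (final a in source) {
    if (shouldStop()) return;
    yield a;
  }
}

/// Consumes the endless source until the "controller" flips the flag.
List<int> interruptDemo() {
  var stop = false; // stands in for the signal
  final seen = <int>[];
  for (final n in interruptible(naturals(), () => stop)) {
    seen.add(n);
    if (n == 4) stop = true; // the controller decides when to stop
  }
  return seen;
}
```

The producer (`naturals`) contains no stopping logic at all; termination is decided entirely by whoever holds the flag, which is the property the cancellation-token pattern relies on.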
Waiting for a condition
signal.waitUntil(predicate) is a convenience method that blocks the calling
fiber until the signal's value satisfies the predicate:
// Block until at least 100 items have been processed.
await itemCount.waitUntil((int n) => n >= 100).unsafeRunFuture();
Internally it uses discrete.forall((a) => !p(a)).compile.drain: it consumes
the change stream, discarding values, until one satisfies p (that is, fails
the negated predicate), at which point the wait ends.
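The negated-predicate trick is easier to see on an ordinary collection. The sketch below is an analogy in plain Dart, with `Iterable.every` standing in for the stream's forall; `consumedUntil` and `waitDemo` are hypothetical helpers introduced only for illustration.

```dart
// Analogy on a plain Iterable: consumedUntil is a hypothetical helper.

/// Consumes changes until one satisfies p and reports how many elements
/// were pulled, including the one that ended the wait.
int consumedUntil<A>(Iterable<A> changes, bool Function(A) p) {
  var consumed = 0;
  // every stops at the first element for which the test returns false,
  // which is the first element for which p is true.
  changes.every((A a) {
    consumed++;
    return !p(a);
  });
  return consumed;
}

/// Simulated item counts observed over time; the wait ends at 120.
int waitDemo() {
  final counts = <int>[0, 40, 80, 120, 160];
  return consumedUntil(counts, (int n) => n >= 100);
}
```

Here `waitDemo()` pulls four elements (0, 40, 80, 120) and never looks at 160, mirroring how the wait completes as soon as one observed value satisfies the predicate.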
Turning a Rill into a Signal: holdResource
rill.holdResource(initial) wraps a Rill in a Signal that always holds
the most recently emitted value. The stream runs in a background fiber for the
lifetime of the returned Resource.
// A live-updating signal that always holds the latest temperature reading.
final sensorSignal = temperatureRill.holdResource(0.0);
// use takes a function that returns an IO; the signal is valid only inside it.
final readLatest = sensorSignal.use((Signal<double> temp) {
  // read the latest sensor value at any point in the resource scope
  return temp.value().flatMap((double current) => IO.print('latest: $current'));
});
Real-world example: pauseable processor with watchdog
// A streaming processor with external pause and stop controls.
//
// Three concurrent fibers share two signals:
// - `paused` (bool): when true, processing suspends.
// - `stop` (bool): when true, processing terminates.
//
// A fourth fiber tracks item counts via a third signal and fires a stop
// once enough items have been processed.
IO<int> pauseableProcessor() {
  return SignallingRef.of(false).flatMap((SignallingRef<bool> paused) {
    return SignallingRef.of(false).flatMap((SignallingRef<bool> stop) {
      return SignallingRef.of(0).flatMap((SignallingRef<int> itemCount) {
        // Main processor: processes items from a large range.
        // pauseWhenSignal suspends the stream while paused == true.
        // interruptWhenSignaled terminates it when stop == true.
        final processor =
            Rill.range(0, 10000)
                .pauseWhenSignal(paused)
                .interruptWhenSignaled(stop)
                .evalMap((int _) => itemCount.update((int n) => n + 1))
                .compile
                .drain;
        // Progress observer: logs every 25 items processed.
        // Also stops when the stop signal fires.
        final logger =
            itemCount.discrete
                .filter((int n) => n > 0 && n % 25 == 0)
                .evalMap((int n) => IO.print('processed $n items'))
                .interruptWhenSignaled(stop)
                .compile
                .drain;
        // Watchdog: fires the stop signal once 100 items have been
        // processed. Uses waitUntil to block until the condition holds.
        final watchdog = itemCount
            .waitUntil((int n) => n >= 100)
            .productR(() => stop.setValue(true));
        // Demonstrate pause: briefly pause then resume after 50 items.
        final pauseController = itemCount
            .waitUntil((int n) => n >= 50)
            .productR(() => paused.setValue(true))
            .productR(() => paused.setValue(false));
        final work = IO.both<Unit, Unit>(processor, logger).voided();
        final control = IO.both<Unit, Unit>(watchdog, pauseController).voided();
        return IO.both<Unit, Unit>(work, control).productR(() => itemCount.value());
        // => 100 (or slightly more due to scheduling)
      });
    });
  });
}
Four fibers share three SignallingRef values:
- itemCount: incremented by the processor on every item. Both the logger (via discrete) and the watchdog (via waitUntil) observe it.
- paused: the pause controller sets it to true at 50 items, then immediately back to false, exercising the pauseWhenSignal path.
- stop: the watchdog sets it to true at 100 items, which terminates both the processor and the logger via interruptWhenSignaled.
IO.both runs the work pair (processor + logger) and the control pair
(watchdog + pause controller) concurrently, then reads the final item count
once all four fibers have finished.