Skip to main content

Signal

A Signal<A> is observable state — a value that can be read at any time and also observed as a stream. Where a plain Ref<A> only lets you read and write, a Signal additionally lets you subscribe to changes via a Rill.

abstract class Signal<A> {
IO<A> value(); // read the current value
Rill<A> get discrete; // stream of changes (deduped)
Rill<A> get continuous; // stream of snapshots (every poll)
IO<Unit> waitUntil(Function1<A, bool> p); // block until condition holds
Signal<B> map<B>(Function1<A, B> f); // derived signal
}

SignallingRef<A> is the concrete read-write implementation. It extends both Ref<A> and Signal<A>, giving you atomic updates alongside stream observation.

When to use a Signal

Use caseRecommended type
Cancellation token — stop a rill from outsideSignal<bool> + interruptWhenSignaled
Pause / resume a rillSignal<bool> + pauseWhenSignal
Live progress counter observed by multiple fibersSignallingRef<int>
Read-only view of changing stateSignal<A> (from map or holdResource)
Convert a Rill into always-readable stateRill.holdResource

Creating a SignallingRef

SignallingRef.of(initial) returns IO<SignallingRef<A>>. Because SignallingRef extends Ref, all the familiar atomic operations are available — update, modify, getAndUpdate, etc.

IO<Unit> signalBasics() {
return SignallingRef.of(0).flatMap((SignallingRef<int> counter) {
// SignallingRef is both a Ref and a Signal — all atomic Ref operations
// (update, modify, getAndUpdate, etc.) are available alongside stream
// observation.
final increment = counter.update((int n) => n + 1);

// value() reads the current value as a one-shot IO.
final readOnce = counter.value();

// map produces a derived Signal without modifying the underlying value.
// The mapping is lazy: it is applied whenever the signal is observed.
final Signal<String> label = counter.map((int n) => 'count=$n');

return increment
.productR(() => increment)
.productR(() => readOnce)
.flatMap((int n) => IO.print('value after two increments: $n'));
// value after two increments: 2
});
}

map produces a derived Signal whose observed value is the result of applying the function to the underlying signal's current value. The mapping is lazy — it runs on every read, not at construction time.

Observing changes: discrete vs continuous

discrete — emit on change

discrete emits the current value immediately, then emits again each time the value changes. If the producer updates the signal faster than the consumer reads, intermediate values are dropped — the consumer always receives the latest value, never a stale one.

IO<IList<int>> signalDiscrete() {
return SignallingRef.of(0).flatMap((SignallingRef<int> ref) {
// discrete emits the current value immediately, then emits again only
// when the value changes. If updates happen faster than the consumer
// reads, intermediate values are dropped — the consumer always sees
// the latest available value.
final observer = ref.discrete.take(4).compile.toIList;

// Push three updates concurrently.
final updates = Rill.range(1, 4).evalMap((int n) => ref.setValue(n)).compile.drain;

return IO.both<IList<int>, Unit>(observer, updates).map((t) => t.$1);
// => IList(0, 1, 2, 3) — initial + each update
});
}

Use discrete for event-driven consumers: UI updates, progress logging, threshold alerts. The guarantee that you always see the latest value (rather than every intermediate value) is usually exactly what you want.

continuous — emit on every poll

continuous emits the current value on every element poll without waiting for a change. Use it for polling-based consumers that need a consistent snapshot rate regardless of how often the underlying value changes.

IO<IList<int>> signalContinuous() {
return SignallingRef.of(42).flatMap((SignallingRef<int> ref) {
// continuous emits the current value on every poll — it does not wait
// for the value to change. Use it for polling-based consumers that
// always want the freshest snapshot, regardless of whether it changed.
final snapshots = ref.continuous.take(3).compile.toIList;

return snapshots;
// => IList(42, 42, 42) — same value, sampled three times
});
}

Cancellation: interruptWhenSignaled

Signal<bool> is the standard cancellation-token pattern for Rill. The rill does not need to know who signals the stop or when — it simply observes the signal and terminates when it becomes true.

// interruptWhenSignaled is the standard cancellation-token pattern.
// The stream producer does not need to know who signals the stop or when.
IO<Unit> signalInterrupt() {
return SignallingRef.of(false).flatMap((SignallingRef<bool> stop) {
// This stream runs indefinitely until stop becomes true.
final stream =
Rill.constant<int>(
0,
).scan(0, (int acc, int _) => acc + 1).interruptWhenSignaled(stop).compile.drain;

// The controller lives in a separate fiber and decides when to stop.
final controller = stop.setValue(true);

return IO.both<Unit, Unit>(stream, controller).voided();
});
}

Related rill methods that accept a Signal<bool>:

MethodBehaviour
rill.interruptWhenSignaled(signal)Terminate when signal becomes true
rill.pauseWhenSignal(signal)Suspend while signal is true; resume when false

Waiting for a condition

signal.waitUntil(predicate) is a convenience method that blocks the calling fiber until the signal's value satisfies the predicate:

// Block until at least 100 items have been processed.
await itemCount.waitUntil((int n) => n >= 100).unsafeRunFuture();

Internally it uses discrete.forall((a) => !p(a)).compile.drain — it consumes the change stream, discarding values, until one fails the negated predicate.

Turning a Rill into a Signal: holdResource

rill.holdResource(initial) wraps a Rill in a Signal that always holds the most recently emitted value. The stream runs in a background fiber for the lifetime of the returned Resource.

// A live-updating signal that always holds the latest temperature reading.
final sensorSignal = temperatureRill.holdResource(0.0);

sensorSignal.use((Signal<double> temp) async {
// read the latest sensor value at any point in the resource scope
final current = await temp.value().unsafeRunFuture();
});

Real-world example: pauseable processor with watchdog

// A streaming processor with external pause and stop controls.
//
// Three concurrent fibers share two signals:
// - `paused` (bool) — when true, processing suspends.
// - `stop` (bool) — when true, processing terminates.
//
// A fourth fiber tracks item counts via a third signal and fires a stop
// once enough items have been processed.
IO<int> pauseableProcessor() {
return SignallingRef.of(false).flatMap((SignallingRef<bool> paused) {
return SignallingRef.of(false).flatMap((SignallingRef<bool> stop) {
return SignallingRef.of(0).flatMap((SignallingRef<int> itemCount) {
// Main processor: processes items from a large range.
// pauseWhenSignal suspends the stream while paused == true.
// interruptWhenSignaled terminates it when stop == true.
final processor =
Rill.range(0, 10000)
.pauseWhenSignal(paused)
.interruptWhenSignaled(stop)
.evalMap((int _) => itemCount.update((int n) => n + 1))
.compile
.drain;

// Progress observer: logs every 25 items processed.
// Also stops when the stop signal fires.
final logger =
itemCount.discrete
.filter((int n) => n > 0 && n % 25 == 0)
.evalMap((int n) => IO.print('processed $n items'))
.interruptWhenSignaled(stop)
.compile
.drain;

// Watchdog: fires the stop signal once 100 items have been
// processed. Uses waitUntil to block until the condition holds.
final watchdog = itemCount
.waitUntil((int n) => n >= 100)
.productR(() => stop.setValue(true));

// Demonstrate pause: briefly pause then resume after 50 items.
final pauseController = itemCount
.waitUntil((int n) => n >= 50)
.productR(() => paused.setValue(true))
.productR(() => paused.setValue(false));

final work = IO.both<Unit, Unit>(processor, logger).voided();

final control = IO.both<Unit, Unit>(watchdog, pauseController).voided();

return IO.both<Unit, Unit>(work, control).productR(() => itemCount.value());
// => 100 (or slightly more due to scheduling)
});
});
});
}

Four fibers share three SignallingRef values:

  • itemCount — incremented by the processor on every item. Both the logger (via discrete) and the watchdog (via waitUntil) observe it.
  • paused — the pause controller sets it to true at 50 items then immediately back to false, exercising the pauseWhenSignal path.
  • stop — the watchdog sets it to true at 100 items, which terminates both the processor and the logger via interruptWhenSignaled.

IO.both runs the work pair (processor + logger) and the control pair (watchdog + pause controller) concurrently, then reads the final item count once all four fibers have finished.