Queue
Queue<A> is a purely functional, concurrent queue. Every operation —
enqueuing an item, dequeuing an item, checking the size — is expressed as
an IO effect, so queues compose naturally with the rest of an IO program
and are safe to share across concurrent fibers.
Why a functional queue?
Mutable queues from dart:collection expose methods that perform effects
immediately. That makes them hard to compose, test, or reason about in a
program built around IO:
// Imperative: the effect happens now, unconditionally
final q = Queue<int>();
q.add(42); // side effect — no way to defer, retry, or cancel this
A ribs Queue defers every operation inside IO. Adding an item is a
description of an add that can be composed, passed to other functions,
or wrapped in error handling before it ever runs:
// Functional: the effect is described, not performed
final program = Queue.bounded<int>(10).flatMap((q) => q.offer(42));
Beyond composability, ribs queues provide two properties that are essential for concurrent programs:
- Fiber-safe. Any number of producer and consumer fibers can access the same queue simultaneously — all operations are atomic.
- Backpressure by default. A bounded queue suspends a producer fiber when full and suspends a consumer fiber when empty, rather than throwing or silently dropping data.
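The fiber-safety property can be sketched directly: several producer fibers offer to the same queue while one consumer drains it, with no locks or coordination in user code. This sketch uses only operations shown elsewhere on this page; fiberSafetyExample is an illustrative name, not part of the ribs API.

```dart
// Sketch: multiple producers and one consumer share a single queue.
// All coordination happens inside the queue's atomic operations.
IO<Unit> fiberSafetyExample() => Queue.unbounded<int>().flatMap((queue) {
      // Three producer fibers offer concurrently.
      final producers = ilist([1, 2, 3]).map(queue.offer).parSequence_();
      // One consumer fiber takes exactly three items, in whatever
      // order the producers happened to land them.
      final consumer = queue
          .take()
          .flatMap((n) => IO.print('took: $n'))
          .replicate_(3)
          .voided();
      return IO.both(producers, consumer).voided();
    });
```

Because every operation is atomic, no item is lost or duplicated regardless of how the fibers interleave; only the print order varies between runs.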
Queue variants
| Constructor | Behavior when full |
|---|---|
| Queue.bounded(n) | Suspends the producer until a slot opens |
| Queue.unbounded() | Never suspends the producer |
| Queue.dropping(n) | Drops the new item; producer returns false via tryOffer |
| Queue.circularBuffer(n) | Drops the oldest item to make room |
| Queue.synchronous() | No internal buffer; producer suspends until a consumer is ready |
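As a sketch of the non-suspending variants (using only the constructors and methods from the tables on this page; the function name is illustrative), a dropping queue of capacity one rejects a second offer instead of suspending:

```dart
// Sketch: Queue.dropping never suspends the producer.
IO<Unit> droppingExample() => Queue.dropping<int>(1).flatMap(
      (queue) => queue
          .offer(1) // fills the single slot
          .flatMap((_) => queue.tryOffer(2)) // full: 2 is dropped
          .flatMap((accepted) => IO.print('accepted: $accepted')) // accepted: false
          .flatMap((_) => queue.take())
          .flatMap((a) => IO.print('took: $a')), // took: 1
    );
```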
Core operations
| Method | Returns | Blocks? | Description |
|---|---|---|---|
| offer(a) | IO<Unit> | Yes, if full | Enqueue an item |
| tryOffer(a) | IO<bool> | No | Enqueue if capacity is available |
| take() | IO<A> | Yes, if empty | Dequeue the next item |
| tryTake() | IO<Option<A>> | No | Dequeue if an item is available |
| tryTakeN(maxN) | IO<IList<A>> | No | Dequeue up to N items at once |
| size() | IO<int> | No | Current number of items |
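The try* variants make polling-style code straightforward. A sketch (illustrative name, composed from the operations in the table above) that drains whatever is available without ever suspending:

```dart
// Sketch: non-blocking reads with tryTakeN and tryTake.
IO<Unit> drainExample() => Queue.bounded<int>(10).flatMap(
      (queue) => queue
          .offer(1)
          .flatMap((_) => queue.offer(2))
          .flatMap((_) => queue.tryTakeN(5)) // up to 5 items; here, both
          .flatMap((items) => IO.print('drained: $items'))
          .flatMap((_) => queue.tryTake()) // queue now empty: yields None
          .flatMap((next) => IO.print('next: $next')),
    );
```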
Basic usage
IO<Unit> basicQueueExample() => Queue.bounded<int>(10).flatMap(
(queue) => queue
.offer(1)
.flatMap((_) => queue.offer(2))
.flatMap((_) => queue.offer(3))
.flatMap((_) => queue.take())
.flatMap((a) => IO.print('took: $a')),
); // took: 1
Backpressure in action
With a bounded queue, a producer that outpaces its consumers is automatically slowed down — no polling, no sleep loops required.
IO<Unit> backpressureExample() => Queue.bounded<int>(2).flatMap((queue) {
// Producer: the third offer suspends because the queue is full.
// It only unblocks once the consumer takes an element.
final producer = queue
.offer(1)
.flatMap((_) => queue.offer(2))
.flatMap((_) => queue.offer(3)) // suspends here until consumer takes
.flatMap((_) => IO.print('all offered'));
final consumer = IO
    .sleep(100.milliseconds)
    .productR(() => queue.take())
    .flatMap((n) => IO.print('took: $n'))
    .replicate_(3)
    .voided();
return IO.both(producer, consumer).voided();
});
The third offer suspends the producer fiber until the consumer takes
an element, freeing a slot. The moment a slot opens, the producer
resumes without any explicit coordination code.
Real-world example: pub-sub
A queue is a natural fit for publish-subscribe message passing. The publisher sends events without knowing who is listening; each subscriber processes events independently at its own pace.
In a pure program, the pattern looks like this:
- Allocate one bounded Queue per subscriber (each gets its own inbox).
- A publisher fiber fans the same event out to every inbox via offer.
- Each subscriber fiber loops on take, processing events as they arrive.
- Everything runs concurrently: parSequence_ starts all fibers and waits for them to finish.
The example below models a temperature sensor publishing readings to two independent subscribers: an alert worker that warns on high temperatures, and a logger that records every reading.
/// A simple pub-sub example: each subscriber gets its own [Queue] inbox,
/// and the publisher fans every event out to all inboxes.
IO<Unit> pubSubExample() => IO
.both(
Queue.bounded<double>(16), // alert subscriber's inbox
Queue.bounded<double>(16), // logger subscriber's inbox
)
.flatMap((inboxes) {
final (alertInbox, logInbox) = inboxes;
// Fan-out: deliver the same reading to every subscriber inbox
IO<Unit> publish(double celsius) =>
alertInbox.offer(celsius).flatMap((_) => logInbox.offer(celsius));
// Subscriber 1: raise an alert on high temperatures
final alertWorker = alertInbox
    .take()
    .flatMap(
      (t) => t > 38.0
          ? IO.print('ALERT: high temperature — $t°C!')
          : IO.print('OK: $t°C'),
    )
    .replicate_(4)
    .voided();
// Subscriber 2: record every reading to a log
final logWorker = logInbox
    .take()
    .flatMap((t) => IO.print('LOG: reading $t°C'))
    .replicate_(4)
    .voided();
// Publisher: simulate a temperature sensor emitting four readings
final sensor = ilist<double>([36.6, 39.2, 37.1, 38.5]).traverseIO_(publish);
// Run the publisher and both subscribers concurrently.
// parSequence_() starts all three as independent fibers and waits for all to finish.
return ilist([alertWorker, logWorker, sensor]).parSequence_();
});
Because offer suspends if an inbox is full, a slow subscriber naturally
applies backpressure to the publisher — the sensor will pause until the
slow subscriber catches up, preventing unbounded memory growth.
For a higher-level streaming abstraction built on top of Queue, see
the ribs_rill package. Channel wraps a Queue and exposes both a
Rill (stream of taken values) and a send function (IO-based offer),
making producer-consumer pipelines easier to express.