Queue

Queue<A> is a purely functional, concurrent queue. Every operation — enqueuing an item, dequeuing an item, checking the size — is expressed as an IO effect, so queues compose naturally with the rest of an IO program and are safe to share across concurrent fibers.

Why a functional queue?

Mutable queues from dart:collection expose methods that perform effects immediately. That makes them hard to compose, test, or reason about in a program built around IO.

A ribs-effect Queue defers every operation inside IO. Adding an item is a description of an add that can be composed, passed to other functions, or wrapped in error handling before it ever runs:

dart
// Functional: the effect is described, not performed
final program = Queue.bounded<int>(10).flatMap((q) => q.offer(42));

Beyond composability, ribs-effect queues provide two properties that are essential for concurrent programs:

  • Fiber-safe. Any number of producer and consumer fibers can access the same queue simultaneously — all operations are atomic.
  • Backpressure by default. A bounded queue suspends a producer fiber when full and suspends a consumer fiber when empty, rather than throwing or silently dropping data.
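A minimal sketch of the fiber-safety property, with multiple producers and consumers sharing one queue. The `package:ribs_effect/ribs_effect.dart` import path is an assumption; adjust it to match your project setup.

```dart
import 'package:ribs_effect/ribs_effect.dart';

/// Sketch: two producer fibers and two consumer fibers share one queue.
/// Every offer and take is atomic, so no external locking is needed.
IO<Unit> fiberSafetyExample() => Queue.bounded<int>(8).flatMap((q) {
  final producerA = ilist([1, 2, 3]).traverseIO_(q.offer);
  final producerB = ilist([4, 5, 6]).traverseIO_(q.offer);

  // An IO is just a description, so the same consumer description can be
  // started twice, producing two independent consumer fibers.
  final consumer =
      q.take().flatMap((n) => IO.print('took: $n')).replicate_(3).voided();

  // Six items offered, six items taken (3 per consumer fiber).
  return ilist([producerA, producerB, consumer, consumer]).parSequence_();
});
```

Interleaving of the printed values is nondeterministic, but every item is delivered to exactly one consumer.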

Queue variants

Constructor               Behavior when full
Queue.bounded(n)          Suspends the producer until a slot opens
Queue.unbounded()         Never suspends the producer
Queue.dropping(n)         Drops the new item; tryOffer returns false to the producer
Queue.circularBuffer(n)   Drops the oldest item to make room
Queue.synchronous()       No internal buffer; the producer suspends until a consumer is ready
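As a sketch of how the overflow policies differ, the example below uses a dropping queue, whose tryOffer reports a rejected item as false instead of suspending. The `package:ribs_effect/ribs_effect.dart` import path is an assumption; adjust it to match your project setup.

```dart
import 'package:ribs_effect/ribs_effect.dart';

/// Sketch: a dropping queue rejects new items once it is full, and
/// tryOffer reports the rejection as `false` rather than suspending.
IO<Unit> droppingExample() => Queue.dropping<int>(1).flatMap(
      (q) => q
          .offer(1) // fills the single slot
          .flatMap((_) => q.tryOffer(2)) // queue is full: item is dropped
          .flatMap((accepted) => IO.print('second offer accepted: $accepted')),
    );
```

A bounded queue of the same capacity would instead suspend the producer on the second offer until a consumer freed the slot.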

Core operations

Method           Returns         Blocks?         Description
offer(a)         IO<Unit>        Yes, if full    Enqueue an item
tryOffer(a)      IO<bool>        No              Enqueue only if capacity is available
take()           IO<A>           Yes, if empty   Dequeue the next item
tryTake()        IO<Option<A>>   No              Dequeue only if an item is available
tryTakeN(maxN)   IO<IList<A>>    No              Dequeue up to maxN items at once
size()           IO<int>         No              Current number of items

INFO

Note that blocking here is only semantic blocking: the waiting fiber is suspended, but the underlying thread/isolate is never blocked.
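The try* variants never suspend, which makes them useful in code that must not wait on the queue. A small sketch combining them (the `package:ribs_effect/ribs_effect.dart` import path is an assumption; adjust it to match your project setup):

```dart
import 'package:ribs_effect/ribs_effect.dart';

/// Sketch: the non-blocking operations complete immediately regardless
/// of the queue's state.
IO<Unit> nonBlockingExample() => Queue.bounded<int>(4).flatMap(
      (q) => q
          .tryTake() // empty queue: completes immediately with None
          .flatMap((first) => IO.print('before offers: $first'))
          .flatMap((_) => q.offer(1))
          .flatMap((_) => q.offer(2))
          .flatMap((_) => q.offer(3))
          .flatMap((_) => q.tryTakeN(2)) // at most 2 items, without waiting
          .flatMap((batch) => IO.print('batch: $batch'))
          .flatMap((_) => q.size())
          .flatMap((n) => IO.print('remaining: $n')),
    );
```

After the batch take of two items, one of the three offered items remains, so the final size is 1.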

Basic usage

dart
IO<Unit> basicQueueExample() => Queue.bounded<int>(10).flatMap(
  (queue) => queue
      .offer(1)
      .flatMap((_) => queue.offer(2))
      .flatMap((_) => queue.offer(3))
      .flatMap((_) => queue.take())
      .flatMap((a) => IO.print('took: $a')),
); // took: 1

Backpressure in action

With a bounded queue, a producer that outpaces its consumers is automatically slowed down — no polling, no sleep loops required.

dart
IO<Unit> backpressureExample() => Queue.bounded<int>(2).flatMap((queue) {
  // Producer: the third offer suspends because the queue is full.
  // It only unblocks once the consumer takes an element.
  final producer = queue
      .offer(1)
      .flatMap((_) => queue.offer(2))
      .flatMap((_) => queue.offer(3)) // suspends here until consumer takes
      .flatMap((_) => IO.print('all offered'));

  final consumer =
      IO
          .sleep(100.milliseconds)
          .productR(queue.take())
          .flatMap((n) => IO.print('took: $n'))
          .replicate_(3)
          .voided();

  return IO.both(producer, consumer).voided();
});

The third offer suspends the producer fiber until the consumer takes an element, freeing a slot. The moment a slot opens, the producer resumes without any explicit coordination code.


Real-world example: pub-sub

A queue is a natural fit for publish-subscribe message passing. The publisher sends events without knowing who is listening; each subscriber processes events independently at its own pace.

In a pure program, the pattern looks like this:

  1. Allocate one bounded Queue per subscriber (each gets its own inbox).
  2. A publisher fiber fans the same event out to every inbox via offer.
  3. Each subscriber fiber loops on take, processing events as they arrive.
  4. Everything runs concurrently — parSequence_ starts all fibers and waits for them to finish.

The example below models a temperature sensor publishing readings to two independent subscribers: an alert worker that warns on high temperatures, and a logger that records every reading.

dart
/// A simple pub-sub example: each subscriber owns a bounded [Queue] inbox
/// that independently receives every published event.
IO<Unit> pubSubExample() => IO
    .both(
      Queue.bounded<double>(16), // alert subscriber's inbox
      Queue.bounded<double>(16), // logger subscriber's inbox
    )
    .flatMap((inboxes) {
      final (alertInbox, logInbox) = inboxes;

      // Fan-out: deliver the same reading to every subscriber inbox
      IO<Unit> publish(double celsius) =>
          alertInbox.offer(celsius).flatMap((_) => logInbox.offer(celsius));

      // Subscriber 1: raise an alert on high temperatures
      final alertWorker =
          alertInbox
              .take()
              .flatMap(
                (t) =>
                    t > 38.0 ? IO.print('ALERT: high temperature — $t°C!') : IO.print('OK: $t°C'),
              )
              .replicate_(4)
              .voided();

      // Subscriber 2: record every reading to a log
      final logWorker =
          logInbox.take().flatMap((t) => IO.print('LOG: reading $t°C')).replicate_(4).voided();

      // Publisher: simulate a temperature sensor emitting four readings
      final sensor = ilist<double>([36.6, 39.2, 37.1, 38.5]).traverseIO_(publish);

      // Run the publisher and both subscribers concurrently.
      // parSequence_() starts all three as independent fibers and waits for all to finish.
      return ilist([alertWorker, logWorker, sensor]).parSequence_();
    });

Because offer suspends if an inbox is full, a slow subscriber naturally applies backpressure to the publisher — the sensor will pause until the slow subscriber catches up, preventing unbounded memory growth.

TIP

For a higher-level streaming abstraction built on top of Queue, see the ribs_rill package. Channel wraps a Queue and exposes both a Rill (stream of taken values) and a send function (IO-based offer), making producer-consumer pipelines easier to express.