Channel

A Channel<A> is a concurrent queue that connects a producer fiber to a consumer fiber. Producers call send to enqueue items; the consumer reads them back as a Rill<A>. When the producer is done, it closes the channel and the consumer's stream completes naturally.

abstract mixin class Channel<A> {
  IO<Either<ChannelClosed, Unit>> send(A a);
  IO<Either<ChannelClosed, bool>> trySend(A a);
  IO<Either<ChannelClosed, Unit>> close();
  Rill<A> get rill;
  // ...
}

When to use a Channel

A Channel is the right tool when you need to decouple a producer from a consumer running in separate fibers:

  • Aggregating events from multiple concurrent services into a single stream.
  • Feeding work items into a stream-based processing pipeline from an IO-based producer.
  • Bridging callback-style or event-driven code into a Rill.

When you just need to transform or combine streams that are already Rill values, use stream operators like merge, parEvalMap, or flatMap instead.
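For contrast, here is a minimal sketch of combining two streams that are already Rill values. It assumes the `merge` combinator named above has the usual fs2-style shape (`Rill<A> merge(Rill<A> that)`); no intermediate queue needs to be sized or closed.

```dart
// Sketch only: both sources are already Rill values, so no Channel is
// needed. Assumes `merge` interleaves elements as they become available.
IO<IList<int>> mergedStreams() {
  final fast = Rill.range(0, 3); // emits 0, 1, 2
  final slow = Rill.range(10, 13); // emits 10, 11, 12

  // merge runs both streams concurrently and emits whichever element
  // arrives first; the result completes when both sources complete.
  return fast.merge(slow).compile.toIList;
}
```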

Creating a Channel

Three constructors cover the common capacity strategies:

IO<Unit> channelCreate() {
  // bounded(n): producers suspend when n items are waiting — natural backpressure.
  final bounded = Channel.bounded<String>(16);

  // synchronous(): equivalent to bounded(0). Every send blocks until a consumer
  // is ready, providing strict hand-off semantics between fibers.
  final synchronous = Channel.synchronous<String>();

  // unbounded(): never suspends producers. Use carefully — a burst of sends can
  // queue arbitrarily many items before the consumer catches up.
  final unbounded = Channel.unbounded<String>();

  return bounded.flatMap((_) => synchronous).flatMap((_) => unbounded).voided();
}

In most cases, bounded with a modest capacity (16–256) is the right default.

Sending values

send — suspending send

send enqueues one value. If the channel is at capacity, the calling fiber suspends until a slot opens. If the channel is already closed it returns Left(ChannelClosed) immediately rather than blocking.

IO<Unit> channelSend() {
  return Channel.bounded<int>(4).flatMap((Channel<int> channel) {
    // send() suspends the calling fiber when the channel is at capacity.
    // It resumes automatically once the consumer drains at least one slot.
    // Returns Left(ChannelClosed) if the channel has already been closed.
    final sendOne = channel.send(42).flatMap(
      (Either<ChannelClosed, Unit> result) => result.fold(
        (ChannelClosed _) => IO.print('channel was closed before send'),
        (Unit _) => IO.print('42 sent successfully'),
      ),
    );

    // closeWithElement sends a final value and closes in one atomic step.
    final sendFinal = channel.closeWithElement(99).flatMap(
      (Either<ChannelClosed, Unit> result) => result.fold(
        (ChannelClosed _) => IO.print('already closed'),
        (Unit _) => IO.print('99 sent and channel closed'),
      ),
    );

    // Consume both values, then we are done.
    final consume = channel.rill.take(2).compile.drain;

    return sendOne.productR(() => sendFinal).productR(() => consume);
  });
}

closeWithElement sends a final value and closes the channel atomically — no separate close() call is needed at the end of a fixed sequence.

trySend — non-blocking send

trySend never suspends. It returns Right(true) when the value was queued, Right(false) when the channel is full, and Left(ChannelClosed) when the channel has already been closed. Use it when dropping items under backpressure is acceptable.

IO<Unit> channelTrySend() {
  return Channel.bounded<int>(2).flatMap((Channel<int> channel) {
    // trySend never suspends. Returns Right(true) if the value was queued,
    // Right(false) if the channel is full, or Left(ChannelClosed) if closed.
    return IList.range(0, 5).traverseIO_((int i) {
      return channel.trySend(i).flatMap(
        (Either<ChannelClosed, bool> result) => result.fold(
          (ChannelClosed _) => IO.print('$i: channel closed'),
          (bool queued) => IO.print('$i: ${queued ? "sent" : "dropped (full)"}'),
        ),
      );
    });
    // output:
    // 0: sent
    // 1: sent
    // 2: dropped (full)
    // 3: dropped (full)
    // 4: dropped (full)
  });
}

sendAll — pipe a Rill into a Channel

sendAll is a Pipe<A, Never> that feeds every element of a source stream into the channel and closes the channel automatically when the source completes.

IO<Unit> channelSendAll() {
  return Channel.bounded<int>(8).flatMap((Channel<int> channel) {
    // sendAll is a Pipe<A, Never> that feeds every element of a Rill into
    // the channel. The channel is automatically closed when the source
    // stream completes — no manual close() call required.
    final producer = Rill.range(1, 6).through(channel.sendAll).compile.drain;

    // Consumer runs concurrently with the producer.
    final consumer = channel.rill.compile.toIList;

    return IO.both<Unit, IList<int>>(producer, consumer).flatMap(
      (t) => IO.print('received: ${t.$2}'),
    ); // received: IList(1, 2, 3, 4, 5)
  });
}

Because sendAll returns a Pipe, you compose it with Rill.through. The Never output type signals that this pipe consumes elements and produces none — you run it with .compile.drain.

Consuming a Channel

channel.rill is an ordinary Rill<A> that emits every enqueued element in order and completes as soon as the channel is closed and all buffered items have been delivered. You can apply any Rill operator to it:

channel.rill.map(transform).compile.toIList
channel.rill.take(10).compile.drain
channel.rill.through(Pipes.text.utf8.decode).compile.toIList
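Putting the one-liners above into context, here is a small sketch of a complete consumer using the same API as the earlier examples (the `transform` function is hypothetical, standing in for any per-element mapping):

```dart
// Sketch: consume a channel's stream with ordinary Rill operators.
// The resulting IO completes once the channel is closed and every
// buffered element has been delivered and transformed.
IO<IList<String>> consumeTransformed(Channel<String> channel) {
  return channel.rill
      .map((s) => s.toUpperCase()) // any Rill operator applies here
      .compile
      .toIList; // materialise the whole (now finite) stream
}
```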

Real-world example: log aggregation

// Collect log events from three concurrent services into a single list.
//
// Each service sends its events independently via `channel.send`. The bounded
// channel (capacity 4) provides backpressure: a service that gets ahead of the
// consumer will suspend rather than flood memory. When all services finish,
// the channel is closed and the consumer returns everything it received.
IO<IList<String>> aggregateLogs() {
  final authEvents = ilist(['auth: login', 'auth: token refreshed']);
  final apiEvents = ilist(['api: GET /users', 'api: POST /orders', 'api: GET /items']);
  final dbEvents = ilist(['db: SELECT executed', 'db: cache miss', 'db: INSERT executed']);

  return Channel.bounded<String>(4).flatMap((Channel<String> channel) {
    // Producers: each service sends its events concurrently.
    // parTraverseIO_ starts all three fibers and waits for all to finish.
    final producers = IList.fromDart([authEvents, apiEvents, dbEvents]).parTraverseIO_(
      (IList<String> events) => events.traverseIO_((String e) => channel.send(e)),
    );

    // Consumer: materialise the channel's Rill into a list.
    // The Rill completes once the channel is closed.
    final consumer = channel.rill.compile.toIList;

    // Run producers and consumer together. Close the channel as soon as
    // all producers finish so the consumer knows when to stop.
    return IO.both(producers.productR(() => channel.close()), consumer).map((t) => t.$2);
    // Returns all 8 log lines (order varies by scheduling).
  });
}

Three services produce log events concurrently via parTraverseIO_. The bounded channel (capacity 4) provides backpressure: a fast service suspends rather than flooding memory. Once all producers finish, channel.close() is called so the consumer knows when to stop. IO.both runs producers and consumer concurrently and waits for both to complete.