
Topic

A Topic<A> is a publish-subscribe broadcast channel. Unlike Channel, which delivers each item to exactly one consumer, a Topic delivers a copy of every published item to every active subscriber simultaneously.

abstract class Topic<A> {
  IO<Either<TopicClosed, Unit>> publish1(A a);
  Pipe<A, Never> get publish;

  Rill<A> subscribe(int maxQueued);
  Resource<Rill<A>> subscribeAwait(int maxQueued);

  Rill<int> get subscribers; // stream of active subscriber count
  IO<Either<TopicClosed, Unit>> get close;
}

Channel vs Topic

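Aspect       | Channel<A>                          | Topic<A>
-------------|-------------------------------------|----------------------------------------
Consumers    | One                                 | Many
Fan-out      | No                                  | Yes: each subscriber gets its own copy
Backpressure | Publisher suspends when buffer full | Each subscriber's buffer is independent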

Use Topic when multiple independent consumers need to react to the same stream of events — logs, sensor readings, price feeds, UI events. Use Channel when one producer hands work off to one consumer.

Creating a Topic

final topic = await Topic.create<String>().run();

Topic.create() returns IO<Topic<A>> and allocates the internal state. A single topic can accept unlimited subscribers.

Subscribing

subscribe — simple subscription

subscribe(maxQueued) returns a Rill<A> immediately. The maxQueued argument sets the size of this subscriber's internal buffer: if the subscriber falls behind, the publisher suspends once maxQueued items are waiting in that buffer.

// Subscribe to a topic and collect the first 5 items.
IO<IList<String>> topicBasic() {
  return Topic.create<String>().flatMap((Topic<String> topic) {
    // subscribe(maxQueued) returns a Rill that receives a copy of every
    // published item. maxQueued is the per-subscriber buffer size.
    final subscriber = topic.subscribe(16).take(5).compile.toIList;

    // publish is a Pipe<A, Never> that forwards a Rill into all subscribers
    // and closes the topic when the source stream completes.
    final publisher =
        Rill.range(1, 6).map((int n) => 'event-$n').through(topic.publish).compile.drain;

    // Run publisher and subscriber concurrently; collect subscriber output.
    return IO.both<IList<String>, Unit>(subscriber, publisher).map((t) => t.$1);
    // => IList('event-1', 'event-2', 'event-3', 'event-4', 'event-5')
  });
}

IO.both runs the subscriber and the publisher concurrently, and the final map keeps only the subscriber's result. The publish pipe closes the topic automatically when the source stream completes, which terminates all subscriber streams.

subscribeAwait — race-free subscription

subscribeAwait(maxQueued) returns Resource<Rill<A>>. The subscription is registered in the resource acquire phase, so by the time the use body runs — and any concurrent publish call can start — the subscriber is already enrolled. This removes the startup race that would otherwise be possible with subscribe.

// Fan out: two independent subscribers each receive every published item.
IO<(IList<String>, IList<String>)> topicMultiSubscriber() {
  return Topic.create<String>().flatMap((Topic<String> topic) {
    final messages = Rill.emits(['alpha', 'beta', 'gamma']);

    // subscribeAwait returns a Resource<Rill<A>>. The subscription is
    // registered inside the Resource acquire phase, guaranteeing the
    // subscriber is ready before any publish call can run.
    return topic
        .subscribeAwait(16)
        .flatMap(
          (Rill<String> sub1) => topic.subscribeAwait(16).map((Rill<String> sub2) => (sub1, sub2)),
        )
        .use((subs) {
          final (sub1, sub2) = subs;

          final collect1 = sub1.take(3).compile.toIList;
          final collect2 = sub2.take(3).compile.toIList;

          final publish = messages.through(topic.publish).compile.drain;

          // Run both collectors and the publisher concurrently. Sequencing
          // the publisher after the collectors (for example with productL)
          // would never finish: the collectors would wait for items that
          // are only published once they complete.
          return IO
              .both<(IList<String>, IList<String>), Unit>(
                IO.both<IList<String>, IList<String>>(collect1, collect2),
                publish,
              )
              .map((t) => t.$1);
          // Both sub1 and sub2 receive: IList('alpha', 'beta', 'gamma')
        });
  });
}

Both subscribers receive an independent copy of every message. Closing one subscriber does not affect the other, and each has its own backpressure budget.

Publishing

publish — pipe a Rill

publish is a Pipe<A, Never> that sends every element of a source stream to all active subscribers and closes the topic when the stream completes.

someRill.through(topic.publish).compile.drain
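Because publish closes the topic once its source completes, a later publish1 on the same topic should be rejected. The sketch below illustrates this using only calls that appear on this page; the function name publishAfterPipeCompletes and the import paths are assumptions made for illustration, not part of the documented API.

```dart
// Assumed import paths; adjust them to match your project's ribs packages.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';

// Hypothetical helper: drains a two-element source through `publish`, then
// attempts a late `publish1` against the same topic.
IO<String> publishAfterPipeCompletes() {
  return Topic.create<String>().flatMap((Topic<String> topic) {
    // Completes after both elements are published and the topic is closed.
    final piped = Rill.emits(['a', 'b']).through(topic.publish).compile.drain;

    return piped
        .productR(() => topic.publish1('late'))
        .map(
          (Either<TopicClosed, Unit> result) => result.fold(
            (TopicClosed _) => 'rejected: topic closed',
            (Unit _) => 'accepted',
          ),
        );
  });
}
```

If the behaviour described above holds, the returned IO yields 'rejected: topic closed'. When several producers share a topic, prefer publish1 plus an explicit close over publish, so that one finished source does not shut the topic for the others.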

publish1 — single-value publish

publish1 publishes one value imperatively from IO-based producers. It returns Left(TopicClosed) if the topic is already closed.

// publish1 sends a single value imperatively — useful when the source of
// events is IO-based rather than a Rill.
IO<Unit> topicPublish1() {
  return Topic.create<int>().flatMap((Topic<int> topic) {
    final subscriber = topic.subscribe(16).take(3).compile.toIList;

    final producer = IList.range(1, 4).traverseIO_((int n) {
      return topic
          .publish1(n)
          .flatMap(
            (Either<TopicClosed, Unit> result) => result.fold(
              (TopicClosed _) => IO.print('topic closed, dropping $n'),
              (Unit _) => IO.unit,
            ),
          );
    });

    return IO
        .both<IList<int>, Unit>(subscriber, producer.productR(() => topic.close.voided()))
        .voided();
  });
}

Observing subscriber count

topic.subscribers is a Rill<int> that emits the current number of active subscribers each time it changes. It is backed by a SignallingRef, so it only emits on changes (like Signal.discrete).

// Log a message whenever the subscriber count changes.
topic.subscribers
    .evalMap((int n) => IO.print('subscribers: $n'))
    .compile
    .drain
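The subscriber count can also gate a publisher so that nothing is sent until the expected consumers have registered. The following is a minimal sketch of that idea, not a documented recipe. It assumes that subscribers emits the current count when it starts, as Signal.discrete-style streams usually do. The function name publishWhenSubscribed and the import paths are hypothetical.

```dart
// Assumed import paths; adjust them to match your project's ribs packages.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';

// Hypothetical helper: holds the publisher back until two subscribers are
// registered, then publishes 1, 2, 3.
IO<IList<int>> publishWhenSubscribed() {
  return Topic.create<int>().flatMap((Topic<int> topic) {
    final first = topic.subscribe(8).take(3).compile.toIList;
    final second = topic.subscribe(8).take(3).compile.drain;

    // Completes once a subscriber count of at least 2 is observed.
    // Assumption: the current count is emitted even if it was reached
    // before this stream started running.
    final bothReady = topic.subscribers.filter((int n) => n >= 2).take(1).compile.drain;

    final publisher = bothReady.productR(
      () => Rill.range(1, 4).through(topic.publish).compile.drain,
    );

    // Run both subscribers and the gated publisher concurrently and keep
    // only the first subscriber's collected items.
    return IO
        .both<(IList<int>, Unit), Unit>(
          IO.both<IList<int>, Unit>(first, second),
          publisher,
        )
        .map((t) => t.$1.$1);
  });
}
```

Where the set of subscribers is known up front, subscribeAwait gives the same guarantee more directly. Gating on the count is mainly useful when subscribers are started elsewhere.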

Real-world example: sensor fan-out

// A simple event bus: a temperature sensor broadcasts readings to an
// independent logger and alerter running in separate fibers.
IO<(IList<String>, IList<String>)> temperatureMonitor() {
  final readings = ilist([18.5, 19.0, 22.3, 31.7, 29.1]);
  final count = readings.length;

  return Topic.create<double>().flatMap((Topic<double> topic) {
    // Logger: formats every reading as a string.
    final logger = topic.subscribe(32).take(count).map((double t) => 'temp: $t°C').compile.toIList;

    // Alerter: only emits when temperature exceeds the threshold.
    final alerter =
        topic
            .subscribe(32)
            .take(count)
            .filter((double t) => t > 25.0)
            .map((double t) => 'ALERT: high temp $t°C')
            .compile
            .toIList;

    // Publisher: streams the sensor readings, closes topic when done.
    final publisher = Rill.emits(readings.toList()).through(topic.publish).compile.drain;

    // Run all three concurrently and collect both subscriber outputs.
    // The publisher must run alongside the subscribers: sequencing it after
    // them (for example with productL) would leave them waiting forever.
    return IO
        .both<(IList<String>, IList<String>), Unit>(
          IO.both<IList<String>, IList<String>>(logger, alerter),
          publisher,
        )
        .map((t) => t.$1);
    // logger => IList('temp: 18.5°C', 'temp: 19.0°C', ...)
    // alerter => IList('ALERT: high temp 31.7°C', 'ALERT: high temp 29.1°C')
  });
}

Three fibers run concurrently, composed with nested IO.both calls:

  • The publisher streams sensor readings through topic.publish and closes the topic when done.
  • The logger subscribes with a generous buffer and records every reading.
  • The alerter subscribes independently, filters for high-temperature events, and emits alert strings.

Each subscriber has its own buffer, so a briefly slow alerter does not stall the logger as long as the alerter's buffer (32 items here) still has room; once that buffer fills, the publisher suspends. Closing the topic (which publish does automatically on stream completion) terminates both subscriber streams, so compile.toIList returns without hanging.
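The example above uses subscribe, so it is subject to the startup race described under subscribeAwait: a reading published before a subscriber has registered is not delivered to it. A race-free variant might look like the sketch below, which enrolls both subscribers through subscribeAwait before the publisher starts. The function name temperatureMonitorAwait and the import paths are assumptions made for illustration.

```dart
// Assumed import paths; adjust them to match your project's ribs packages.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';

// Hypothetical race-free variant of temperatureMonitor.
IO<(IList<String>, IList<String>)> temperatureMonitorAwait() {
  final readings = ilist([18.5, 19.0, 22.3, 31.7, 29.1]);
  final count = readings.length;

  return Topic.create<double>().flatMap((Topic<double> topic) {
    return topic
        .subscribeAwait(32)
        .flatMap(
          (Rill<double> logSub) =>
              topic.subscribeAwait(32).map((Rill<double> alertSub) => (logSub, alertSub)),
        )
        .use((subs) {
          // Both subscriptions are registered by the time this body runs.
          final (logSub, alertSub) = subs;

          final logger = logSub.take(count).map((double t) => 'temp: $t°C').compile.toIList;

          final alerter =
              alertSub
                  .take(count)
                  .filter((double t) => t > 25.0)
                  .map((double t) => 'ALERT: high temp $t°C')
                  .compile
                  .toIList;

          final publisher = Rill.emits(readings.toList()).through(topic.publish).compile.drain;

          // Run the two collectors and the publisher concurrently and keep
          // only the collectors' results.
          return IO
              .both<(IList<String>, IList<String>), Unit>(
                IO.both<IList<String>, IList<String>>(logger, alerter),
                publisher,
              )
              .map((t) => t.$1);
        });
  });
}
```

With the five readings shown, the logger should collect five lines and the alerter two (31.7 and 29.1), matching the original example.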