Topic
A Topic<A> is a publish-subscribe broadcast channel. Unlike Channel,
which delivers each item to exactly one consumer, a Topic delivers a copy of
every published item to every active subscriber simultaneously.
abstract class Topic<A> {
IO<Either<TopicClosed, Unit>> publish1(A a);
Pipe<A, Never> get publish;
Rill<A> subscribe(int maxQueued);
Resource<Rill<A>> subscribeAwait(int maxQueued);
Rill<int> get subscribers; // stream of active subscriber count
IO<Either<TopicClosed, Unit>> get close;
}
Channel vs Topic
Channel<A> | Topic<A> | |
|---|---|---|
| Consumers | One | Many |
| Fan-out | No | Yes — each subscriber gets its own copy |
| Backpressure | Publisher suspends when buffer full | Each subscriber's buffer is independent |
Use Topic when multiple independent consumers need to react to the same stream
of events — logs, sensor readings, price feeds, UI events. Use Channel when
one producer hands work off to one consumer.
Creating a Topic
final topic = await Topic.create<String>().run();
Topic.create() returns IO<Topic<A>> and allocates the internal state. A
single topic can accept unlimited subscribers.
Subscribing
subscribe — simple subscription
subscribe(maxQueued) returns a Rill<A> immediately. The maxQueued
argument sets the size of this subscriber's internal buffer: if the subscriber
falls behind the publisher, the publisher will block once maxQueued items are
waiting.
// Subscribe to a topic and collect the first 5 items.
IO<IList<String>> topicBasic() {
return Topic.create<String>().flatMap((Topic<String> topic) {
// subscribe(maxQueued) returns a Rill that receives a copy of every
// published item. maxQueued is the per-subscriber buffer size.
final subscriber = topic.subscribe(16).take(5).compile.toIList;
// publish is a Pipe<A, Never> that forwards a Rill into all subscribers
// and closes the topic when the source stream completes.
final publisher =
Rill.range(1, 6).map((int n) => 'event-$n').through(topic.publish).compile.drain;
// Run publisher and subscriber concurrently; collect subscriber output.
return IO.both<IList<String>, Unit>(subscriber, publisher).map((t) => t.$1);
// => IList('event-1', 'event-2', 'event-3', 'event-4', 'event-5')
});
}
The subscriber runs in the foreground via IO.both while the publisher runs
concurrently. The publisher's publish pipe closes the topic automatically
when the source stream completes, which terminates all subscriber streams.
subscribeAwait — race-free subscription
subscribeAwait(maxQueued) returns Resource<Rill<A>>. The subscription is
registered in the resource acquire phase, so by the time the use body
runs — and any concurrent publish call can start — the subscriber is already
enrolled. This removes the startup race that would otherwise be possible with
subscribe.
// Fan out: two independent subscribers each receive every published item.
IO<(IList<String>, IList<String>)> topicMultiSubscriber() {
return Topic.create<String>().flatMap((Topic<String> topic) {
final messages = Rill.emits(['alpha', 'beta', 'gamma']);
// subscribeAwait returns a Resource<Rill<A>>. The subscription is
// registered inside the Resource acquire phase, guaranteeing the
// subscriber is ready before any publish call can run.
return topic
.subscribeAwait(16)
.flatMap(
(Rill<String> sub1) => topic.subscribeAwait(16).map((Rill<String> sub2) => (sub1, sub2)),
)
.use((subs) {
final (sub1, sub2) = subs;
final collect1 = sub1.take(3).compile.toIList;
final collect2 = sub2.take(3).compile.toIList;
final publish = messages.through(topic.publish).compile.drain;
return IO.both<IList<String>, IList<String>>(collect1, collect2).productL(() => publish);
// Both sub1 and sub2 receive: IList('alpha', 'beta', 'gamma')
});
});
}
Both subscribers receive an independent copy of every message. Closing one subscriber does not affect the other, and each has its own backpressure budget.
Publishing
publish — pipe a Rill
publish is a Pipe<A, Never> that sends every element of a source stream to
all active subscribers and closes the topic when the stream completes.
someRill.through(topic.publish).compile.drain
publish1 — single-value publish
publish1 publishes one value imperatively from IO-based producers. It returns
Left(TopicClosed) if the topic is already closed.
// publish1 sends a single value imperatively — useful when the source of
// events is IO-based rather than a Rill.
IO<Unit> topicPublish1() {
return Topic.create<int>().flatMap((Topic<int> topic) {
final subscriber = topic.subscribe(16).take(3).compile.toIList;
final producer = IList.range(1, 4).traverseIO_((int n) {
return topic
.publish1(n)
.flatMap(
(Either<TopicClosed, Unit> result) => result.fold(
(TopicClosed _) => IO.print('topic closed, dropping $n'),
(Unit _) => IO.unit,
),
);
});
return IO
.both<IList<int>, Unit>(subscriber, producer.productR(() => topic.close.voided()))
.voided();
});
}
Observing subscriber count
topic.subscribers is a Rill<Int> that emits the current number of active
subscribers each time it changes. It is backed by a SignallingRef, so it only
emits on changes (like Signal.discrete).
// Log a message whenever the subscriber count changes.
topic.subscribers
.evalMap((int n) => IO.print('subscribers: $n'))
.compile
.drain
Real-world example: sensor fan-out
// A simple event bus: a temperature sensor broadcasts readings to an
// independent logger and alerter running in separate fibers.
IO<(IList<String>, IList<String>)> temperatureMonitor() {
final readings = ilist([18.5, 19.0, 22.3, 31.7, 29.1]);
final count = readings.length;
return Topic.create<double>().flatMap((Topic<double> topic) {
// Logger: formats every reading as a string.
final logger = topic.subscribe(32).take(count).map((double t) => 'temp: $t°C').compile.toIList;
// Alerter: only emits when temperature exceeds the threshold.
final alerter =
topic
.subscribe(32)
.take(count)
.filter((double t) => t > 25.0)
.map((double t) => 'ALERT: high temp $t°C')
.compile
.toIList;
// Publisher: streams the sensor readings, closes topic when done.
final publisher = Rill.emits(readings.toList()).through(topic.publish).compile.drain;
// Run all three concurrently and collect both subscriber outputs.
return IO.both<IList<String>, IList<String>>(logger, alerter).productL<Unit>(() => publisher);
// logger => IList('temp: 18.5°C', 'temp: 19.0°C', ...)
// alerter => IList('ALERT: high temp 31.7°C', 'ALERT: high temp 29.1°C')
});
}
Three fibers run concurrently from a single IO.both call:
- The publisher streams sensor readings through
topic.publishand closes the topic when done. - The logger subscribes with a generous buffer and records every reading.
- The alerter subscribes independently, filters for high-temperature events, and emits alert strings.
Each subscriber's buffer is separate, so a slow alerter does not block the
logger. Closing the topic (which publish does automatically on stream
completion) terminates both subscriber streams, so compile.toIList returns
without hanging.