# CountDownLatch
`CountDownLatch` is a one-shot synchronisation barrier that lets one or more
fibers wait until a fixed number of other fibers have each signalled completion.
You create it with a count `n`; every call to `release()` decrements that count
by one. Any fiber calling `await()` suspends until the count reaches zero, at
which point all waiters are unblocked simultaneously, and the latch stays
open permanently for any subsequent `await()` calls.

`CountDownLatch` models the same primitive as `java.util.concurrent.CountDownLatch`,
but every operation is an `IO` effect that integrates naturally with fiber
cancellation, resource safety, and the rest of the `ribs_effect` ecosystem.
## Core operations
| Method | Returns | Description |
|---|---|---|
| `CountDownLatch.create(n)` | `IO<CountDownLatch>` | Allocate a new latch with a count of `n` |
| `release()` | `IO<Unit>` | Decrement the count by 1; unblocks all waiters when it reaches 0 |
| `await()` | `IO<Unit>` | Suspend until the count reaches 0; returns immediately if it is already 0 |
`CountDownLatch.create` throws an `ArgumentError` if `n < 1`. The count can only
go down; there is no reset or reuse. For a reusable barrier, see `CyclicBarrier`.
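When `n` comes from runtime input, the `ArgumentError` can be handled as a value rather than a crash. A minimal sketch, assuming `attempt` captures the error inside the `IO` and that the resulting `Either` exposes `fold` — both inferred from the library's cats-effect-style API rather than confirmed by this page:

```dart
/// Hypothetical helper: creates a latch only when [n] is valid,
/// reporting the error otherwise.
///
/// Assumption: the ArgumentError is raised inside the IO returned by
/// [CountDownLatch.create], so [attempt] can capture it. If create
/// validates eagerly instead, wrap the call in IO.delay first.
IO<Unit> createChecked(int n) => CountDownLatch.create(n).attempt().flatMap(
      (result) => result.fold(
        (err) => IO.print('invalid latch count $n: $err'),
        (latch) => IO.print('latch of $n created'),
      ),
    );
```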
## Basic usage
Create a latch with count `n`, then start `n` worker fibers alongside a
coordinator. Each worker calls `release()` when it finishes; the coordinator
calls `await()` and unblocks once all workers are done.
```dart
/// A latch of 3 requires 3 [release] calls before [await] unblocks.
IO<Unit> countDownLatchBasic() => CountDownLatch.create(3).flatMap((latch) {
  // Three worker fibers each do their work, then release the latch.
  IO<Unit> worker(int id) => IO
      .sleep((id * 50).milliseconds)
      .productR(() => IO.print('worker $id: done'))
      .productR(() => latch.release());

  // The coordinator blocks until all three workers have released.
  final coordinator =
      latch.await().productR(() => IO.print('all workers done — proceeding'));

  return ilist([worker(1), worker(2), worker(3), coordinator]).parSequence_();
});
```
The workers run concurrently via `parSequence_`. The coordinator fiber suspends
inside `await()` and is woken the moment the third `release()` fires.
## Multiple waiters
Any number of fibers can call `await()` concurrently. They all unblock at
the same instant when the final `release()` is called.
```dart
/// Multiple fibers can call [await] simultaneously; they are all unblocked
/// at the same instant when the last [release] fires.
IO<Unit> multipleAwaiters() => CountDownLatch.create(1).flatMap((latch) {
  IO<Unit> waiter(int id) =>
      latch.await().productR(() => IO.print('waiter $id: unblocked'));

  final starter = IO
      .sleep(100.milliseconds)
      .productR(() => IO.print('releasing'))
      .productR(() => latch.release());

  return ilist([waiter(1), waiter(2), waiter(3), starter]).parSequence_();
});
```
## Latch stays open after reaching zero
Once the count hits zero, the latch is permanently open. Any `await()` called
after that point, including calls that arrive arbitrarily later, returns
immediately without suspending.
```dart
/// Once the count reaches zero, subsequent [await] calls return immediately —
/// the latch stays open forever.
IO<Unit> awaitAlreadyDone() => CountDownLatch.create(1).flatMap((latch) {
  return latch
      .release()
      .productR(() => latch.await()) // returns immediately
      .productR(() => latch.await()) // also returns immediately
      .productR(() => IO.print('done'));
});
```
## Real-world example: parallel service initialisation
A common pattern in service-oriented applications is to start several
subsystems concurrently and only expose the public API once every subsystem
is ready. Doing this with a manual boolean flag and polling is error-prone;
`CountDownLatch` makes it trivial.
Each service fiber performs its startup work independently, then calls
`release()`. The gateway fiber sits on `await()` and begins accepting traffic
the moment the last service signals readiness, with no polling, no shared
mutable flags, and no missed signals.
```dart
/// Real-world example: parallel service initialisation.
///
/// Three services start up concurrently. An HTTP server must not begin
/// accepting requests until every service has signalled readiness.
/// [CountDownLatch] acts as the synchronisation point.
IO<Unit> parallelServiceInit() => CountDownLatch.create(3).flatMap((ready) {
  // Each service performs its startup work, then decrements the latch.
  IO<Unit> startService(String name, Duration startupTime, int port) => IO
      .sleep(startupTime)
      .productR(() => IO.print('[$name] listening on :$port'))
      .productR(() => ready.release());

  // The HTTP gateway waits for every service to be ready before it
  // starts routing traffic.
  final gateway = ready.await().productR(
        () => IO.print('[gateway] all services ready — accepting requests'),
      );

  return ilist([
    startService('auth', 150.milliseconds, 8081),
    startService('catalog', 200.milliseconds, 8082),
    startService('billing', 120.milliseconds, 8083),
    gateway,
  ]).parSequence_();
});
```
Because `parSequence_` starts all four fibers concurrently, the total startup
time is the maximum of the individual service startup times (here about
200 ms), not their sum (about 470 ms). The gateway never blocks the services,
and the services never need to know about each other.
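The max-not-sum claim can be checked empirically by timing the whole run. A hedged sketch, assuming `IO.delay` wraps a synchronous side effect (its presence is inferred from the library's cats-effect lineage, not stated on this page), using Dart's standard `Stopwatch`:

```dart
/// Hypothetical helper: times a full run of [parallelServiceInit].
/// The printed duration should be close to the slowest service (~200 ms),
/// not the sum of all three startup times (~470 ms).
IO<Unit> measureStartup() => IO.delay(() => Stopwatch()..start()).flatMap(
      (sw) => parallelServiceInit().productR(
        () => IO.print('startup took ${sw.elapsedMilliseconds} ms'),
      ),
    );
```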