Skip to main content

Count Down Latch

CountDownLatch is a one-shot synchronisation barrier that lets one or more fibers wait until a fixed number of other fibers have each signalled completion. You create it with a count n; every call to release() decrements that count by one. Any fiber calling await() suspends until the count reaches zero, at which point all waiters are unblocked simultaneously — and the latch stays open permanently for any subsequent await() calls.

info

CountDownLatch models the same primitive as java.util.concurrent.CountDownLatch, but every operation is an IO effect that integrates naturally with fiber cancellation, resource safety, and the rest of the ribs_effect ecosystem.

Core operations

MethodReturnsDescription
CountDownLatch.create(n)IO<CountDownLatch>Allocate a new latch with a count of n
release()IO<Unit>Decrement the count by 1; unblocks all waiters when it reaches 0
await()IO<Unit>Suspend until the count reaches 0; returns immediately if already 0
caution

CountDownLatch.create throws ArgumentError if n < 1. The count can only go down — there is no reset or reuse. For a reusable barrier see CyclicBarrier.

Basic usage

Create a latch with count n, then start n worker fibers alongside a coordinator. Each worker calls release() when it finishes; the coordinator calls await() and unblocks once all workers are done.

/// A latch of 3 requires 3 [release] calls before [await] unblocks.
IO<Unit> countDownLatchBasic() => CountDownLatch.create(3).flatMap((latch) {
// Three worker fibers each do their work, then release the latch.
IO<Unit> worker(int id) => IO
.sleep((id * 50).milliseconds)
.productR(() => IO.print('worker $id: done'))
.productR(() => latch.release());

// The coordinator blocks until all three workers have released.
final coordinator = latch.await().productR(() => IO.print('all workers done — proceeding'));

return ilist([worker(1), worker(2), worker(3), coordinator]).parSequence_();
});

The workers run concurrently via parSequence_. The coordinator fiber suspends inside await() and is woken up the moment the third release() fires.

Multiple waiters

Any number of fibers can call await() at the same time. They all unblock at the same instant when the last release() is called.

/// Multiple fibers can call [await] simultaneously; they are all unblocked
/// at the same instant when the last [release] fires.
IO<Unit> multipleAwaiters() => CountDownLatch.create(1).flatMap((latch) {
IO<Unit> waiter(int id) => latch.await().productR(() => IO.print('waiter $id: unblocked'));

final starter = IO
.sleep(100.milliseconds)
.productR(() => IO.print('releasing'))
.productR(() => latch.release());

return ilist([waiter(1), waiter(2), waiter(3), starter]).parSequence_();
});

Latch stays open after reaching zero

Once the count hits zero the latch is permanently open. Any await() called after that point — including calls that arrive arbitrarily later — returns immediately without suspending.

/// Once the count reaches zero, subsequent [await] calls return immediately —
/// the latch stays open forever.
IO<Unit> awaitAlreadyDone() => CountDownLatch.create(1).flatMap((latch) {
return latch
.release()
.productR(() => latch.await()) // returns immediately
.productR(() => latch.await()) // also returns immediately
.productR(() => IO.print('done'));
});

Real-world example: parallel service initialisation

A common pattern in service-oriented applications is to start several subsystems concurrently and only expose the public API once every subsystem is ready. Doing this with a manual boolean flag and polling is error-prone; CountDownLatch makes it trivial.

Each service fiber performs its startup work independently, then calls release(). The gateway fiber sits on await() and begins accepting traffic the moment the last service signals readiness — with no polling, no shared mutable flags, and no missed signals.

/// Real-world example: parallel service initialisation.
///
/// Three services start up concurrently. An HTTP server must not begin
/// accepting requests until every service has signalled readiness.
/// [CountDownLatch] acts as the synchronisation point.
IO<Unit> parallelServiceInit() => CountDownLatch.create(3).flatMap((ready) {
// Each service performs its startup work, then decrements the latch.
IO<Unit> startService(String name, Duration startupTime, int port) => IO
.sleep(startupTime)
.productR(() => IO.print('[$name] listening on :$port'))
.productR(() => ready.release());

// The HTTP gateway waits for every service to be ready before it
// starts routing traffic.
final gateway = ready.await().productR(
() => IO.print('[gateway] all services ready — accepting requests'),
);

return ilist([
startService('auth', 150.milliseconds, 8081),
startService('catalog', 200.milliseconds, 8082),
startService('billing', 120.milliseconds, 8083),
gateway,
]).parSequence_();
});

Because parSequence_ starts all four fibers concurrently, the total startup time is the maximum of the individual service startup times, not the sum. The gateway never blocks the services, and the services never need to know about each other.