# Cyclic Barrier
CyclicBarrier is a reusable synchronisation barrier that blocks a fixed
number of fibers until they have all called await(). Once the last fiber
arrives the barrier releases every waiter simultaneously and then
resets itself — the same barrier can be used for the next round without
any re-initialisation.
CyclicBarrier models the same primitive as java.util.concurrent.CyclicBarrier,
but every operation is an IO effect that integrates naturally with fiber
cancellation, resource safety, and the rest of the ribs_effect ecosystem.
## Core operations

| Method | Returns | Description |
|---|---|---|
| CyclicBarrier.withCapacity(n) | IO&lt;CyclicBarrier&gt; | Allocate a new barrier that releases after n concurrent waiters |
| await() | IO&lt;Unit&gt; | Block until n fibers are waiting; returns immediately for a barrier of capacity 1 |
CyclicBarrier.withCapacity throws ArgumentError if n < 1. Fibers that
cancel while waiting have their slot returned to the barrier so the remaining
fibers are not permanently stuck. For a one-shot barrier see CountDownLatch.
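Both boundary rules can be seen in a short sketch. This is illustrative only: it assumes nothing beyond the withCapacity and await signatures listed in the table above, and the exact point at which the ArgumentError surfaces is not specified here.

```dart
// A capacity-1 barrier needs no other fibers: await() returns immediately.
IO<Unit> capacityEdgeCases() =>
    CyclicBarrier.withCapacity(1).flatMap(
      (solo) => solo
          .await()
          .productR(() => IO.print('capacity 1: released immediately')),
    );

// An invalid capacity is rejected:
// CyclicBarrier.withCapacity(0) // throws ArgumentError, since n < 1
```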
## Basic usage
Create a barrier with capacity n, then start n worker fibers that each
call await(). No fiber is released until all n have arrived.
```dart
/// A barrier of capacity 2 requires 2 concurrent [await] calls before any
/// fiber is released.
IO<Unit> cyclicBarrierBasic() =>
    CyclicBarrier.withCapacity(2).flatMap((barrier) {
      IO<Unit> worker(int id) => IO
          .sleep((id * 30).milliseconds)
          .productR(() => IO.print('worker $id: reached barrier'))
          .productR(() => barrier.await())
          .productR(() => IO.print('worker $id: released'));

      return ilist([worker(1), worker(2)]).parSequence_();
    });
```
Both fibers suspend inside await() and are unblocked simultaneously the
moment the second one arrives.
## Reuse across multiple rounds
After each full rendezvous the barrier resets automatically. The same
CyclicBarrier can be awaited again in the next iteration — no
re-creation needed.
```dart
/// Unlike [CountDownLatch], a [CyclicBarrier] resets after each cycle.
/// Here three fibers rendezvous twice — once per round.
IO<Unit> cyclicBarrierReuse() =>
    CyclicBarrier.withCapacity(3).flatMap((barrier) {
      IO<Unit> worker(int id) => barrier
          .await()
          .productR(() => IO.print('round 1: worker $id released'))
          .productR(() => barrier.await())
          .productR(() => IO.print('round 2: worker $id released'));

      return ilist([worker(1), worker(2), worker(3)]).parSequence_();
    });
```
All three fibers go through two rounds. The barrier resets between round 1 and round 2 with no additional setup.
## Cancellation safety
A fiber waiting at the barrier can be canceled at any time. The barrier restores its slot so the remaining capacity is unchanged and other fibers are not left waiting forever.
```dart
/// A fiber waiting at the barrier can be canceled safely. The barrier count
/// is restored so the remaining fibers are not permanently stuck.
IO<Unit> cyclicBarrierCancel() =>
    CyclicBarrier.withCapacity(2).flatMap((barrier) {
      // This fiber will time out before the second fiber arrives.
      final impatient = barrier.await().timeoutTo(50.milliseconds, IO.unit);

      // After the impatient fiber cancels, the barrier is back to capacity 2,
      // so we need a second fiber to arrive alongside the patient one.
      final patient =
          IO.sleep(100.milliseconds).productR(() => barrier.await());

      final secondArrival = IO
          .sleep(150.milliseconds)
          .productR(() => barrier.await())
          .productR(() => IO.print('both patient fibers released'));

      return ilist([impatient, patient, secondArrival]).parSequence_();
    });
```
## Comparison with CountDownLatch

| | CyclicBarrier | CountDownLatch |
|---|---|---|
| Reusable | Yes — resets after every cycle | No — one-shot |
| How it works | All n waiters arrive together | n release() calls counted down |
| Producer/consumer | Symmetric — all fibers both produce and consume | Asymmetric — releasers and waiters are distinct |
| Use case | Iterative phases, pipeline stages | Wait-for-startup, fan-out coordination |
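To make the asymmetry concrete, here is a hedged sketch of the one-shot latch pattern for wait-for-startup. Only release() and await() come from the comparison above; the constructor name withCount is a hypothetical stand-in, so check the CountDownLatch docs for the real one.

```dart
// Hypothetical constructor name `withCount`; release()/await() as in the
// comparison table. Releasers and waiters are distinct fibers.
IO<Unit> countDownLatchStartup() =>
    CountDownLatch.withCount(2).flatMap((latch) {
      // Two "service" fibers each count the latch down once when ready.
      IO<Unit> service(String name) => IO
          .sleep(50.milliseconds)
          .productR(() => IO.print('$name ready'))
          .productR(() => latch.release());

      // One waiter blocks until both releases have happened. Unlike a
      // CyclicBarrier, the latch stays open afterwards and cannot be reused.
      final main = latch
          .await()
          .productR(() => IO.print('all services up, start serving'));

      return ilist([service('db'), service('cache'), main]).parSequence_();
    });
```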
## Real-world example: parallel pipeline stages
A common pattern in data-processing pipelines is to have multiple stages (parse, transform, persist) run in parallel but advance in lock-step — no stage should start chunk N+1 before every stage has finished chunk N.
CyclicBarrier is the natural tool: each stage calls await() after
finishing a chunk, and the barrier ensures all three stages commit the same
chunk before any of them moves on.
```dart
/// Real-world example: parallel pipeline stages.
///
/// Three processing stages each complete a chunk of work, then wait at a
/// shared barrier before moving to the next chunk. This guarantees that all
/// stages stay in lock-step — no stage races ahead while another is still
/// processing the previous chunk.
IO<Unit> pipelineStages() =>
    CyclicBarrier.withCapacity(3).flatMap((barrier) {
      IO<Unit> stage(String name, Duration workTime) {
        IO<Unit> processChunk(int chunk) => IO
            .sleep(workTime)
            .productR(() => IO.print('[$name] chunk $chunk done — waiting'))
            .productR(() => barrier.await())
            .productR(() => IO.print('[$name] chunk $chunk committed'));

        return processChunk(1)
            .productR(() => processChunk(2))
            .productR(() => processChunk(3));
      }

      return ilist([
        stage('parse', 50.milliseconds),
        stage('transform', 80.milliseconds),
        stage('persist', 60.milliseconds),
      ]).parSequence_();
    });
```
Because the three stage fibers run concurrently via parSequence_, each
chunk takes only as long as the slowest stage. The barrier guarantees
consistency without any shared mutable state or polling.