Cyclic Barrier

CyclicBarrier is a reusable synchronisation barrier that blocks a fixed number of fibers until they have all called await(). Once the last fiber arrives the barrier releases every waiter simultaneously and then resets itself — the same barrier can be used for the next round without any re-initialisation.

Info: CyclicBarrier models the same primitive as java.util.concurrent.CyclicBarrier, but every operation is an IO effect that integrates naturally with fiber cancellation, resource safety, and the rest of the ribs_effect ecosystem.

Core operations

| Method | Returns | Description |
| --- | --- | --- |
| CyclicBarrier.withCapacity(n) | IO<CyclicBarrier> | Allocate a new barrier that releases after n concurrent waiters |
| await() | IO<Unit> | Block until n fibers are waiting; returns immediately for a barrier of capacity 1 |

Caution: CyclicBarrier.withCapacity throws ArgumentError if n < 1. Fibers that cancel while waiting have their slot returned to the barrier so the remaining fibers are not permanently stuck. For a one-shot barrier see CountDownLatch.

Basic usage

Create a barrier with capacity n, then start n worker fibers that each call await(). No fiber is released until all n have arrived.

/// A barrier of capacity 2 requires 2 concurrent [await] calls before any
/// fiber is released.
IO<Unit> cyclicBarrierBasic() => CyclicBarrier.withCapacity(2).flatMap((barrier) {
  IO<Unit> worker(int id) => IO
      .sleep((id * 30).milliseconds)
      .productR(() => IO.print('worker $id: reached barrier'))
      .productR(() => barrier.await())
      .productR(() => IO.print('worker $id: released'));

  return ilist([worker(1), worker(2)]).parSequence_();
});

Worker 1 reaches the barrier first (after 30 ms) and suspends inside await(). When worker 2 arrives at 60 ms the capacity is met, and both fibers are released at the same moment.

Reuse across multiple rounds

After each full rendezvous the barrier resets automatically. The same CyclicBarrier can be awaited again in the next iteration — no re-creation needed.

/// Unlike [CountDownLatch], a [CyclicBarrier] resets after each cycle.
/// Here three fibers rendezvous twice, once per round.
IO<Unit> cyclicBarrierReuse() => CyclicBarrier.withCapacity(3).flatMap((barrier) {
  IO<Unit> worker(int id) => barrier
      .await()
      .productR(() => IO.print('round 1: worker $id released'))
      .productR(() => barrier.await())
      .productR(() => IO.print('round 2: worker $id released'));

  return ilist([worker(1), worker(2), worker(3)]).parSequence_();
});

All three fibers go through two rounds. The barrier resets between round 1 and round 2 with no additional setup.
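
The automatic reset can be pictured with a toy barrier built on plain Dart futures. This is only a sketch under stated assumptions: ToyBarrier, awaitAll and demo are hypothetical names, the code uses dart:async rather than ribs_effect fibers, and it has no cancellation handling, so it should not be read as how CyclicBarrier is actually implemented.

```dart
import 'dart:async';

/// Toy barrier for illustration only (hypothetical, not the ribs_effect class).
class ToyBarrier {
  ToyBarrier(this.capacity) : _remaining = capacity {
    if (capacity < 1) throw ArgumentError('capacity must be >= 1');
  }

  final int capacity;
  int _remaining;
  Completer<void> _gate = Completer<void>();

  /// Completes once `capacity` callers have arrived in the current cycle.
  Future<void> awaitAll() {
    _remaining -= 1;
    if (_remaining == 0) {
      // Last arrival: swap in a fresh gate and counter for the next cycle,
      // then release everyone parked on the old gate.
      final gate = _gate;
      _remaining = capacity;
      _gate = Completer<void>();
      gate.complete();
      return Future.value();
    }
    // Not the last arrival: park on the current cycle's gate.
    return _gate.future;
  }
}

/// Two workers rendezvous twice on the same barrier instance.
Future<List<String>> demo() async {
  final log = <String>[];
  final barrier = ToyBarrier(2);

  Future<void> worker(int id) async {
    for (var round = 1; round <= 2; round++) {
      await barrier.awaitAll();
      log.add('round $round: worker $id');
    }
  }

  await Future.wait([worker(1), worker(2)]);
  return log;
}

Future<void> main() async {
  (await demo()).forEach(print);
}
```

The point of the sketch is the swap performed by the last arrival: the counter and the gate are replaced before the old gate is completed, so a released worker that immediately calls awaitAll again lands in the next cycle rather than the one that just finished.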

Cancellation safety

A fiber waiting at the barrier can be canceled at any time. The barrier restores its slot so the remaining capacity is unchanged and other fibers are not left waiting forever.

/// A fiber waiting at the barrier can be canceled safely. The barrier count
/// is restored so the remaining fibers are not permanently stuck.
IO<Unit> cyclicBarrierCancel() => CyclicBarrier.withCapacity(2).flatMap((barrier) {
  // This fiber starts waiting immediately and times out after 50 ms,
  // before any other fiber arrives.
  final impatient = barrier.await().timeoutTo(50.milliseconds, IO.unit);

  // After the impatient fiber cancels, its slot is returned, so the barrier
  // again needs 2 arrivals: the patient fiber at 100 ms and a second one
  // at 150 ms.
  final patient = IO.sleep(100.milliseconds).productR(() => barrier.await());

  final secondArrival = IO
      .sleep(150.milliseconds)
      .productR(() => barrier.await())
      .productR(() => IO.print('both patient fibers released'));

  return ilist([impatient, patient, secondArrival]).parSequence_();
});

Comparison with CountDownLatch

| | CyclicBarrier | CountDownLatch |
| --- | --- | --- |
| Reusable | Yes, resets after every cycle | No, one-shot |
| How it works | All n waiters arrive together | n release() calls counted down |
| Producer/consumer | Symmetric: all fibers both produce and consume | Asymmetric: releasers and waiters are distinct |
| Use case | Iterative phases, pipeline stages | Wait-for-startup, fan-out coordination |

Real-world example: parallel pipeline stages

A common pattern in data-processing pipelines is to have multiple stages (parse, transform, persist) run in parallel but advance in lock-step — no stage should start chunk N+1 before every stage has finished chunk N.

CyclicBarrier is the natural tool: each stage calls await() after finishing a chunk, and the barrier ensures all three stages commit the same chunk before any of them moves on.

/// Real-world example: parallel pipeline stages.
///
/// Three processing stages each complete a chunk of work, then wait at a
/// shared barrier before moving to the next chunk. This guarantees that all
/// stages stay in lock-step: no stage races ahead while another is still
/// processing the previous chunk.
IO<Unit> pipelineStages() => CyclicBarrier.withCapacity(3).flatMap((barrier) {
  IO<Unit> stage(String name, Duration workTime) {
    IO<Unit> processChunk(int chunk) => IO
        .sleep(workTime)
        .productR(() => IO.print('[$name] chunk $chunk done — waiting'))
        .productR(() => barrier.await())
        .productR(() => IO.print('[$name] chunk $chunk committed'));

    return processChunk(1).productR(() => processChunk(2)).productR(() => processChunk(3));
  }

  return ilist([
    stage('parse', 50.milliseconds),
    stage('transform', 80.milliseconds),
    stage('persist', 60.milliseconds),
  ]).parSequence_();
});

Because the three stage fibers run concurrently via parSequence_, each chunk takes only as long as the slowest stage. The barrier guarantees consistency without any shared mutable state or polling.
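
As a rough back-of-the-envelope check of that claim, the snippet below works out the ideal running time for the stage durations used above. The helper names lockStepTotal and sequentialTotal are hypothetical, and the figures ignore scheduling and printing overhead, so treat them as estimates rather than measurements.

```dart
/// Ideal lock-step time: every chunk costs as much as the slowest stage.
Duration lockStepTotal(List<Duration> stageTimes, int chunks) {
  final slowest = stageTimes.reduce((a, b) => a > b ? a : b);
  return slowest * chunks;
}

/// Baseline if the same work ran one stage after another on a single fiber.
Duration sequentialTotal(List<Duration> stageTimes, int chunks) {
  final perChunk = stageTimes.reduce((a, b) => a + b);
  return perChunk * chunks;
}

void main() {
  const stages = [
    Duration(milliseconds: 50), // parse
    Duration(milliseconds: 80), // transform
    Duration(milliseconds: 60), // persist
  ];
  print(lockStepTotal(stages, 3).inMilliseconds); // 240
  print(sequentialTotal(stages, 3).inMilliseconds); // 570
}
```

With these numbers the faster stages idle at the barrier for 30 ms (parse) and 20 ms (persist) per chunk, which is the price paid for keeping all stages on the same chunk.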