Semaphore
Semaphore is a purely functional concurrency primitive that controls how
many fibers may execute a section of code simultaneously. It holds an integer
count of available permits; a fiber that wants to enter the guarded section
must first acquire a permit, and returns it when done. Any fiber that tries to
acquire when no permits are available will suspend until one is released.
Semaphore is to IO what a traditional counting semaphore is to threads —
but every operation is an IO effect, so it composes naturally with the rest
of your program and is safe to share across any number of concurrent fibers.
Common shapes
| Permits | Behavior |
|---|---|
Semaphore.permits(1) | Mutex — only one fiber in the critical section at a time |
Semaphore.permits(n) | Rate limiter — at most n fibers running concurrently |
Core operations
| Method | Returns | Description |
|---|---|---|
Semaphore.permits(n) | IO<Semaphore> | Allocate a new semaphore with n permits |
acquire() | IO<Unit> | Acquire 1 permit, suspending if none are available |
acquireN(n) | IO<Unit> | Acquire n permits |
tryAcquire() | IO<bool> | Acquire 1 permit without suspending; false if unavailable |
release() | IO<Unit> | Return 1 permit, unblocking a waiting fiber if any |
releaseN(n) | IO<Unit> | Return n permits |
permit() | Resource<Unit> | Acquire on open, release on close — preferred idiom |
available() | IO<int> | Current number of available permits |
count() | IO<int> | Available permits, or negative if fibers are waiting |
Basic usage
With a Semaphore.permits(2), the first two fibers to call acquire proceed
immediately. The third suspends until one of the first two calls release.
/// A Semaphore(2) allows up to 2 fibers to hold a permit simultaneously.
/// The third `acquire` suspends until one of the first two calls `release`.
IO<Unit> semaphoreBasic() => Semaphore.permits(2).flatMap((sem) {
IO<Unit> task(String name) =>
sem.acquire().productR(() => IO.print('$name: running')).guarantee(sem.release());
return ilist([task('A'), task('B'), task('C')]).parSequence_();
});
Note that guarantee is used to ensure the permit is always released, even if
the body raises an error or is canceled. This is the correct pattern for manual
acquire/release — but there is a better way.
The permit() idiom
Manually pairing acquire with release is error-prone: a thrown exception or
fiber cancellation between the two will leak the permit permanently. permit()
wraps the pair in a Resource, which guarantees the release runs no matter how
the body exits.
/// `permit()` returns a `Resource<Unit>` that acquires on open and releases
/// on close, even if the body raises an error or is canceled.
IO<Unit> semaphorePermit() => Semaphore.permits(1).flatMap(
(sem) => sem.permit().use((_) => IO.print('inside critical section')),
);
/// `surround` is a convenient shorthand when the body doesn't need the permit value.
IO<Unit> semaphoreSurround() => Semaphore.permits(1).flatMap(
(sem) => sem.permit().surround(IO.print('inside critical section')),
);
surround is a convenient shorthand for use((_) => body) — use it when
the body doesn't need the permit value (which is always the case for a
Resource<Unit>).
Prefer permit().surround(body) over manual acquire + release. It is
shorter, safer, and composes cleanly with the rest of the Resource ecosystem.
Real-world example: concurrency limiter
A common pattern in concurrent programs is to fan out work across many fibers while capping the number that run simultaneously — for example, limiting parallel HTTP requests to avoid overloading a server or exhausting a connection pool.
Semaphore makes this trivial: wrap each unit of work in
sem.permit().surround(...) and launch all tasks with parTraverseIO_. The
semaphore does the bookkeeping automatically — surplus fibers queue behind the
gate and are released one-by-one as permits become available.
/// Simulate fetching [count] URLs, but allow at most [maxConcurrent]
/// in-flight at any one time.
IO<Unit> rateLimitedFetches({int count = 10, int maxConcurrent = 3}) =>
Semaphore.permits(maxConcurrent).flatMap((sem) {
IO<Unit> fetch(int id) => sem.permit().surround(
IO
.print('fetch $id: start')
.productR(() => IO.sleep(50.milliseconds))
.productR(() => IO.print('fetch $id: done')),
);
return ilist(List.generate(count, (int i) => i + 1)).parTraverseIO_(fetch);
});
Even though parTraverseIO_ starts all fibers at once, at most
maxConcurrent will be executing their IO.sleep body at any moment. The rest
sit suspended inside permit(), waiting their turn. No polling, no sleep loops,
no manual counter needed.