IO
IO is one of the most useful types in Ribs because it enables us to control side effects and build structured concurrent programs.
INFO
If you're familiar with the IO type from Cats Effect, then Ribs IO should look very similar. In fact, Ribs IO is an almost direct port from Scala to Dart. The API is designed to stay as close as possible to the original implementation.
Motivation
While Dart's Future type has its uses, it suffers from issues that make it unsuitable for pure functional programming. Consider the following code:
final rng = Future(() => math.Random.secure().nextInt(1000));
await rng.then((x) => rng.then((y) => print('x: $x / y: $y')));
If you run this code, you should notice that the values of x and y are always the same. Can you see why this is problematic? Now consider this piece of code, where we replace each reference to rng with the expression that rng was defined as:
// Substitute the definition of rng with its expression
// x and y are different (probably)
await Future(
  () => math.Random.secure().nextInt(1000),
).then(
  (x) => Future(() => math.Random.secure().nextInt(1000)).then((y) => print('x: $x / y: $y')),
);
When we do the substitution, the meaning of the program changes, which leads us to the conclusion that Future is not referentially transparent. That means that it's insufficient for use in pure functions.
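The difference comes down to when the effect runs. As a library-free sketch (the `effect` function and `runs` counter are hypothetical names used only for illustration), the following counts how often an effect executes when a Future value is reused, compared with when a thunk is called again:

```dart
import 'dart:async';

// Hypothetical counter: how many times the effect has actually run.
int runs = 0;

// Hypothetical effect: bumps the counter and returns its new value.
Future<int> effect() async {
  runs += 1;
  return runs;
}

Future<void> main() async {
  // Eager: the effect runs once, when the Future is created.
  // Reusing the Future only replays the memoized result.
  final eager = effect();
  final a = await eager;
  final b = await eager;
  print('eager: $a, $b, runs: $runs'); // eager: 1, 1, runs: 1

  // Lazy: a thunk is only a description of the effect.
  // Every call runs the effect again.
  final Future<int> Function() lazy = effect;
  final c = await lazy();
  final d = await lazy();
  print('lazy: $c, $d, runs: $runs'); // lazy: 2, 3, runs: 3
}
```

Replacing `lazy` with its definition does not change what the program does, which is the property the Future version lacks.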
Here is where IO steps in. It provides lazy, pure capabilities that provide greater control over execution and dealing with failure. Here's the same program from above, using IO instead of Future.
final rng = IO.delay(() => math.Random.secure().nextInt(1000));
// x and y are different (probably)
await rng.flatMap((x) => rng.flatMap((y) => IO.print('x: $x / y: $y'))).unsafeRunFuture();
You'll notice there are some differences in the APIs between Future and IO, but for the sake of this example, you can assume that flatMap is equivalent to then and IO.print is equivalent to print. If you squint hard enough, this IO version should look pretty similar to the original implementation where we defined rng using Future. However, this piece of code is pure and referentially transparent because of the way IO is implemented.
Along with this small but important quality, IO provides the following features:
- Asynchronous Execution
- Error Handling
- Resource Safety
- Cancellation
Asynchronous Execution
IO is capable of describing both synchronous and asynchronous effects.
IO.pure
IO.pure accepts a value. This means that there is no laziness or delaying of effects. Any parameter is eagerly evaluated.
IO.delay / IO.defer
IO.delay can be used for synchronous effects that can be evaluated immediately once the IO itself is evaluated (within the context of the IO run loop).
IO.defer accepts a thunk that returns an IO<A>, rather than a plain value. It is the right tool when you need to choose between different IOs based on values that aren't available until the program runs, or when the act of constructing the IO itself might throw.
// IO.delay: synchronous thunk that produces a plain value A
final delayEx = IO.delay(() => math.Random.secure().nextInt(100));
// IO.defer: thunk that produces an IO<A> — use when choosing between IOs
// at runtime, or when IO construction itself could throw
IO<int> coinFlip() => IO.defer(() {
  final heads = math.Random.secure().nextBool();
  return heads ? IO.pure(1) : IO.raiseError('tails!');
});
IO.defer is also what makes recursive IO programs stack-safe: wrapping the recursive call in defer turns it into a trampolined loop rather than a call-stack chain.
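As a rough sketch of that recursive pattern, the hypothetical `sumTo` function below wraps each recursive step in IO.defer. The import path is an assumption and may differ between Ribs versions; the IO calls are the ones already shown on this page.

```dart
// Assumed import; the package name may differ in your Ribs version.
import 'package:ribs_effect/ribs_effect.dart';

// Hypothetical example: adds n + (n - 1) + ... + 1 into acc.
// Calling sumTo only builds a description of the next step, because the
// recursive call sits inside the IO.defer thunk.
IO<int> sumTo(int n, [int acc = 0]) =>
    n <= 0 ? IO.pure(acc) : IO.defer(() => sumTo(n - 1, acc + n));

Future<void> main() async {
  final total = await sumTo(100000).unsafeRunFuture();
  print(total); // 5000050000
}
```

Without the defer, `sumTo(100000)` would recurse 100,000 frames deep while the IO value was still being constructed.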
IO.async
IO.async and IO.async_ are used to describe asynchronous effects that require a callback to be invoked to indicate completion and resume execution. Using IO.async_, we can write a function that converts a lazily created Future into an IO.
IO<A> futureToIO<A>(Function0<Future<A>> fut) {
  return IO.async_<A>((cb) {
    fut().then(
      (a) => cb(Right(a)),
      onError: (Object err, StackTrace st) => cb(Left(err)),
    );
  });
}
INFO
IO already has this conversion for Future included but the example illustrates one case where IO.async_ is useful.
The only difference between IO.async and IO.async_ is that with IO.async you can include a cancellation finalizer. Since Future doesn't have a mechanism for cancellation, we can safely use IO.async_.
TIP
To see an example of using IO.async, check out the implementation of IO.fromCancelableOperation.
Combinators
Once you have an IO, combinators let you transform it and sequence it with other IO values.
// map: transform the successful value
final doubled = IO.pure(21).map((n) => n * 2); // IO(42)
// flatMap: sequence two IOs, passing the result of the first to the second
final chained = IO.pure(10).flatMap((n) => IO.pure(n + 5)); // IO(15)
// flatTap: run a side-effect using the value, then pass the value through
final logged = IO.pure(42).flatTap((n) => IO.print('computed: $n')); // IO(42)
// productR: run two IOs in sequence, keep only the second result
final init = IO.print('setup').productR(IO.pure(42)); // IO(42)
// attempt: pull a potential error into an Either instead of raising it
final safe = IO.raiseError<int>('oops').attempt(); // IO<Either<Object, int>>
// redeem: recover from an error or transform the success, all in one step
final recovered = IO.raiseError<int>('oops').redeem((_) => -1, (n) => n * 2); // IO(-1)
TIP
These examples only scratch the surface of the API available for manipulating and controlling IO.
Error Handling
One of the first recommendations on the Dart Error Handling page demonstrates using Exceptions paired with try/catch/finally to manage errors in your programs. But it's already been established that throwing exceptions is a side-effect. This rules out throwing them in our pure FP programs.
That raises the question of how we create and handle errors using IO.
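// composable handlers using handleError and handleErrorWith
// Note: integer division (~/) throws on a zero divisor, whereas 90 / 0
// evaluates to double.infinity in Dart and never reaches the handler
final ioA = IO.delay(() => 90 ~/ 0).handleError((ex) => 0);
final ioB = IO.delay(() => 90 ~/ 0).handleErrorWith((ex) => IO.pure(-1));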
IO<double> safeDiv(int a, int b) => IO.defer(() {
  if (b != 0) {
    return IO.pure(a / b);
  } else {
    return IO.raiseError('cannot divide by 0!');
  }
});
Concurrency
Two or more IO programs can run concurrently using IO.both and IO.race.
IO.both
IO.both runs two IOs at the same time and returns a tuple of both results. If either IO fails or is canceled, the other is automatically canceled too.
final fetchUser = IO.pure('Alice');
final fetchProfile = IO.pure(99);
// Both IOs run concurrently; if either fails the other is canceled
final program = IO
    .both(fetchUser, fetchProfile)
    .flatMap((t) => IO.print('user: ${t.$1}, score: ${t.$2}'));
IO.race
IO.race runs two IOs concurrently and returns the result of whichever finishes first as Either<A, B>. The slower IO is automatically canceled.
final fast = IO.sleep(100.milliseconds).productR(IO.pure('fast result'));
final slow = IO.sleep(500.milliseconds).productR(IO.pure('slow result'));
// The winner is returned as Either<A, B>; the loser is automatically canceled
final race = IO
    .race(fast, slow)
    .map((winner) => winner.fold((a) => 'A won: $a', (b) => 'B won: $b'));
race is the natural building block for timeout logic: race your computation against IO.sleep(duration).productR(IO.pure(fallback)).
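A minimal sketch of that idea follows. The helper name `raceWithFallback` is hypothetical, and the import path is an assumption that may differ between Ribs versions. In practice the built-in timeoutTo described under Timing covers the same need; this only shows how such a combinator can be assembled from race.

```dart
// Assumed import; the package name may differ in your Ribs version.
import 'package:ribs_effect/ribs_effect.dart';

// Hypothetical helper: completes with `fallback` if `io` has not finished
// within `limit`. Both sides of the race produce an A, so the Either
// returned by race can be collapsed with fold.
IO<A> raceWithFallback<A>(IO<A> io, Duration limit, A fallback) => IO
    .race(io, IO.sleep(limit).productR(IO.pure(fallback)))
    .map((winner) => winner.fold((a) => a, (b) => b));

Future<void> main() async {
  final slow = IO.sleep(const Duration(seconds: 2)).productR(IO.pure('done'));

  final result = await raceWithFallback(
    slow,
    const Duration(milliseconds: 100),
    'timed out',
  ).unsafeRunFuture();

  print(result); // timed out
}
```

Because race cancels the loser, the two-second sleep does not keep running after the fallback wins.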
TIP
For running an arbitrary collection of IOs concurrently see Traversing Collections with IO below.
Timing
IO.sleep
IO.sleep(duration) suspends the current fiber for the given duration. It is an asynchronous boundary, which means cancellation is checked when it resumes.
timeout and timeoutTo
io.timeout(duration) races the IO against a timer; if the timer wins a TimeoutException is raised. io.timeoutTo(duration, fallback) returns the fallback IO instead.
timed
io.timed() measures wall-clock time around the IO and returns an IO<(Duration, A)>, pairing the elapsed time with the result.
// sleep: deliberate async delay
final delayed = IO.sleep(200.milliseconds).productR(IO.pure(42));
// timed: measure how long an IO takes
final measured = IO.pure(42).timed(); // IO<(Duration, int)>
// timeout: raise TimeoutException if the IO doesn't finish in time
final withTimeout = IO.sleep(5.seconds).timeout(1.seconds);
// timeoutTo: return a fallback IO instead of raising
final withFallback = IO.sleep(5.seconds).productR(IO.pure(42)).timeoutTo(1.seconds, IO.pure(-1));
Repetition
IO provides several combinators for running an action more than once.
// replicate: run sequentially n times, collect results
final rolls = IO.delay(() => math.Random.secure().nextInt(6) + 1).replicate(3);
// IO<IList<int>>
// replicate_: run n times, discard results
final ticks = IO.print('tick').replicate_(5);
// iterateUntil: keep running until the predicate is satisfied
final poll = IO.delay(() => math.Random.secure().nextInt(100)).iterateUntil((n) => n > 90);
// IO<int>: repeats until a value > 90 is produced
| Combinator | Returns | Description |
|---|---|---|
| replicate(n) | IO<IList<A>> | Run sequentially n times, collect results |
| replicate_(n) | IO<Unit> | Run sequentially n times, discard results |
| iterateWhile(p) | IO<A> | Repeat while predicate on the result holds |
| iterateUntil(p) | IO<A> | Repeat until predicate on the result holds |
| foreverM() | IO<Nothing> | Repeat until error or cancellation |
Traversing Collections with IO
When you have a collection of values and an IO-valued function, the traversal extensions on IList let you apply it to every element and collect the results into a single IO.
// traverseIO: apply an IO-valued function to each element, sequentially.
// Short-circuits on the first error — later elements are not evaluated.
final validated = ilist([2, 4, 6]).traverseIO(
(n) => n.isEven ? IO.pure(n * 10) : IO.raiseError<int>('odd: $n'),
); // IO<IList<int>>([20, 40, 60])
// traverseIO_: same, but discard the results
final logged = ilist(['a', 'b', 'c']).traverseIO_((s) => IO.print('processing: $s'));
// sequence: collapse IList<IO<A>> into IO<IList<A>>
final actions = ilist([IO.pure(1), IO.pure(2), IO.pure(3)]);
final sequenced = actions.sequence(); // IO<IList<int>>
// parTraverseIO / parSequence: same ideas, but run all IOs concurrently
final parallel = ilist([1, 2, 3]).parTraverseIO((n) => IO.pure(n * 10));
The key choice is sequential vs concurrent:
| | Sequential | Concurrent |
|---|---|---|
| With function | traverseIO(f) | parTraverseIO(f) |
| Discard results | traverseIO_(f) | parTraverseIO_(f) |
| IList<IO<A>> | sequence() | parSequence() |
Sequential traversal (traverseIO) short-circuits on the first error — elements after the failure are never evaluated. Concurrent traversal (parTraverseIO) runs everything concurrently and cancels remaining fibers as soon as any one fails.
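To observe the sequential short-circuit, the sketch below counts how many elements actually run before a failure. The `evaluated` counter is a hypothetical name used only for illustration, and the import paths are assumptions that may differ between Ribs versions.

```dart
// Assumed imports; the package names may differ in your Ribs version.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';

Future<void> main() async {
  var evaluated = 0; // hypothetical counter, only for illustration

  // Each element bumps the counter when it runs; element 2 raises an error.
  final program = ilist([1, 2, 3, 4]).traverseIO(
    (n) => IO.delay(() => evaluated += 1).flatMap(
      (_) => n == 2 ? IO.raiseError<int>('boom at $n') : IO.pure(n),
    ),
  );

  // attempt turns the raised error into a value so main does not throw.
  final result = await program.attempt().unsafeRunFuture();

  print('result: $result');
  print('elements evaluated: $evaluated');
}
```

If the traversal short-circuits as described, the counter should stop at the failing element instead of reaching 4.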
Safe Resource Handling
Let's begin with a fairly typical resource pattern in a Dart program that opens a file, writes some data, and then makes sure the file resource is closed:
final sink = File('path/to/file').openWrite();
try {
  // use sink...
} catch (e) {
  // catch error
} finally {
  sink.close();
}
Now let's write an equivalent program using IO:
final sink = IO.delay(() => File('path/to/file')).map((f) => f.openWrite());
// bracket *ensures* that the sink is closed
final program = sink.bracket(
  (sink) => IO.exec(() => sink.writeAll(['Hello', 'World'])),
  (sink) => IO.exec(() => sink.close()),
);
This version using IO has all the resource safety guarantees of the try/catch/finally version, but it avoids side effects by not relying on thrown exceptions.
TIP
While IO.bracket is a great starting point for resource safety, you should check out Resource for a more robust solution.
Conversions
IO comes with a few helper functions to convert common FP types into an IO.
IO.fromOption(const Some(42), () => Exception('raiseError: none'));
IO.fromEither(Either.right(42));
IO.fromFuture(IO.delay(() => Future(() => 42)));
IO.fromCancelableOperation(
  IO.delay(
    () => CancelableOperation.fromFuture(
      Future(() => 32),
      onCancel: () => print('canceled!'),
    ),
  ),
);
- IO.fromOption: Takes an Option and will either return a pure synchronous IO in the case of Some or raise an error in the case of None.
- IO.fromEither: Returns a pure IO if the Either is Right or, if Left, will raise an error using the value of the Left.
- IO.fromFuture: Since Future is eagerly evaluated and memoized, fromFuture takes a parameter of type IO<Future> to control the laziness and ensure referential transparency.
WARNING
Simply using IO doesn't magically make the Future parameter referentially transparent. You must still take care to control when the Future is evaluated.
final fut = Future(() => print('bad'));
// Too late! Future is already running!
final ioBad = IO.fromFuture(IO.pure(fut));
// IO.pure parameter is not lazy so it's evaluated immediately!
final ioAlsoBad = IO.fromFuture(IO.pure(Future(() => print('also bad'))));
// Here we preserve laziness so that ioGood is referentially transparent
final ioGood = IO.fromFuture(IO.delay(() => Future(() => print('good'))));
- IO.fromCancelableOperation: This behaves in the same way as IO.fromFuture but is able to take advantage of some of the advanced features of CancelableOperation.
Cancellation
IO also allows you to build cancelable operations.
int count = 0;
// Our IO program
final io = IO
    .pure(42)
    .delayBy(10.seconds)
    .onCancel(IO.exec(() => count += 1))
    .onCancel(IO.exec(() => count += 2))
    .onCancel(IO.exec(() => count += 3));
// .start() kicks off the IO execution and gives us a handle to that
// execution in the form of an IOFiber
final fiber = await io.start().unsafeRunFuture();
// We immediately cancel the IO
fiber.cancel().unsafeRunAndForget();
// .join() will wait for the fiber to finish
// In this case, that's immediate since we've canceled the IO above
final outcome = await fiber.join().unsafeRunFuture();
// Show the Outcome of the IO as well as confirmation that our `onCancel`
// handlers have been called since the IO was canceled
print('Outcome: $outcome | count: $count'); // Outcome: Canceled | count: 6
This is obviously a contrived example, but it shows that you have a great deal of power over the execution of an IO.
Also note that an IO can only be checked for cancellation at its asynchronous boundaries. Types of asynchronous boundaries include:
Outcome
Outcome<A> is the type returned by unsafeRunFutureOutcome and IOFiber.join. It has exactly three cases:
| Case | Meaning |
|---|---|
| Succeeded(value) | The IO completed normally |
| Errored(error, stackTrace) | The IO raised an unhandled error |
| Canceled() | The IO was canceled before finishing |
Use fold to pattern-match all three:
final outcome = await IO.pure(42).unsafeRunFutureOutcome();
// fold over the three possible cases: canceled, errored, succeeded
outcome.fold(
  () => print('IO was canceled'),
  (err, _) => print('IO failed: $err'),
  (value) => print('IO succeeded: $value'), // IO succeeded: 42
);
'Unsafe' Operations
There are a few functions on IO prefixed with the word unsafe. These are what you should be calling at the 'edge(s)' of your program. In a completely pure program, you should only call an 'unsafe' function once, in the main method after you've built and described your program using IO.
The reason these functions include the word 'unsafe' isn't because your computer will explode when they're called. They're unsafe because they are not pure functions: they interpret your IO and perform its side effects. 'Unsafe' is included because you should always take care before deciding to call these functions.
- unsafeRunAsync: As the name indicates, this will evaluate the IO asynchronously, and the provided callback will be executed when it has finished.
- unsafeRunAndForget: Same as unsafeRunAsync but no callback is provided.
- unsafeRunFuture: Evaluates the IO and returns a Future that will complete with the value of the IO or any error that is encountered during the evaluation.
- unsafeRunFutureOutcome: Returns a Future that will complete with the Outcome of the IO, being one of 3 possible states:
  - The value itself (successful)
  - The error encountered (errored)
  - Marker indicating the IO was canceled before completing (canceled)
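As a small sketch of the "one unsafe call at the edge" guideline, the program below is built entirely as a value and interpreted once in main. The `program` name is hypothetical and the import path is an assumption that may differ between Ribs versions; the combinators are the ones shown earlier on this page.

```dart
// Assumed import; the package name may differ in your Ribs version.
import 'package:ribs_effect/ribs_effect.dart';

// The whole program is described as a value; nothing runs here.
final IO<int> program = IO
    .pure(20)
    .map((n) => n + 1)
    .flatTap((n) => IO.print('computed: $n'))
    .map((n) => n * 2);

Future<void> main() async {
  // The single 'unsafe' call: interpret the description and perform
  // its effects.
  final result = await program.unsafeRunFuture();
  print('result: $result'); // result: 42
}
```

Keeping the unsafe call in main means every other function in the program can stay pure and be composed freely.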