Rill I/O
ribs_rill_io is the I/O layer for ribs_rill. It provides purely functional,
resource-safe abstractions for reading and writing files and network sockets,
all expressed as Rill streams and IO effects.
Two top-level namespaces cover the main use cases:
| Namespace | Description |
|---|---|
| Files | Stream file contents, write streams to disk, manage paths |
| Network | Bind TCP/UDP sockets, accept client connections, connect to servers |
Files
Path
Path is a lightweight extension type over String that wraps the path
package for cross-platform path manipulation.
final log = Path.current / 'logs' / 'app.log'; // relative → absolute
final name = log.fileName; // Path('app.log')
final ext = log.extension; // '.log'
The / operator joins segments; + joins two Path values. Both normalise
the result automatically.
Reading
The three high-level read helpers cover the most common cases:
| Method | Returns | Description |
|---|---|---|
| Files.readAll(path) | Rill<int> | Stream raw bytes |
| Files.readUtf8(path) | Rill<String> | Decode bytes as UTF-8 |
| Files.readUtf8Lines(path) | Rill<String> | Decode and split by line |
All three are lazy — no file is opened until the Rill is compiled into an
IO and run. The file is closed automatically when the stream ends, errors, or
is canceled.
/// Print each line of a file with a 1-based line number prefix.
IO<Unit> printNumberedLines(Path path) =>
    Files.readUtf8Lines(path)
        .zipWithIndex()
        .evalTap((t) => IO.print('${t.$2 + 1}: ${t.$1}'))
        .compile
        .drain;
/// Count lines in a file.
IO<int> countLines(Path path) => Files.readUtf8Lines(path).compile.count;
For partial reads, Files.readRange(path, start: s, end: e) streams a
byte range, and Files.tail(path) emulates tail -f — it streams new bytes
as they are appended.
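As a rough illustration of a partial read, the sketch below collects the first few bytes of a file, for example to inspect a magic number. The helper name `readPrefix` and the import paths are assumptions (they follow the usual Dart package naming convention and are not confirmed by this page). Whether `end` is inclusive or exclusive is also not stated above, so check that before relying on an exact byte count.

```dart
// Assumed import paths; adjust to match your dependency setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill_io/ribs_rill_io.dart';

/// Hypothetical helper: collect the bytes between offsets 0 and [n].
/// Nothing is read until the returned IO is run.
IO<IList<int>> readPrefix(Path path, {int n = 16}) =>
    Files.readRange(path, start: 0, end: n).compile.toIList;
```

Because the range is bounded, collecting it with `compile.toIList` is safe here. A `Files.tail` stream, by contrast, is presumably unbounded and is better consumed incrementally.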
Writing
Write helpers mirror the read API and return Pipe<I, Never> — apply them with
.through(...) to pipe a Rill directly into a file.
| Method | Returns | Description |
|---|---|---|
| Files.writeAll(path) | Pipe<int, Never> | Write raw bytes |
| Files.writeUtf8(path) | Pipe<String, Never> | Encode strings as UTF-8 |
| Files.writeUtf8Lines(path) | Pipe<String, Never> | Write strings separated by the platform line separator |
/// Copy a file byte-for-byte.
IO<Unit> copyFile(Path src, Path dest) =>
    Files.readAll(src).through(Files.writeAll(dest)).compile.drain;
/// Write a list of strings to a file, one per line.
IO<Unit> writeLines(List<String> lines, Path dest) =>
    Rill.emits(lines).through(Files.writeUtf8Lines(dest)).compile.drain;
To append rather than truncate, pass flags: Flags.Append to writeAll or
open a WriteCursor directly via Files.writeCursor(path, Flags.Append).
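A minimal sketch of the append case, using only the `flags: Flags.Append` argument to `writeAll` described above. The helper name `appendLine` and the import paths are assumptions. The line is encoded to bytes by hand with `dart:convert`, because only `writeAll` is stated to accept `flags`.

```dart
import 'dart:convert';

// Assumed import paths; adjust to match your dependency setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';
import 'package:ribs_rill_io/ribs_rill_io.dart';

/// Hypothetical helper: append [line] plus a newline to [logFile]
/// instead of truncating the file.
IO<Unit> appendLine(Path logFile, String line) =>
    Rill.emits(utf8.encode('$line\n'))
        .through(Files.writeAll(logFile, flags: Flags.Append))
        .compile
        .drain;
```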
File management
Files also exposes the usual filesystem operations as IO effects:
| Method | Returns | Description |
|---|---|---|
| Files.exists(path) | IO<bool> | Check if a path exists |
| Files.list(path) | Rill<Path> | Stream directory entries |
| Files.copy(src, dest) | IO<Unit> | Copy a file |
| Files.move(src, dest) | IO<Unit> | Move / rename |
| Files.delete(path) | IO<Unit> | Delete a file or empty directory |
| Files.deleteRecursively(path) | IO<Unit> | Recursive delete |
| Files.size(path) | IO<int> | File size in bytes |
| Files.tempFile | Resource<Path> | Create a temp file, delete on release |
| Files.tempDirectory | Resource<Path> | Create a temp dir, delete on release |
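The sketch below shows how the `Resource`-based temp file might combine with the write helpers and `Files.size`. The helper name `sizeOfLines` and the import paths are assumptions. The point is that the temp file exists only inside `use` and is deleted when the resource is released.

```dart
// Assumed import paths; adjust to match your dependency setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';
import 'package:ribs_rill_io/ribs_rill_io.dart';

/// Hypothetical helper: write [lines] to a temp file and report the
/// resulting size in bytes. The file is deleted when `use` completes.
IO<int> sizeOfLines(List<String> lines) =>
    Files.tempFile.use(
      (Path tmp) => Rill.emits(lines)
          .through(Files.writeUtf8Lines(tmp))
          .compile
          .drain
          .productR(() => Files.size(tmp)),
    );
```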
Network
Network provides TCP and UDP socket operations. All sockets are managed as
Resource values — they are closed automatically when the resource scope exits,
regardless of success, error, or cancellation.
TCP client
Network.connect(address) opens a TCP connection and returns a
Resource<Socket>. Inside the resource:
- socket.reads: Rill<int> of received bytes; terminates on EOF
- socket.writes: Pipe<int, Never> that sends bytes to the server
- socket.endOfOutput(): half-close (send TCP FIN while still receiving)
/// Connect to [addr], send [request] bytes, and collect all response bytes
/// until the server closes the connection.
IO<IList<int>> sendAndReceive(SocketAddress<Ipv4Address> addr, List<int> request) =>
    Network.connect(addr).use((Socket socket) {
      final send = Rill.emits(request).through(socket.writes).compile.drain;
      final recv = socket.reads.compile.toIList;
      // Run send and receive concurrently; the server may start responding
      // before we finish sending.
      return IO.both(send, recv).map((t) => t.$2);
    });
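Some servers reply only after the client signals that its request is complete. A possible variant for that case, sketched below, half-closes the connection once the request has been written. It assumes `endOfOutput()` returns an `IO<Unit>`, which this page does not state. The helper name `sendThenHalfClose` and the import paths are likewise assumptions.

```dart
// Assumed import paths; adjust to match your dependency setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_ip/ribs_ip.dart';
import 'package:ribs_rill/ribs_rill.dart';
import 'package:ribs_rill_io/ribs_rill_io.dart';

/// Hypothetical helper: send [request], signal end of output, then
/// collect the response until the server closes its side.
IO<IList<int>> sendThenHalfClose(SocketAddress<Ipv4Address> addr, List<int> request) =>
    Network.connect(addr).use((Socket socket) {
      // Assumption: endOfOutput() is an IO<Unit> that sends the TCP FIN.
      final send = Rill.emits(request)
          .through(socket.writes)
          .compile
          .drain
          .productR(() => socket.endOfOutput());
      final recv = socket.reads.compile.toIList;
      // Receiving still runs concurrently with sending.
      return IO.both(send, recv).map((t) => t.$2);
    });
```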
TCP server
Network.bind(address) returns a Resource<ServerSocket>.
Network.bindAndAccept(address) is a convenience wrapper that turns the
server directly into a Rill<Socket>, one element per accepted connection.
Combine it with parEvalMap to handle connections concurrently:
/// A TCP echo server: every byte received on a connection is written back.
///
/// [bindAndAccept] turns the server socket into a `Rill<Socket>`.
/// [parEvalMap] handles up to [maxConnections] clients concurrently.
IO<Unit> echoServer(SocketAddress<Ipv4Address> addr, {int maxConnections = 100}) =>
    Network.bindAndAccept(addr)
        .parEvalMap(
          maxConnections,
          (Socket socket) => socket.reads.through(socket.writes).compile.drain,
        )
        .compile
        .drain;
UDP
Network.bindDatagramSocket(address) returns a Resource<DatagramSocket>.
DatagramSocket.reads streams Datagram values (each carrying a
Chunk<int> payload and a SocketAddress identifying the sender);
DatagramSocket.writes is a Pipe<Datagram, Never> for sending.
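As a sketch of how these pieces might fit together, a UDP echo server can pipe `reads` straight into `writes`. This relies on an assumption not confirmed above: that `writes` sends each `Datagram` to the address it carries, so a received datagram goes back to its sender. The helper name `udpEcho` and the import paths are also assumptions.

```dart
// Assumed import paths; adjust to match your dependency setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_ip/ribs_ip.dart';
import 'package:ribs_rill_io/ribs_rill_io.dart';

/// Hypothetical helper: echo every received datagram back to its sender.
/// The socket is closed when the resource scope exits.
IO<Unit> udpEcho(SocketAddress<Ipv4Address> addr) =>
    Network.bindDatagramSocket(addr).use(
      (DatagramSocket socket) =>
          socket.reads.through(socket.writes).compile.drain,
    );
```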
Real-world example: echo server with logging
The example below combines the network API with effect-level features of IO to
build a small but complete TCP echo server. Each accepted connection is handled
concurrently by a dedicated fiber. An IO.ref counter assigns a unique ID to
each connection, and guarantee ensures the disconnect log line is printed even
if the handler errors or is canceled.
/// A TCP echo server with per-connection logging and a connection counter.
///
/// Demonstrates:
/// - [Network.bindAndAccept] to stream incoming connections as a `Rill`
/// - [IO.ref] for shared mutable state across concurrent fibers
/// - [parEvalMap] for bounded concurrency over the connection stream
/// - [guarantee] to log disconnections on success, error, or cancellation
IO<Unit> loggingEchoServer({int port = 9090, int maxConnections = 100}) =>
    IO.ref(0).flatMap((Ref<int> counter) {
      // Increment the counter and use the new value as the connection ID.
      IO<Unit> handle(Socket socket) =>
          counter.modify((int n) => (n + 1, n + 1)).flatMap((int id) {
            final tag = '[#$id ${socket.remoteAddress}]';
            return IO
                .print('$tag connected')
                .productR(() => socket.reads.through(socket.writes).compile.drain)
                .guarantee(IO.print('$tag disconnected'));
          });
      // Note: as written, `port` is not applied to the bind address.
      return Network.bindAndAccept(
        SocketAddress.Wildcard,
      ).parEvalMap(maxConnections, handle).compile.drain;
    });
Start it with:
void main() => loggingEchoServer(port: 9090).unsafeRunAndForget();
The server runs until the process exits. Canceling the outer IO (e.g. via
SIGINT) propagates through the Rill and causes parEvalMap to cancel all
in-flight connection handlers, guaranteeing that disconnect logs and any other
finalizers run cleanly.