Skip to main content

Rill I/O

ribs_rill_io is the I/O layer for ribs_rill. It provides purely functional, resource-safe abstractions for reading and writing files and network sockets, all expressed as Rill streams and IO effects.

Two top-level namespaces cover the main use cases:

NamespaceDescription
FilesStream file contents, write streams to disk, manage paths
NetworkBind TCP/UDP sockets, accept client connections, connect to servers

Files

Path

Path is a lightweight extension type over String that wraps the path package for cross-platform path manipulation.

final log  = Path.current / 'logs' / 'app.log'; // relative → absolute
final name = log.fileName; // Path('app.log')
final ext = log.extension; // '.log'

The / operator joins segments; + joins two Path values. Both normalise the result automatically.


Reading

The three high-level read helpers cover the most common cases:

MethodReturnsDescription
Files.readAll(path)Rill<int>Stream raw bytes
Files.readUtf8(path)Rill<String>Decode bytes as UTF-8
Files.readUtf8Lines(path)Rill<String>Decode and split by line

All three are lazy — no file is opened until the Rill is compiled into an IO and run. The file is closed automatically when the stream ends, errors, or is canceled.

/// Print each line of a file with a 1-based line number prefix.
IO<Unit> printNumberedLines(Path path) =>
Files.readUtf8Lines(
path,
).zipWithIndex().evalTap((t) => IO.print('${t.$2 + 1}: ${t.$1}')).compile.drain;

/// Count lines in a file.
IO<int> countLines(Path path) => Files.readUtf8Lines(path).compile.count;

For partial reads, Files.readRange(path, start: s, end: e) streams a byte range, and Files.tail(path) emulates tail -f — it streams new bytes as they are appended.

Writing

Write helpers mirror the read API and return Pipe<I, Never> — apply them with .through(...) to pipe a Rill directly into a file.

MethodReturnsDescription
Files.writeAll(path)Pipe<int, Never>Write raw bytes
Files.writeUtf8(path)Pipe<String, Never>Encode strings as UTF-8
Files.writeUtf8Lines(path)Pipe<String, Never>Write strings separated by the platform line separator
/// Copy a file byte-for-byte.
IO<Unit> copyFile(Path src, Path dest) =>
Files.readAll(src).through(Files.writeAll(dest)).compile.drain;

/// Write a list of strings to a file, one per line.
IO<Unit> writeLines(List<String> lines, Path dest) =>
Rill.emits(lines).through(Files.writeUtf8Lines(dest)).compile.drain;

To append rather than truncate, pass flags: Flags.Append to writeAll or open a WriteCursor directly via Files.writeCursor(path, Flags.Append).

File management

Files also exposes the usual filesystem operations as IO effects:

MethodReturnsDescription
Files.exists(path)IO<bool>Check if a path exists
Files.list(path)Rill<Path>Stream directory entries
Files.copy(src, dest)IO<Unit>Copy a file
Files.move(src, dest)IO<Unit>Move / rename
Files.delete(path)IO<Unit>Delete a file or empty directory
Files.deleteRecursively(path)IO<Unit>Recursive delete
Files.size(path)IO<int>File size in bytes
Files.tempFileResource<Path>Create a temp file, delete on release
Files.tempDirectoryResource<Path>Create a temp dir, delete on release

Network

Network provides TCP and UDP socket operations. All sockets are managed as Resource values — they are closed automatically when the resource scope exits, regardless of success, error, or cancellation.

TCP client

Network.connect(address) opens a TCP connection and returns a Resource<Socket>. Inside the resource:

  • socket.readsRill<int> of received bytes, terminates on EOF
  • socket.writesPipe<int, Never> that sends bytes to the server
  • socket.endOfOutput() — half-close (send TCP FIN while still receiving)
/// Connect to [addr], send [request] bytes, and collect all response bytes
/// until the server closes the connection.
IO<IList<int>> sendAndReceive(SocketAddress<Ipv4Address> addr, List<int> request) =>
Network.connect(addr).use((Socket socket) {
final send = Rill.emits(request).through(socket.writes).compile.drain;
final recv = socket.reads.compile.toIList;

// Run send and receive concurrently; the server may start responding
// before we finish sending.
return IO.both(send, recv).map((t) => t.$2);
});

TCP server

Network.bind(address) returns a Resource<ServerSocket>. Network.bindAndAccept(address) is a convenience wrapper that turns the server directly into a Rill<Socket>, one element per accepted connection. Combine it with parEvalMap to handle connections concurrently:

/// A TCP echo server: every byte received on a connection is written back.
///
/// [bindAndAccept] turns the server socket into a `Rill<Socket>`.
/// [parEvalMap] handles up to [maxConnections] clients concurrently.
IO<Unit> echoServer(SocketAddress<Ipv4Address> addr, {int maxConnections = 100}) =>
Network.bindAndAccept(addr)
.parEvalMap(
maxConnections,
(Socket socket) => socket.reads.through(socket.writes).compile.drain,
)
.compile
.drain;

UDP

Network.bindDatagramSocket(address) returns a Resource<DatagramSocket>. DatagramSocket.reads streams Datagram values (each carrying a Chunk<int> payload and a SocketAddress identifying the sender); DatagramSocket.writes is a Pipe<Datagram, Never> for sending.


Real-world example: echo server with logging

The example below combines file-level features with the network API to build a small but complete TCP echo server. Each accepted connection is handled concurrently by a dedicated fiber. An IO.ref counter assigns a unique ID to each connection, and guarantee ensures the disconnect log line is printed even if the handler errors or is canceled.

/// A TCP echo server with per-connection logging and a connection counter.
///
/// Demonstrates:
/// - [Network.bindAndAccept] to stream incoming connections as a `Rill`
/// - [IO.ref] for shared mutable state across concurrent fibers
/// - [parEvalMap] for bounded concurrency over the connection stream
/// - [guarantee] to log disconnections on success, error, or cancellation
IO<Unit> loggingEchoServer({int port = 9090, int maxConnections = 100}) => IO.ref(0).flatMap((
Ref<int> counter,
) {
IO<Unit> handle(Socket socket) => counter.modify((int n) => (n + 1, n + 1)).flatMap((int id) {
final tag = '[#$id ${socket.remoteAddress}]';

return IO
.print('$tag connected')
.productR(() => socket.reads.through(socket.writes).compile.drain)
.guarantee(IO.print('$tag disconnected'));
});

return Network.bindAndAccept(
SocketAddress.Wildcard,
).parEvalMap(maxConnections, handle).compile.drain;
});

Start it with:

void main() => loggingEchoServer(port: 9090).unsafeRunAndForget();

The server runs until the process exits. Canceling the outer IO (e.g. via SIGINT) propagates through the Rill and causes parEvalMap to cancel all in-flight connection handlers, guaranteeing that disconnect logs and any other finalizers run cleanly.