Skip to main content

Streaming

A common need when working with binary data is encoding domain objects into a byte stream and decoding them back out — for example when reading from a file or communicating over a network socket.

Ribs binary provides adapters for both Dart's built-in Stream API and the Rill streaming library. Both approaches use the same Codec definitions, so the choice between them is purely about which streaming model you are already using.

Let's assume we've got a simple domain model with its binary Codec already defined:

Domain Model
final class Event {
final DateTime timestamp;
final int id;
final String message;

const Event(this.timestamp, this.id, this.message);

static final codec = Codec.product3(
Codec.int64.xmap(
(i) => DateTime.fromMillisecondsSinceEpoch(i),
(date) => date.millisecondsSinceEpoch,
),
Codec.uint24,
Codec.utf8_32,
Event.new,
(evt) => (evt.timestamp, evt.id, evt.message),
);
}

With Rill

If you are using ribs_rill and ribs_rill_io, the RillEncoder and RillDecoder types from package:ribs_rill/binary.dart plug directly into the Rill streaming pipeline as Pipes.

RillEncoder

RillEncoder.many(codec) produces an encoder that repeatedly consumes one element at a time from the source Rill<A> and encodes each to BitVector. .toPipeByte flattens the bits into individual bytes so the result composes directly with Socket.writes:

RillEncoder — sending events over a socket
IO<Unit> sendEvents(Rill<Event> events, SocketAddress addr) => Network.connect(addr).use(
(socket) =>
events
// Encode each Event to bytes using the codec
.through(RillEncoder.many(Event.codec).toPipeByte)
// Write the byte stream to the socket
.through(socket.writes)
.compile
.drain,
);

RillDecoder

RillDecoder.many(codec) is the streaming counterpart: it accumulates raw bytes from Socket.reads, decodes one A at a time, and emits each value downstream. .toPipeByte accepts a Rill<int> so it connects directly to socket.reads without any manual conversion:

RillDecoder — receiving events from a socket
// Decode incoming bytes from a single client connection
IO<Unit> handleClient(Socket socket) =>
socket.reads
// Reassemble framed Events from the raw byte stream
.through(RillDecoder.many(Event.codec).toPipeByte)
// Process each decoded Event
.foreach(processEvent)
.compile
.drain;

// Accept clients and handle each one sequentially
IO<Unit> runServer(SocketAddress addr) =>
Network.bindAndAccept(addr).evalMap(handleClient).compile.drain;

Both RillEncoder and RillDecoder handle framing automatically: the decoder buffers partial chunks across BitVector boundaries, so there is no need to worry about TCP segmentation splitting a message mid-frame.

note

evalMap in runServer handles one client at a time. For concurrent clients use parEvalMap instead, supplying a bound on the number of simultaneous connections.

once — encoding and decoding a header

RillEncoder.many and RillDecoder.many keep consuming until the stream ends. once reads or writes exactly one value, which is useful for a fixed-size protocol header that precedes the variable-length body.

Encoding a header with RillEncoder.once

Encode the header with RillEncoder.once, then concatenate the body stream encoded with RillEncoder.many using the + operator:

RillEncoder.once — sending a header then events
IO<Unit> sendEventsWithHeader(
Header header,
Rill<Event> events,
SocketAddress addr,
) => Network.connect(addr).use(
(socket) =>
// RillEncoder.once sends the header exactly once, then the events follow
(Rill.emit(header).through(RillEncoder.once(Header.codec).toPipeByte) +
events.through(RillEncoder.many(Event.codec).toPipeByte))
.through(socket.writes)
.compile
.drain,
);

Decoding a header with RillDecoder.once

On the receiving side, RillDecoder.once reads the header and flatMap hands the remaining bytes to the next decoder. The composed decoder is a plain Pipe<int, Event> that hides all the framing logic:

RillDecoder.once — reading a header then events
// RillDecoder.once reads the header, flatMap hands the remainder to many(Event.codec)
final headerThenEvents = RillDecoder.once(
Header.codec,
).flatMap((_) => RillDecoder.many(Event.codec));

IO<Unit> handleClientWithHeader(Socket socket) =>
socket.reads.through(headerThenEvents.toPipeByte).foreach(processEvent).compile.drain;

flatMap can also branch on the decoded header value — for example to select a different body Codec based on a protocol version field.

With Dart Streams

StreamEncoder

StreamEncoder is a StreamTransformer<A, BitVector>. Pipe a Stream<A> through it and each element is encoded to a BitVector using the provided Codec. You then convert each BitVector to a Uint8List and hand it off to whatever sink you need:

StreamEncoder
Stream<Event> events() => throw UnimplementedError('TODO');

final socket = await Socket.connect('localhost', 12345);

final Future<void> eventWriter = events()
// Encodes each Event to BitVector
.transform(StreamEncoder(Event.codec))
// Convert BitVector to Uint8List
.map((bitVector) => bitVector.toByteArray())
// Write each Uint8List to Socket
.forEach((byteList) => socket.add(byteList));

StreamDecoder

StreamDecoder is the reverse: a StreamTransformer<BitVector, A>. It buffers incoming BitVector chunks and emits a decoded A as soon as enough bits have arrived. The receiving side maps raw bytes to BitVector first, then applies the transformer:

StreamDecoder
void storeEvent(Event evt) => throw UnimplementedError('TODO');

Future<void> handleClient(Socket clientSocket) => clientSocket
// Convert Uint8List to BitVector
.map((bytes) => ByteVector(bytes).bits)
// Convert BitVector to Event
.transform(StreamDecoder(Event.codec))
// Do something with the events
.forEach(storeEvent);

final socket = await ServerSocket.bind('0.0.0.0', 12345);
final events = socket.forEach(handleClient);