Streaming
A common need when working with binary data is encoding domain objects into a byte stream and decoding them back out — for example when reading from a file or communicating over a network socket.
Ribs binary provides adapters for both Dart's built-in Stream API and the
Rill streaming library. Both approaches use the same Codec definitions, so
the choice between them is purely about which streaming model you are already
using.
Let's assume we've got a simple domain model with its binary Codec already
defined:
final class Event {
  final DateTime timestamp;
  final int id;
  final String message;

  const Event(this.timestamp, this.id, this.message);

  static final codec = Codec.product3(
    Codec.int64.xmap(
      (i) => DateTime.fromMillisecondsSinceEpoch(i),
      (date) => date.millisecondsSinceEpoch,
    ),
    Codec.uint24,
    Codec.utf8_32,
    Event.new,
    (evt) => (evt.timestamp, evt.id, evt.message),
  );
}
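To make the byte layout of such a codec concrete, here is a minimal hand-rolled sketch of the same three fields using only dart:convert and dart:typed_data. It assumes big-endian byte order and that utf8_32 means a 32-bit byte-length prefix followed by the UTF-8 bytes; neither assumption is confirmed on this page. encodeEventByHand and decodeEventByHand are hypothetical helpers for illustration and are not part of ribs.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Assumed layout: 8-byte timestamp, 3-byte id, 4-byte length prefix,
/// then the UTF-8 bytes of the message.
Uint8List encodeEventByHand(int timestampMillis, int id, String message) {
  final body = utf8.encode(message);
  final out = Uint8List(8 + 3 + 4 + body.length);
  final view = ByteData.sublistView(out);
  view.setInt64(0, timestampMillis); // ByteData defaults to big-endian
  // 24-bit unsigned id, most significant byte first
  out[8] = (id >> 16) & 0xff;
  out[9] = (id >> 8) & 0xff;
  out[10] = id & 0xff;
  view.setUint32(11, body.length);
  out.setRange(15, 15 + body.length, body);
  return out;
}

/// Reads the fields back in the same order they were written.
(int, int, String) decodeEventByHand(Uint8List bytes) {
  final view = ByteData.sublistView(bytes);
  final timestampMillis = view.getInt64(0);
  final id = (bytes[8] << 16) | (bytes[9] << 8) | bytes[10];
  final length = view.getUint32(11);
  final message = utf8.decode(bytes.sublist(15, 15 + length));
  return (timestampMillis, id, message);
}
```

Under these assumptions the length prefix is what lets a streaming decoder know where one Event ends and the next begins.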
With Rill
If you are using ribs_rill and ribs_rill_io, the RillEncoder and
RillDecoder types from package:ribs_rill/binary.dart plug directly into the
Rill streaming pipeline as Pipes.
RillEncoder
RillEncoder.many(codec) produces an encoder that consumes the source Rill<A>
one element at a time and encodes each element to a BitVector.
.toPipeByte flattens the bits into individual bytes so the result composes
directly with Socket.writes:
IO<Unit> sendEvents(Rill<Event> events, SocketAddress addr) => Network.connect(addr).use(
  (socket) =>
      events
          // Encode each Event to bytes using the codec
          .through(RillEncoder.many(Event.codec).toPipeByte)
          // Write the byte stream to the socket
          .through(socket.writes)
          .compile
          .drain,
);
RillDecoder
RillDecoder.many(codec) is the streaming counterpart: it accumulates raw bytes
from Socket.reads, decodes one A at a time, and emits each value
downstream. .toPipeByte accepts a Rill<int> so it connects directly to
socket.reads without any manual conversion:
// Decode incoming bytes from a single client connection
IO<Unit> handleClient(Socket socket) =>
    socket.reads
        // Reassemble framed Events from the raw byte stream
        .through(RillDecoder.many(Event.codec).toPipeByte)
        // Process each decoded Event
        .foreach(processEvent)
        .compile
        .drain;

// Accept clients and handle each one sequentially
IO<Unit> runServer(SocketAddress addr) =>
    Network.bindAndAccept(addr).evalMap(handleClient).compile.drain;
Both RillEncoder and RillDecoder handle framing automatically. The decoder
buffers incomplete input across BitVector chunk boundaries until a full value
can be decoded, so TCP segmentation that splits a message mid-frame needs no
special handling.
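The buffering idea can be sketched in plain Dart, independent of ribs. FrameReassembler is a hypothetical class written for illustration, and it assumes a simpler frame format than the Event layout: a 4-byte big-endian payload length followed by the payload. It is not how ribs implements its decoder, only a sketch of the general technique.

```dart
import 'dart:typed_data';

/// Accumulates arbitrary byte chunks and releases only complete frames.
/// Assumed frame format: 4-byte big-endian payload length, then the payload.
class FrameReassembler {
  Uint8List _pending = Uint8List(0);

  /// Adds one chunk and returns every frame payload that is now complete.
  List<Uint8List> add(List<int> chunk) {
    // Join the leftover bytes from earlier chunks with the new chunk.
    final buffered = Uint8List(_pending.length + chunk.length)
      ..setRange(0, _pending.length, _pending)
      ..setRange(_pending.length, _pending.length + chunk.length, chunk);
    final view = ByteData.sublistView(buffered);
    final frames = <Uint8List>[];
    var offset = 0;
    while (buffered.length - offset >= 4) {
      final length = view.getUint32(offset);
      // Stop when the payload of the next frame has not fully arrived.
      if (buffered.length - offset - 4 < length) break;
      frames.add(buffered.sublist(offset + 4, offset + 4 + length));
      offset += 4 + length;
    }
    // Keep the unconsumed tail for the next call.
    _pending = buffered.sublist(offset);
    return frames;
  }
}
```

Feeding the reassembler chunks that cut through a length prefix or a payload yields no frames until the missing bytes arrive, which is the behaviour a streaming decoder needs on top of TCP.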
evalMap in runServer handles one client at a time. For concurrent clients use
parEvalMap instead, supplying a bound on the number of simultaneous connections.
once — encoding and decoding a header
RillEncoder.many and RillDecoder.many keep consuming until the stream ends.
once reads or writes exactly one value, which is useful for a fixed-size
protocol header that precedes the variable-length body.
Encoding a header with RillEncoder.once
Encode the header with RillEncoder.once, then concatenate the body stream
encoded with RillEncoder.many using the + operator:
IO<Unit> sendEventsWithHeader(
  Header header,
  Rill<Event> events,
  SocketAddress addr,
) => Network.connect(addr).use(
  (socket) =>
      // RillEncoder.once sends the header exactly once, then the events follow
      (Rill.emit(header).through(RillEncoder.once(Header.codec).toPipeByte) +
              events.through(RillEncoder.many(Event.codec).toPipeByte))
          .through(socket.writes)
          .compile
          .drain,
);
Decoding a header with RillDecoder.once
On the receiving side, RillDecoder.once reads the header and flatMap hands
the remaining bytes to the next decoder. The composed decoder is a plain
Pipe<int, Event> that hides all the framing logic:
// RillDecoder.once reads the header, flatMap hands the remainder to many(Event.codec)
final headerThenEvents = RillDecoder.once(
  Header.codec,
).flatMap((_) => RillDecoder.many(Event.codec));

IO<Unit> handleClientWithHeader(Socket socket) =>
    socket.reads.through(headerThenEvents.toPipeByte).foreach(processEvent).compile.drain;
flatMap can also branch on the decoded header value — for example to select a
different body Codec based on a protocol version field.
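The branching idea can be illustrated without ribs: read a header once, then choose how to decode everything that follows. The sketch below is a plain Dart analogue, not the ribs API. The one-byte version header, the one-byte length prefix, the Latin-1 and UTF-8 body encodings, and the names selectBodyDecoder and decodeHeaderThenBodies are all assumptions made for the example.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Decodes one body payload into a message string.
typedef BodyDecoder = String Function(Uint8List payload);

/// Picks the body decoder from the (hypothetical) version field of the header.
BodyDecoder selectBodyDecoder(int version) => switch (version) {
      1 => latin1.decode, // assumed: version 1 bodies are Latin-1
      2 => utf8.decode, // assumed: version 2 bodies are UTF-8
      _ => throw FormatException('Unsupported protocol version $version'),
    };

/// Reads the header once, then decodes every length-prefixed body after it.
/// Assumed layout: 1 version byte, then frames of 1 length byte + payload.
List<String> decodeHeaderThenBodies(Uint8List bytes) {
  if (bytes.isEmpty) throw const FormatException('Missing header');
  final decodeBody = selectBodyDecoder(bytes[0]);
  final messages = <String>[];
  var offset = 1;
  while (offset < bytes.length) {
    final length = bytes[offset];
    final end = offset + 1 + length;
    if (end > bytes.length) throw const FormatException('Truncated body');
    messages.add(decodeBody(Uint8List.sublistView(bytes, offset + 1, end)));
    offset = end;
  }
  return messages;
}
```

The decoder choice is made exactly once, from the header, and then reused for every body, which mirrors how a function passed to flatMap would pick the body decoder from the header value.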
With Dart Streams
StreamEncoder
StreamEncoder is a StreamTransformer<A, BitVector>. Pipe a Stream<A>
through it and each element is encoded to a BitVector using the provided
Codec. You then convert each BitVector to a Uint8List and hand it off to
whatever sink you need:
Stream<Event> events() => throw UnimplementedError('TODO');

final socket = await Socket.connect('localhost', 12345);

final Future<void> eventWriter = events()
    // Encodes each Event to BitVector
    .transform(StreamEncoder(Event.codec))
    // Convert BitVector to Uint8List
    .map((bitVector) => bitVector.toByteArray())
    // Write each Uint8List to Socket
    .forEach((byteList) => socket.add(byteList));
StreamDecoder
StreamDecoder is the reverse: a StreamTransformer<BitVector, A>. It buffers
incoming BitVector chunks and emits a decoded A as soon as enough bits have
arrived. The receiving side maps raw bytes to BitVector first, then applies the
transformer:
void storeEvent(Event evt) => throw UnimplementedError('TODO');

Future<void> handleClient(Socket clientSocket) => clientSocket
    // Convert Uint8List to BitVector
    .map((bytes) => ByteVector(bytes).bits)
    // Convert BitVector to Event
    .transform(StreamDecoder(Event.codec))
    // Do something with the events
    .forEach(storeEvent);

final socket = await ServerSocket.bind('0.0.0.0', 12345);
final events = socket.forEach(handleClient);
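For intuition about what a buffering StreamTransformer does, here is a small self-contained sketch built only on dart:async. It is not the StreamDecoder implementation. uint32Decoder is a hypothetical transformer that assumes every value is a fixed-size, 4-byte big-endian unsigned integer, and it emits a value as soon as enough bytes have arrived, however the input happens to be chunked.

```dart
import 'dart:async';

/// Emits one unsigned 32-bit big-endian integer per 4 bytes received,
/// regardless of how the bytes are split across chunks.
StreamTransformer<List<int>, int> uint32Decoder() =>
    StreamTransformer.fromBind((chunks) async* {
      final pending = <int>[]; // bytes received but not yet decoded
      await for (final chunk in chunks) {
        pending.addAll(chunk);
        var offset = 0;
        while (pending.length - offset >= 4) {
          yield (pending[offset] << 24) |
              (pending[offset + 1] << 16) |
              (pending[offset + 2] << 8) |
              pending[offset + 3];
          offset += 4;
        }
        // Drop the decoded bytes and keep the incomplete tail.
        pending.removeRange(0, offset);
      }
      // Any incomplete tail left when the stream ends is silently dropped.
    });
```

It is applied with transform in the same position as the decoder step in the pipeline above, for example byteChunks.transform(uint32Decoder()), where byteChunks stands for any Stream<List<int>>.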