Streaming

It's common to read and write bytes to and from a file, a socket, or some other Stream. Ribs provides two basic tools, StreamEncoder and StreamDecoder, to handle the machinery these situations require.

Let's assume we've got a simple type of model with our binary Codec already defined:

Domain Model
final class Event {
  final DateTime timestamp;
  final int id;
  final String message;

  const Event(this.timestamp, this.id, this.message);

  static final codec = Codec.product3(
    Codec.int64.xmap(
      (i) => DateTime.fromMillisecondsSinceEpoch(i),
      (date) => date.millisecondsSinceEpoch,
    ),
    Codec.uint24,
    Codec.utf8_32,
    Event.new,
    (evt) => (evt.timestamp, evt.id, evt.message),
  );
}
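To make the wire format concrete, here is a rough sketch of the bytes this codec would plausibly produce, written by hand with only the Dart SDK. It rests on assumptions the text above does not confirm: that Codec.int64 and Codec.uint24 are big-endian, and that Codec.utf8_32 writes a 32-bit length prefix counting bytes before the UTF-8 text. The function name encodeEventByHand is hypothetical and is not part of Ribs.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Hand-rolled version of the ASSUMED layout:
/// 8-byte timestamp, 3-byte id, 4-byte length, UTF-8 message bytes.
Uint8List encodeEventByHand(DateTime timestamp, int id, String message) {
  final text = utf8.encode(message);
  final out = ByteData(8 + 3 + 4 + text.length);

  // int64: milliseconds since epoch (ByteData defaults to big-endian).
  out.setInt64(0, timestamp.millisecondsSinceEpoch);

  // uint24: three bytes, most significant first.
  out.setUint8(8, (id >> 16) & 0xff);
  out.setUint8(9, (id >> 8) & 0xff);
  out.setUint8(10, id & 0xff);

  // utf8_32: 32-bit length prefix (assumed to count bytes), then the text.
  out.setUint32(11, text.length);

  final bytes = out.buffer.asUint8List();
  bytes.setRange(15, 15 + text.length, text);
  return bytes;
}

void main() {
  final bytes = encodeEventByHand(
    DateTime.fromMillisecondsSinceEpoch(1000),
    258,
    'hi',
  );
  print(bytes.length); // 17
  print(bytes);
  // [0, 0, 0, 0, 0, 0, 3, 232, 0, 1, 2, 0, 0, 0, 2, 104, 105]
}
```

The codec replaces all of this offset bookkeeping with a declarative description, and it gives you the matching decoder for free.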

StreamEncoder

Now imagine we generate these events on the fly and need to send them out over a socket after converting them to binary. StreamEncoder is the perfect solution:

StreamEncoder
Stream<Event> events() => throw UnimplementedError('TODO');

final socket = await Socket.connect('localhost', 12345);

final Future<void> eventWriter = events()
    // Encodes each Event to BitVector
    .transform(StreamEncoder(Event.codec))
    // Convert BitVector to Uint8List
    .map((bitVector) => bitVector.toByteArray())
    // Write each Uint8List to Socket
    .forEach((byteList) => socket.add(byteList));

We've built a capable yet expressive and readable program with minimal effort!

StreamDecoder

Now imagine the other side of the connection: a program that receives events from a socket and does something meaningful with them:

StreamDecoder
void storeEvent(Event evt) => throw UnimplementedError('TODO');

Future<void> handleClient(Socket clientSocket) => clientSocket
    // Convert Uint8List to BitVector
    .map((bytes) => ByteVector(bytes).bits)
    // Convert BitVector to Event
    .transform(StreamDecoder(Event.codec))
    // Do something with the events
    .forEach(storeEvent);

final socket = await ServerSocket.bind('0.0.0.0', 12345);
final events = socket.forEach(handleClient);

Using the Ribs Binary streaming API, it's easy to transform byte streams into your domain objects.
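One reason a streaming decoder is worth having: a socket delivers bytes in arbitrary chunks, so a single read may hold half a message or several at once. The sketch below illustrates the general buffering idea with only the Dart SDK. It is not Ribs' implementation, and the frames function and its simple framing (a 4-byte big-endian length followed by the payload) are assumptions made for illustration.

```dart
import 'dart:async';
import 'dart:typed_data';

/// Reassembles frames of the form [4-byte big-endian length][payload]
/// from chunks that may split or merge frames arbitrarily.
Stream<Uint8List> frames(Stream<List<int>> chunks) async* {
  final buffer = <int>[];

  await for (final chunk in chunks) {
    buffer.addAll(chunk);

    // Emit every complete frame currently held in the buffer.
    while (buffer.length >= 4) {
      final length =
          (buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3];
      if (buffer.length < 4 + length) break; // wait for more bytes

      yield Uint8List.fromList(buffer.sublist(4, 4 + length));
      buffer.removeRange(0, 4 + length);
    }
  }
}

Future<void> main() async {
  // Two frames ("ab" and "c") split across three awkward chunks.
  final chunks = Stream.fromIterable([
    [0, 0, 0],
    [2, 97, 98, 0, 0],
    [0, 1, 99],
  ]);

  await for (final frame in frames(chunks)) {
    print(String.fromCharCodes(frame)); // prints "ab", then "c"
  }
}
```

A codec-driven stream decoder spares you from writing this kind of loop by hand for every message type.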