Streaming

Ribs JSON also comes with an AsyncParser that supports emitting JSON elements as they arrive in situations like reading bytes from a file or a streaming HTTP response.

Unwrap Array

The first streaming mode Ribs supports is unwrapArray, which expects the streamed JSON to consist of a single top-level JSON array. The parser then emits each child element of that array as soon as the element has been completely received.

To illustrate, let's start with a basic setup: a couple of domain models, each with its own JSON codec:

Models
final class Event {
  final String id;
  final String type;
  final Repo repo;

  const Event(this.id, this.type, this.repo);

  static final codec = Codec.product3(
    'id'.as(Codec.string),
    'type'.as(Codec.string),
    'repo'.as(Repo.codec),
    Event.new,
    (evt) => (evt.id, evt.type, evt.repo),
  );
}

final class Repo {
  final int id;
  final String name;

  const Repo(this.id, this.name);

  static final codec = Codec.product2(
    'id'.as(Codec.integer),
    'name'.as(Codec.string),
    Repo.new,
    (repo) => (repo.id, repo.name),
  );
}

Now suppose the incoming JSON (whether it comes from a file, a socket or an HTTP response) has the following structure:

[
  {
    "id": "2489651045",
    "type": "CreateEvent",
    "repo": {
      "id": 28688495,
      "name": "petroav/6.828",
      "url": "https://api.github.com/repos/petroav/6.828"
    }
  },
  {
    "id": "2489651051",
    "type": "PushEvent",
    "repo": {
      "id": 28671719,
      "name": "rspt/rspt-theme",
      "url": "https://api.github.com/repos/rspt/rspt-theme"
    }
  },
  //
  // Tens, hundreds, thousands more events...
  //
  {
    "id": "2489651591",
    "type": "WatchEvent",
    "repo": {
      "id": 21289110,
      "name": "vinta/awesome-python",
      "url": "https://api.github.com/repos/vinta/awesome-python"
    }
  }
]

We can emit each of these individual Events using the JsonTransformer in the Ribs JSON library. Here's a simple example of how it's used:

Unwrap Array Transformer
// Original stream of bytes
final Stream<List<int>> byteStream =
    File('path/to/big-array-file.json').openRead();

// Use JsonTransformer to transform the bytes into individual JSON events
final Stream<Json> jsonStream =
    byteStream.transform(JsonTransformer.bytes(AsyncParserMode.unwrapArray));

// Decode each Json stream element into an event, accounting for failure
final Stream<Either<DecodingFailure, Event>> decodeStream =
    jsonStream.map((json) => Event.codec.decode(json));

// One step further...drop any decoding errors
final Stream<Event> eventStream = decodeStream.expand((element) => element.fold(
      (err) => <Event>[],
      (event) => [event],
    ));
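
To try this pipeline without a large file on disk, the following is a minimal sketch that feeds a small in-memory array through the same transformer in 7-byte chunks, so element boundaries never line up with chunk boundaries. Instead of dropping decoding failures, it reports them. The `chunked` helper and the sample input are hypothetical, and the package import path is an assumption that may need adjusting to match your pubspec.

```dart
import 'dart:convert' show utf8;

// Assumed import path for the Ribs JSON package.
import 'package:ribs_json/ribs_json.dart';

final class Repo {
  final int id;
  final String name;

  const Repo(this.id, this.name);

  static final codec = Codec.product2(
    'id'.as(Codec.integer),
    'name'.as(Codec.string),
    Repo.new,
    (repo) => (repo.id, repo.name),
  );
}

/// Hypothetical helper: splits [text] into UTF-8 chunks of at most [size]
/// bytes, imitating how a file or socket delivers data piecemeal.
Stream<List<int>> chunked(String text, int size) {
  final bytes = utf8.encode(text);
  return Stream.fromIterable([
    for (var i = 0; i < bytes.length; i += size)
      bytes.sublist(i, i + size < bytes.length ? i + size : bytes.length),
  ]);
}

Future<void> main() async {
  // The second element carries a string id, so it should not decode as a Repo.
  const input = '[{"id": 1, "name": "a/b"}, {"id": "oops", "name": "c/d"}]';

  final results = chunked(input, 7)
      .transform(JsonTransformer.bytes(AsyncParserMode.unwrapArray))
      .map((json) => Repo.codec.decode(json));

  await for (final result in results) {
    // Handle both sides of the Either rather than discarding failures.
    result.fold(
      (err) => print('skipped element: $err'),
      (repo) => print('repo ${repo.id}: ${repo.name}'),
    );
  }
}
```

Whether to drop, log or propagate failures depends on the application; folding over each result keeps that decision explicit at the point where the stream is consumed.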

Value Stream

If the stream contains multiple top-level JSON elements, you'll want to use AsyncParserMode.valueStream when creating your JsonTransformer.

Suppose the incoming JSON looks something like this:

["Name", "Session", "Score", "Completed"]
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
["Gilbert", "2013", 24, true]
//
// tens, hundreds, thousands of other JSON elements...
//
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
["Deloise", "2012A", 19, true]

We'll create our transformer in exactly the same way, changing only the mode the parser operates in:

Value Stream Transformer
final Socket sock = await Socket.connect('192.168.0.100', 12345);

final Stream<Json> jsonStream = sock
    .map((event) => event.toList())
    .transform(JsonTransformer.bytes(AsyncParserMode.valueStream));
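
The same mode should work for a streaming HTTP response, since `HttpClientResponse` from `dart:io` is itself a `Stream<List<int>>`. The sketch below assumes a hypothetical local endpoint that writes one JSON value after another. The `parseValues` helper is an illustrative name, not part of the library, and the package import path is an assumption.

```dart
import 'dart:io';

// Assumed import path for the Ribs JSON package.
import 'package:ribs_json/ribs_json.dart';

/// Hypothetical helper: turns any byte stream into a stream of top-level
/// JSON values using the valueStream mode.
Stream<Json> parseValues(Stream<List<int>> bytes) =>
    bytes.transform(JsonTransformer.bytes(AsyncParserMode.valueStream));

Future<void> main() async {
  final client = HttpClient();
  try {
    // Hypothetical endpoint; replace with a real streaming URL.
    final request =
        await client.getUrl(Uri.parse('http://localhost:8080/scores'));
    final response = await request.close();

    // Each value is handled as soon as the parser has received all of it.
    await for (final json in parseValues(response)) {
      print(json);
    }
  } finally {
    client.close();
  }
}
```

Keeping the transformer behind a function that accepts any `Stream<List<int>>` means the same parsing code can be reused for sockets, files and HTTP responses.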

Single Value

The last mode available expects a single top-level JSON element and won't emit it until the element has been received in its entirety. This effectively turns the JsonTransformer into a standard synchronous parser, but it can still be useful in some situations, such as when the rest of your code already works with byte streams.
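
As a rough sketch of this mode, the example below parses one JSON object delivered in two pieces. It assumes the enum value is named `AsyncParserMode.singleValue`, following the naming of the other two modes; check the library's API reference before relying on that name. The `parseSingle` helper and the package import path are likewise assumptions.

```dart
import 'dart:convert' show utf8;

// Assumed import path for the Ribs JSON package.
import 'package:ribs_json/ribs_json.dart';

/// Hypothetical helper: waits for the one JSON value the stream contains.
/// Assumption: the single-value mode is exposed as `singleValue`.
Future<Json> parseSingle(Stream<List<int>> bytes) =>
    bytes.transform(JsonTransformer.bytes(AsyncParserMode.singleValue)).single;

Future<void> main() async {
  // One JSON document split across two arbitrary chunks.
  final Stream<List<int>> bytes = Stream.fromIterable([
    utf8.encode('{"name": "Gilbert", '),
    utf8.encode('"score": 24}'),
  ]);

  final json = await parseSingle(bytes);
  print(json);
}
```

`Stream.single` completes with an error if the stream emits no element or more than one, which matches the expectation that the input holds exactly one top-level value.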