# Streaming

Ribs JSON also comes with an `AsyncParser` that supports emitting JSON
elements as they arrive, which is useful when reading bytes from a file or a
streaming HTTP response.
## Unwrap Array

The first streaming mode Ribs supports is `unwrapArray`, which expects the
streamed JSON to consist of a single top-level JSON array. The parser then
emits an event as each child element is completely received.
To illustrate, let's start with a basic setup: a couple of domain models with a JSON codec defined for each:
```dart
final class Event {
  final String id;
  final String type;
  final Repo repo;

  const Event(this.id, this.type, this.repo);

  static final codec = Codec.product3(
    'id'.as(Codec.string),
    'type'.as(Codec.string),
    'repo'.as(Repo.codec),
    Event.new,
    (evt) => (evt.id, evt.type, evt.repo),
  );
}

final class Repo {
  final int id;
  final String name;

  const Repo(this.id, this.name);

  static final codec = Codec.product2(
    'id'.as(Codec.integer),
    'name'.as(Codec.string),
    Repo.new,
    (repo) => (repo.id, repo.name),
  );
}
```
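Before streaming anything, it can help to sanity-check the codecs against a single element. This is a hedged sketch: it assumes `Json.parse` returns an `Either` of failure or `Json`, consistent with how `decode` returns an `Either` below; check the Ribs API for the exact signatures.

```dart
// Parse a single element by hand, then run it through the Event codec.
// Both steps return an Either, so we fold each to handle failure explicitly.
Json.parse('{"id": "1", "type": "PushEvent", "repo": {"id": 2, "name": "a/b"}}').fold(
  (parseErr) => print('parse failed: $parseErr'),
  (json) => Event.codec.decode(json).fold(
        (decodeErr) => print('decode failed: $decodeErr'),
        (event) => print('decoded event of type: ${event.type}'),
      ),
);
```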
Now consider that the incoming JSON (whether it comes from a file, a socket, or an HTTP response) will take on the following structure:
```json
[
  {
    "id": "2489651045",
    "type": "CreateEvent",
    "repo": {
      "id": 28688495,
      "name": "petroav/6.828",
      "url": "https://api.github.com/repos/petroav/6.828"
    }
  },
  {
    "id": "2489651051",
    "type": "PushEvent",
    "repo": {
      "id": 28671719,
      "name": "rspt/rspt-theme",
      "url": "https://api.github.com/repos/rspt/rspt-theme"
    }
  },
  //
  // Tens, hundreds, thousands of more events...
  //
  {
    "id": "2489651591",
    "type": "WatchEvent",
    "repo": {
      "id": 21289110,
      "name": "vinta/awesome-python",
      "url": "https://api.github.com/repos/vinta/awesome-python"
    }
  }
]
```
We can emit each of these individual `Event`s using the `JsonTransformer` from the Ribs JSON library. Here's a simple example of how it's used:
```dart
// Original stream of bytes
final Stream<List<int>> byteStream =
    File('path/to/big-array-file.json').openRead();

// Use JsonTransformer to transform the bytes into individual JSON events
final Stream<Json> jsonStream =
    byteStream.transform(JsonTransformer.bytes(AsyncParserMode.unwrapArray));

// Decode each Json stream element into an Event, accounting for failure
final Stream<Either<DecodingFailure, Event>> decodeStream =
    jsonStream.map((json) => Event.codec.decode(json));

// One step further: drop any decoding errors
final Stream<Event> eventStream = decodeStream.expand((element) => element.fold(
      (err) => <Event>[],
      (event) => [event],
    ));
```
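Because `eventStream` is an ordinary Dart `Stream<Event>`, it composes with the usual stream machinery. As an illustrative sketch (the aggregation below is plain Dart, not part of Ribs), we could tally events by type as they arrive, without ever holding the whole array in memory:

```dart
// Count events by type as they stream in.
final counts = <String, int>{};

await for (final event in eventStream) {
  counts.update(event.type, (n) => n + 1, ifAbsent: () => 1);
}

print(counts);
```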
## Value Stream

If the stream contains multiple top-level JSON elements, you'll want to use
`AsyncParserMode.valueStream` when creating your `JsonTransformer`. Consider
incoming JSON that looks something like this:
["Name", "Session", "Score", "Completed"]
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
["Gilbert", "2013", 24, true]
//
// tens, hundreds, thousands of other JSON elements...
//
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
["Deloise", "2012A", 19, true]
We create our transformer in exactly the same way, but change the mode the parser operates in:
```dart
final Socket sock = await Socket.connect('192.168.0.100', 12345);

final Stream<Json> jsonStream = sock
    .map((event) => event.toList())
    .transform(JsonTransformer.bytes(AsyncParserMode.valueStream));
```
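Each complete top-level value then arrives as its own element of `jsonStream`, regardless of whether it's an array or an object. A minimal consumer (plain Dart, nothing Ribs-specific) might simply count them:

```dart
var count = 0;

// Each iteration corresponds to one complete top-level JSON value.
await for (final _ in jsonStream) {
  count += 1;
}

print('received $count top-level JSON values');
```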
## Single Value

The last mode available expects a single top-level JSON element and won't emit
the JSON event until it's entirely received. This effectively turns the
`JsonTransformer` into a standard synchronous parser, but it can still be
useful in some situations.
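Usage mirrors the other modes. This sketch assumes the mode is exposed as `AsyncParserMode.singleValue`; check the `AsyncParserMode` enum for the exact name:

```dart
final Stream<List<int>> byteStream =
    File('path/to/single-doc.json').openRead();

// Emits exactly one Json element, after the whole document has arrived.
final Stream<Json> jsonStream =
    byteStream.transform(JsonTransformer.bytes(AsyncParserMode.singleValue));

final Json json = await jsonStream.single;
```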