Streaming
Ribs JSON provides an AsyncParser that emits JSON values incrementally as
bytes or strings arrive — no need to buffer an entire document before parsing
begins. The parser is exposed through JsonTransformer, a
StreamTransformerBase that plugs directly into Dart's Stream API. For
ribs_rill users, a thin bridge makes it equally at home in a Rill pipeline.
The behaviour is controlled by AsyncParserMode:
| Mode | Description |
|---|---|
| unwrapArray | Expects a top-level JSON array; emits each element as it completes |
| valueStream | Expects a sequence of top-level JSON values; emits each one in turn |
| singleValue | Buffers until the whole stream is consumed, then emits one Json |
Input variants
JsonTransformer has two factory constructors depending on the element type of
your source stream:
- JsonTransformer.bytes(mode) — accepts Stream<List<int>> (raw bytes)
- JsonTransformer.strings(mode) — accepts Stream<String> (text chunks)
Both return Stream<Json>.
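As a minimal sketch, the two variants can be fed the same payload; the only difference is the element type of the source stream (utf8 here comes from dart:convert, and the literal chunking is illustrative):

```dart
import 'dart:convert';

// Two single-subscription sources carrying the same document, split across
// arbitrary chunk boundaries.
final Stream<String> textChunks = Stream.fromIterable(['[1, 2', ', 3]']);
final Stream<List<int>> byteChunks =
    Stream.fromIterable(['[1, 2', ', 3]']).map<List<int>>(utf8.encode);

// Both pipelines emit the elements 1, 2 and 3 as individual Json values.
final Stream<Json> fromText =
    textChunks.transform(JsonTransformer.strings(AsyncParserMode.unwrapArray));
final Stream<Json> fromBytes =
    byteChunks.transform(JsonTransformer.bytes(AsyncParserMode.unwrapArray));
```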
Unwrap Array
The unwrapArray mode expects the streamed JSON to be a top-level array. The
parser emits each child element as soon as it is fully received, which means
downstream processing can begin before the array closes.
To illustrate, let's start with a basic setup: a couple of domain models, each with a JSON codec defined:
```dart
final class Event {
  final String id;
  final String type;
  final Repo repo;

  const Event(this.id, this.type, this.repo);

  static final codec = Codec.product3(
    'id'.as(Codec.string),
    'type'.as(Codec.string),
    'repo'.as(Repo.codec),
    Event.new,
    (evt) => (evt.id, evt.type, evt.repo),
  );
}

final class Repo {
  final int id;
  final String name;

  const Repo(this.id, this.name);

  static final codec = Codec.product2(
    'id'.as(Codec.integer),
    'name'.as(Codec.string),
    Repo.new,
    (repo) => (repo.id, repo.name),
  );
}
```
Now consider that the incoming JSON — whether from a file, a socket, or an HTTP response — takes this shape:
```json
[
  {
    "id": "2489651045",
    "type": "CreateEvent",
    "repo": { "id": 28688495, "name": "petroav/6.828" }
  },
  {
    "id": "2489651051",
    "type": "PushEvent",
    "repo": { "id": 28671719, "name": "rspt/rspt-theme" }
  }
]
```
JsonTransformer.bytes with unwrapArray streams the file and decodes each
Event as it arrives. Decoding failures are surfaced as
Either<DecodingFailure, Event> so they can be handled explicitly:
```dart
// Original stream of bytes
final Stream<List<int>> byteStream =
    File('path/to/big-array-file.json').openRead();

// Use JsonTransformer to transform the bytes into individual JSON events
final Stream<Json> jsonStream = byteStream.transform(
  JsonTransformer.bytes(AsyncParserMode.unwrapArray),
);

// Decode each Json stream element into an event, accounting for failure
final Stream<Either<DecodingFailure, Event>> decodeStream = jsonStream.map(
  (json) => Event.codec.decode(json),
);

// One step further... drop any decoding errors
final Stream<Event> eventStream = decodeStream.expand(
  (element) => element.fold(
    (err) => <Event>[],
    (event) => [event],
  ),
);
```
Value Stream
When the source produces multiple top-level JSON values rather than a single
array, use valueStream. The data can be any sequence of heterogeneous JSON
values — objects, arrays, primitives — emitted one after another:
```json
["Name", "Session", "Score", "Completed"]
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
["Gilbert", "2013", 24, true]
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
```
```dart
final Socket sock = await Socket.connect('192.168.0.100', 12345);

final Stream<Json> jsonStream = sock
    .map((event) => event.toList())
    .transform(JsonTransformer.bytes(AsyncParserMode.valueStream));
```
Single Value
singleValue mode buffers the entire byte stream and emits exactly one Json
once the stream closes. It behaves like a streaming version of Json.parse —
useful when the source delivers a complete document in chunks but you still want
the memory-efficient byte-at-a-time reading that file or socket streams provide:
```dart
// singleValue buffers the entire stream before emitting exactly one Json value.
// Useful when you receive a complete JSON document in chunks and want a single
// fully-parsed result at the end.
final Stream<Json> singleDocument = File('config.json')
    .openRead()
    .transform(JsonTransformer.bytes(AsyncParserMode.singleValue));
```
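For contrast, a non-streaming sketch of the same job would buffer the whole document into memory as a String before handing it to Json.parse in one shot (the file name is illustrative):

```dart
// Non-streaming equivalent: read the entire file into memory, then parse once.
// singleValue produces the same single result while reading incrementally.
final contents = await File('config.json').readAsString();
final parsed = Json.parse(contents);
```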
String input
If the data source already delivers decoded text chunks (for example, some HTTP
clients decode the body on your behalf), use JsonTransformer.strings to skip
the byte-decoding step:
```dart
// JsonTransformer.strings accepts a Stream<String> instead of Stream<List<int>>.
// This is convenient when the data source already decodes bytes to text
// (e.g. some HTTP clients, or a text file reader).
final Stream<String> stringChunks = Stream.fromIterable([
  '[{"id": "1", "type": "PushEvent", "repo": {"id": 1, "name": "a/b"}}, ',
  ' {"id": "2", "type": "CreateEvent", "repo": {"id": 2, "name": "c/d"}}]',
]);

final Stream<Json> fromStrings = stringChunks.transform(
  JsonTransformer.strings(AsyncParserMode.unwrapArray),
);
```
Streaming with Rill
If you are using ribs_rill and ribs_rill_io, Rill<int> is the natural
type for byte streams — produced by Files.readAll for files and by
socket.reads for network sockets.
Rather than bridging through Dart's Stream API, AsyncParser can be driven
directly inside the Rill pipeline. A mutable parser is created once via
IO.delay, and each Chunk<int> is fed to parser.absorb through evalMap,
keeping the side effect inside IO. A finalAbsorb step is appended with +
to flush any buffered data when the source terminates — this is required for
singleValue mode, which holds back the result until EOF.
```dart
// Drive AsyncParser directly from a Rill<int> byte stream.
//
// The parser is created lazily inside IO.delay so each run of the Rill gets a
// fresh instance. Each Chunk<int> is fed to parser.absorb via IO.delay,
// keeping side effects inside IO. A final flush (finalAbsorb) is appended to
// handle modes that buffer until EOF — notably singleValue.
Rill<Json> parseJsonRill(Rill<int> bytes, AsyncParserMode mode) =>
    Rill.eval(IO.delay(() => AsyncParser(mode: mode))).flatMap((parser) {
      final absorbed = bytes
          .chunks()
          .evalMap(
            (chunk) => IO.delay(
              () => parser.absorb(Uint8List.fromList(chunk.toDartList())),
            ),
          )
          .flatMap(
            (result) => result.fold(
              (err) => Rill.raiseError<Json>(err),
              (jsons) => Rill.emits(jsons.toList()),
            ),
          );

      final flushed = Rill.eval(
        IO.delay(() => parser.finalAbsorb(Uint8List(0))),
      ).flatMap(
        (result) => result.fold(
          (err) => Rill.raiseError<Json>(err),
          (jsons) => Rill.emits(jsons.toList()),
        ),
      );

      return absorbed + flushed;
    });
```
```dart
// Files.readAll streams the file as Rill<int>; parseJsonRill feeds each chunk
// directly into AsyncParser without leaving the Rill ecosystem.
IO<Unit> processEventsFromFile(Path path) =>
    parseJsonRill(Files.readAll(path), AsyncParserMode.unwrapArray)
        .evalMap((json) => IO.fromEither(Event.codec.decode(json)))
        .foreach(storeEvent)
        .compile
        .drain;
```
Because parseJsonRill accepts any Rill<int>, the same helper works for a
network socket with no changes to the parsing or decoding logic:
```dart
// Network.connect gives a Resource<Socket> whose reads is Rill<int>.
// The same parseJsonRill helper slots in unchanged.
IO<Unit> consumeSocketEvents(SocketAddress addr) => Network.connect(addr).use(
  (socket) =>
      parseJsonRill(socket.reads, AsyncParserMode.valueStream)
          .evalMap((json) => IO.fromEither(Event.codec.decode(json)))
          .foreach(storeEvent)
          .compile
          .drain,
);
```
IO.fromEither promotes a decoding failure into an IO-level error, terminating
the stream. To silently skip undecodable elements instead, replace it with a
flatMap that returns Rill.empty() on the left branch and Rill.emit(event)
on the right.
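A minimal sketch of that skip-on-failure variant, reusing the parseJsonRill helper and the storeEvent callback from the file example above:

```dart
// Silently drop elements that fail to decode, instead of terminating the
// stream on the first DecodingFailure.
IO<Unit> processEventsSkippingErrors(Path path) =>
    parseJsonRill(Files.readAll(path), AsyncParserMode.unwrapArray)
        .flatMap(
          (json) => Event.codec.decode(json).fold(
            (err) => Rill.empty(),       // left: discard the failure
            (event) => Rill.emit(event), // right: pass the event along
          ),
        )
        .foreach(storeEvent)
        .compile
        .drain;
```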