Streaming

Ribs JSON provides an AsyncParser that emits JSON values incrementally as bytes or strings arrive — no need to buffer an entire document before parsing begins. The parser is exposed through JsonTransformer, a StreamTransformerBase that plugs directly into Dart's Stream API. For ribs_rill users, a thin bridge makes it equally at home in a Rill pipeline.

The behaviour is controlled by AsyncParserMode:

Mode          Description
unwrapArray   Expects a top-level JSON array; emits each element as it completes
valueStream   Expects a sequence of top-level JSON values; emits each one in turn
singleValue   Buffers until the whole stream is consumed, then emits one Json

Input variants

JsonTransformer has two factory constructors depending on the element type of your source stream:

  • JsonTransformer.bytes(mode) — accepts Stream<List<int>> (raw bytes)
  • JsonTransformer.strings(mode) — accepts Stream<String> (text chunks)

Both produce a Stream<Json> when applied with Stream.transform.


Unwrap Array

The unwrapArray mode expects the streamed JSON to be a top-level array. The parser emits each child element as soon as it is fully received, which means downstream processing can begin before the array closes.

To illustrate, let's start with a basic setup: a couple of domain models, each with its own JSON codec:

Models
final class Event {
  final String id;
  final String type;
  final Repo repo;

  const Event(this.id, this.type, this.repo);

  static final codec = Codec.product3(
    'id'.as(Codec.string),
    'type'.as(Codec.string),
    'repo'.as(Repo.codec),
    Event.new,
    (evt) => (evt.id, evt.type, evt.repo),
  );
}

final class Repo {
  final int id;
  final String name;

  const Repo(this.id, this.name);

  static final codec = Codec.product2(
    'id'.as(Codec.integer),
    'name'.as(Codec.string),
    Repo.new,
    (repo) => (repo.id, repo.name),
  );
}

Now consider that the incoming JSON — whether from a file, a socket, or an HTTP response — takes this shape:

[
  {
    "id": "2489651045",
    "type": "CreateEvent",
    "repo": { "id": 28688495, "name": "petroav/6.828" }
  },
  {
    "id": "2489651051",
    "type": "PushEvent",
    "repo": { "id": 28671719, "name": "rspt/rspt-theme" }
  }
]

JsonTransformer.bytes with unwrapArray reads the file as a byte stream and emits each array element as a Json value as soon as it is complete. Event.codec.decode then turns each Json into an Either<DecodingFailure, Event>, so decoding failures can be handled explicitly:

Unwrap Array
import 'dart:io';

// Original stream of bytes
final Stream<List<int>> byteStream =
    File('path/to/big-array-file.json').openRead();

// Use JsonTransformer to transform the bytes into individual JSON values,
// one per array element
final Stream<Json> jsonStream = byteStream.transform(
  JsonTransformer.bytes(AsyncParserMode.unwrapArray),
);

// Decode each Json stream element into an Event, accounting for failure
final Stream<Either<DecodingFailure, Event>> decodeStream = jsonStream.map(
  (json) => Event.codec.decode(json),
);

// One step further: drop any decoding errors and keep only the events
final Stream<Event> eventStream = decodeStream.expand(
  (element) => element.fold(
    (err) => <Event>[],
    (event) => [event],
  ),
);

Value Stream

When the source produces multiple top-level JSON values rather than a single array, use valueStream. The data can be any sequence of heterogeneous JSON values — objects, arrays, primitives — emitted one after another:

["Name", "Session", "Score", "Completed"]
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
["Gilbert", "2013", 24, true]
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}

Value Stream
final Socket sock = await Socket.connect('192.168.0.100', 12345);

// Socket is a Stream<Uint8List>; cast it to the Stream<List<int>> that
// JsonTransformer.bytes expects, without copying each chunk.
final Stream<Json> jsonStream = sock
    .cast<List<int>>()
    .transform(JsonTransformer.bytes(AsyncParserMode.valueStream));

Single Value

singleValue mode consumes the entire byte stream and emits exactly one Json once the stream closes. It behaves like a streaming version of Json.parse: useful when the source delivers a complete document in chunks and you want to feed them to the parser as they arrive from a file or socket, rather than concatenating them yourself first:

Single Value
// singleValue buffers the entire stream before emitting exactly one Json value.
// Useful when you receive a complete JSON document in chunks and want a single
// fully-parsed result at the end.
final Stream<Json> singleDocument = File('config.json')
    .openRead()
    .transform(JsonTransformer.bytes(AsyncParserMode.singleValue));
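
Because the resulting stream carries exactly one element, the parsed document can be awaited directly. The following is a minimal sketch rather than part of the library: loadConfig is a hypothetical helper name, and the import path is assumed to follow the usual package:ribs_json/ribs_json.dart convention. Stream.single is the standard Dart getter that completes with the stream's only element and fails if the stream is empty or emits more than one.

```dart
import 'dart:io';

import 'package:ribs_json/ribs_json.dart'; // assumed import path

// Hypothetical helper: read a file in chunks and resolve to the single
// Json value that singleValue mode emits when the stream closes.
Future<Json> loadConfig(String path) => File(path)
    .openRead()
    .transform(JsonTransformer.bytes(AsyncParserMode.singleValue))
    .single;
```

A caller would then write something like final Json config = await loadConfig('config.json');.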

String input

If the data source already produces decoded text chunks (for example, some HTTP clients decode the body on your behalf), use JsonTransformer.strings to skip the byte-decoding step:

String chunks
// JsonTransformer.strings accepts a Stream<String> instead of Stream<List<int>>.
// This is convenient when the data source already decodes UTF-8 bytes to text
// (e.g. some HTTP clients, or a text file reader).
final Stream<String> stringChunks = Stream.fromIterable([
  '[{"id": "1", "type": "PushEvent", "repo": {"id": 1, "name": "a/b"}}, ',
  ' {"id": "2", "type": "CreateEvent", "repo": {"id": 2, "name": "c/d"}}]',
]);

final Stream<Json> fromStrings = stringChunks.transform(
  JsonTransformer.strings(AsyncParserMode.unwrapArray),
);
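
JsonTransformer.bytes is the direct route for byte sources. If bytes do need to become text first, for instance because a text-processing step sits between the source and the parser, the chunks should go through a stateful decoder: a multi-byte character such as ♣ can be split across two chunks. The sketch below uses only dart:convert; toTextChunks and demoSplitCharacter are hypothetical names chosen for illustration, and the resulting Stream<String> is the input type JsonTransformer.strings expects.

```dart
import 'dart:convert';

// Hypothetical helper: decode UTF-8 byte chunks into text chunks.
// utf8.decoder carries incomplete multi-byte sequences over to the next
// chunk, so a character split across a chunk boundary decodes correctly.
Stream<String> toTextChunks(Stream<List<int>> bytes) =>
    bytes.transform(utf8.decoder);

// Demonstration: '♣' encodes to the three bytes E2 99 A3. The input is cut
// right after the first of them, so the character straddles two chunks.
Future<String> demoSplitCharacter() {
  final encoded = utf8.encode('["straight", "7♣"]');
  final cut = encoded.indexOf(0xE2) + 1;
  final chunks = Stream<List<int>>.fromIterable([
    encoded.sublist(0, cut),
    encoded.sublist(cut),
  ]);
  // Joining the decoded chunks reproduces the original text.
  return toTextChunks(chunks).join();
}
```

Decoding each chunk independently with utf8.decode would instead fail on the truncated sequence, which is why the transformer form is used here.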

Streaming with Rill

If you are using ribs_rill and ribs_rill_io, Rill<int> is the natural type for byte streams — produced by Files.readAll for files and by socket.reads for network sockets.

Rather than bridging through Dart's Stream API, AsyncParser can be driven directly inside the Rill pipeline. A mutable parser is created once via IO.delay, and each Chunk<int> is fed to parser.absorb through evalMap, keeping the side effect inside IO. A finalAbsorb step is appended with + to flush any buffered data when the source terminates — this is required for singleValue mode, which holds back the result until EOF.

Rill — file
import 'dart:typed_data';

// Drive AsyncParser directly from a Rill<int> byte stream.
//
// The parser is mutable, so it is created inside IO.delay and each run of the
// Rill gets its own instance. Each Chunk<int> is fed to parser.absorb via
// IO.delay, keeping side effects inside IO. A final flush (finalAbsorb) is
// appended to handle modes that buffer until EOF, notably singleValue.
Rill<Json> parseJsonRill(Rill<int> bytes, AsyncParserMode mode) =>
    Rill.eval(IO.delay(() => AsyncParser(mode: mode))).flatMap((parser) {
      final absorbed = bytes
          .chunks()
          .evalMap(
            (chunk) => IO.delay(
              () => parser.absorb(Uint8List.fromList(chunk.toDartList())),
            ),
          )
          .flatMap(
            (result) => result.fold(
              (err) => Rill.raiseError<Json>(err),
              (jsons) => Rill.emits(jsons.toList()),
            ),
          );

      final flushed = Rill.eval(
        IO.delay(() => parser.finalAbsorb(Uint8List(0))),
      ).flatMap(
        (result) => result.fold(
          (err) => Rill.raiseError<Json>(err),
          (jsons) => Rill.emits(jsons.toList()),
        ),
      );

      return absorbed + flushed;
    });

// Files.readAll streams the file as Rill<int>; parseJsonRill feeds each chunk
// directly into AsyncParser without leaving the Rill ecosystem.
IO<Unit> processEventsFromFile(Path path) =>
    parseJsonRill(Files.readAll(path), AsyncParserMode.unwrapArray)
        .evalMap((json) => IO.fromEither(Event.codec.decode(json)))
        .foreach(storeEvent)
        .compile
        .drain;

Because parseJsonRill accepts any Rill<int>, the same helper works for a network socket with no changes to the parsing or decoding logic:

Rill — socket
// Network.connect gives a Resource<Socket> whose reads member is a Rill<int>.
// The same parseJsonRill helper slots in unchanged.
IO<Unit> consumeSocketEvents(SocketAddress addr) => Network.connect(addr).use(
      (socket) => parseJsonRill(socket.reads, AsyncParserMode.valueStream)
          .evalMap((json) => IO.fromEither(Event.codec.decode(json)))
          .foreach(storeEvent)
          .compile
          .drain,
    );

IO.fromEither promotes a decoding failure into an IO-level error, terminating the stream. To silently skip undecodable elements instead, replace it with a flatMap that returns Rill.empty() on the left branch and Rill.emit(event) on the right.
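
The skipping variant described above might look like the following. This is a sketch under stated assumptions, not a definitive implementation: processEventsSkippingFailures is a hypothetical name, the Event model, parseJsonRill and storeEvent from the earlier examples are assumed to be in scope together with the ribs imports they already use, and the explicit fold<Rill<Event>> type argument is there only to help inference for Rill.empty().

```dart
// Assumes Event, parseJsonRill and storeEvent from the examples above,
// plus the same ribs imports those examples rely on.
IO<Unit> processEventsSkippingFailures(Path path) =>
    parseJsonRill(Files.readAll(path), AsyncParserMode.unwrapArray)
        .flatMap(
          (json) => Event.codec.decode(json).fold<Rill<Event>>(
            (err) => Rill.empty(), // Left: drop the undecodable element
            (event) => Rill.emit(event), // Right: pass the event downstream
          ),
        )
        .foreach(storeEvent)
        .compile
        .drain;
```

Only decoding failures are skipped here; a parse error raised by parseJsonRill itself still terminates the stream.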