# Chunk

`Chunk<O>` is the internal unit of data in a Rill. Every value a Rill emits travels as part of a `Chunk`, never as a bare element.
## Why chunks?
Processing elements one-by-one is expensive. Each function call, each allocation, and each garbage-collection event adds up quickly when a stream processes thousands of values per second.
`Chunk` addresses this by keeping elements together in a contiguous block:

- Fewer function calls. Combinators like `filter` and `map` operate on an entire `Chunk` at a time rather than invoking a closure per element.
- Less allocation pressure. A `Chunk` of 64 integers is one object on the heap; 64 separate boxed integers are 64 objects.
- Cache-friendly access. Elements stored in a compact array are read sequentially, staying in CPU cache across the whole batch.
- Lazy concatenation. Appending two chunks with `+` produces a `_ConcatChunk` wrapper without copying; `compact()` defers the single-array copy to the point it is actually needed.
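The lazy-concatenation point can be sketched directly. This is an illustration only: `Chunk.from` is an assumed constructor name, while `+`, `compact()`, and `size` are the operations described above.

```dart
// Sketch, assuming a `Chunk.from` constructor exists on the Rill Chunk type.
final a = Chunk.from([1, 2, 3]);
final b = Chunk.from([4, 5, 6]);

// `+` wraps both chunks in a _ConcatChunk; neither backing array is copied.
final joined = a + b; // joined.size == 6, still two underlying arrays

// compact() performs the single contiguous-array copy, and only here,
// at the point the flat layout is actually needed.
final flat = joined.compact();
```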
Internally, Rill is built on `Pull<O, Unit>`, which carries `Chunk<O>` between operators. When you call `map(f)` on a `Rill<O>`, the implementation is:

```dart
Rill<O2> map<O2>(Function1<O, O2> f) => mapChunks((c) => c.map(f));
```

The closure `f` is applied to the whole chunk in one go, not once per element in userspace.
## Observing chunks inside a Rill
`chunks()` converts `Rill<O>` into `Rill<Chunk<O>>`, surfacing the raw batches as stream elements. This is primarily a diagnostic and low-level tool; most pipelines never need to call it directly.
```dart
// chunks() exposes the raw Rill<Chunk<O>>, revealing how elements are batched.
IO<Unit> chunkObserve() {
  // emits() packs the whole List into a single chunk
  final singleChunk = Rill.emits([1, 2, 3, 4, 5]).chunks().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'emits → ${cs.length} chunk, sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // emits → 1 chunk, sizes: IList(5)

  // range() emits in chunks of 64 by default; 200 elements → [64, 64, 72]
  final multiChunk = Rill.range(0, 200).chunks().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'range → ${cs.length} chunks, sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // range → 3 chunks, sizes: IList(64, 64, 72)

  return IO.both(singleChunk, multiChunk).voided();
}
```
The chunk boundaries depend entirely on how the Rill was constructed:
- `Rill.emits(list)` packs the whole list into one chunk.
- `Rill.range(start, stop)` emits chunks of 64 elements by default (configurable via the `chunkSize` parameter).
- `Rill.fromIterator(it)` uses a default chunk size of 64.
- `Rill.eval(io)` produces a single-element chunk.
Upstream operators preserve chunk boundaries where possible; downstream operators may split or merge them.
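The `Rill.eval` rule can be observed with the same `chunks()` technique used above. A sketch, assuming the effect type provides an `IO.pure` constructor (only `IO.print`, `IO.both`, and `Rill.eval` appear elsewhere in this page):

```dart
// A single evaluated effect arrives downstream as one chunk of size 1.
IO<Unit> evalChunkSize() =>
    Rill.eval(IO.pure(42)).chunks().compile.toIList.flatMap(
          (IList<Chunk<int>> cs) => IO.print(
            'eval → ${cs.length} chunk, sizes: ${cs.map((Chunk<int> c) => c.size)}',
          ),
        );
```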
## Reshaping chunks
Sometimes the natural chunk boundaries do not match the needs of downstream code. A database bulk-insert might require exactly 500 rows per batch; a network protocol might impose a maximum frame size. The reshaping combinators let you re-batch without changing the logical stream of elements.
```dart
IO<Unit> chunkReshape() {
  // chunkN(n): re-batch into fixed-size chunks; the last chunk may be smaller
  final fixedSize = Rill.range(0, 25)
      .chunkN(10)
      .compile
      .toIList
      .flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkN(10) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(10, 10, 5)

  // chunkAll(): accumulate every element into one chunk (use with care on large streams)
  final allInOne = Rill.range(0, 25).chunkAll().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkAll sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(25)

  // chunkLimit(n): cap chunk size at n without merging smaller chunks
  final capped = Rill.emits([1, 2, 3, 4, 5, 6, 7])
      .chunkLimit(3)
      .compile
      .toIList
      .flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkLimit(3) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(3, 3, 1)

  return fixedSize.productR(() => allInOne).productR(() => capped);
}
```
| Combinator | Returns | Description |
|---|---|---|
| `chunkN(n)` | `Rill<Chunk<O>>` | Re-batch into chunks of exactly `n` (last may be smaller) |
| `chunkAll()` | `Rill<Chunk<O>>` | Accumulate every element into one chunk |
| `chunkLimit(n)` | `Rill<Chunk<O>>` | Cap chunk size at `n`; does not merge smaller chunks |
| `chunkMin(n)` | `Rill<Chunk<O>>` | Guarantee chunks of at least `n` elements |
| `buffer(n)` | `Rill<O>` | Re-batch transparently: elements unchanged, boundaries reset to `n` |
`chunkAll()` accumulates the entire stream in memory before emitting. Only use it when you know the stream is bounded and fits comfortably in RAM.
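The two table entries not exercised by `chunkReshape` can be sketched in the same style. This is a hedged illustration: the `chunkSize` named parameter on `Rill.range` is taken from the construction rules above, and no output is shown because these calls have not been run.

```dart
// Sketch of chunkMin and buffer, following the table's descriptions.
IO<Unit> chunkMinAndBuffer() {
  // chunkMin(10): merge adjacent chunks until each holds at least 10 elements.
  // Starting from small chunks (chunkSize: 4) makes the merging visible.
  final merged = Rill.range(0, 25, chunkSize: 4).chunkMin(10).compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkMin(10) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      );

  // buffer(10): the element stream is unchanged, but chunks() now shows
  // boundaries reset to 10.
  final rebatched = Rill.range(0, 25, chunkSize: 4)
      .buffer(10)
      .chunks()
      .compile
      .toIList
      .flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'buffer(10) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      );

  return merged.productR(() => rebatched);
}
```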
## Bulk transforms with `mapChunks`
`mapChunks(f)` applies a function to each `Chunk` as a whole and emits the results as a flat stream of elements. It is the lowest-overhead way to transform a stream: there is no dispatch per element, and the chunk's internal array is passed directly to `f`.
```dart
// mapChunks transforms each chunk as a whole, returning Rill<O2>.
// This is the same mechanism used internally by filter, map, collect, etc.
IO<Unit> chunkMapChunks() => Rill.range(0, 20)
    // Apply filter and map at chunk granularity: no per-element overhead
    .mapChunks(
      (Chunk<int> c) => c.filter((int n) => n.isEven).map((int n) => n * 10),
    )
    .compile
    .toIList
    .flatMap((IList<int> xs) => IO.print('evens x10: $xs'));
```
`mapChunks` is how the built-in combinators are implemented:

```dart
// These are the actual implementations inside Rill
Rill<O> filter(pred) => mapChunks((c) => c.filter(pred));
Rill<O2> map(f) => mapChunks((c) => c.map(f));
Rill<O2> collect(f) => mapChunks((c) => c.collect(f));
```
Use `mapChunks` directly when you want to apply multiple chunk-level operations in a single pass, or when you need access to the chunk's structural properties (its `size`, a `splitAt`, etc.) before emitting.
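A sketch of the structure-aware case: tagging each element with the size of the chunk it arrived in, something a per-element `map` cannot observe. Only `size`, `map`, and `mapChunks` from this page are used; the tag format is arbitrary.

```dart
// Each element records how large its containing chunk was, so the
// batching behaviour of the source becomes visible in the output.
IO<Unit> chunkSizeTag() => Rill.range(0, 200)
    .mapChunks((Chunk<int> c) => c.map((int n) => '$n (chunk of ${c.size})'))
    .compile
    .toIList
    .flatMap((IList<String> xs) => IO.print(xs.take(3)));
```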