
Chunk

Chunk<O> is the internal unit of data in a Rill. Every value a Rill emits travels as part of a Chunk — never as a bare element.

Why chunks?

Processing elements one by one is expensive. Each function call, each allocation, and each garbage-collection event adds up quickly when a stream processes thousands of values per second.

Chunk addresses this by keeping elements together in a contiguous block:

  • Fewer function calls. Combinators like filter and map operate on an entire Chunk at a time rather than invoking a closure per element.
  • Less allocation pressure. A Chunk of 64 integers is one object on the heap; 64 separate boxed integers are 64 objects.
  • Cache-friendly access. Elements stored in a compact array are read sequentially, staying in CPU cache across the whole batch.
  • Lazy concatenation. Appending two chunks with + produces a _ConcatChunk wrapper without copying; compact() performs the single-array copy only at the point it is actually needed.
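The lazy-concatenation behavior can be sketched in a few lines. The `Chunk.from` constructor name below is an assumption (the real factory may be named differently); the `+` and `compact()` behavior is as described above:

```dart
// Assumed constructor name: Chunk.from — the actual factory may differ.
final a = Chunk.from([1, 2, 3]);
final b = Chunk.from([4, 5, 6]);

final joined = a + b;          // _ConcatChunk wrapper, no element copying yet
final flat = joined.compact(); // one contiguous array, copied exactly once
```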

Internally, Rill is built on Pull<O, Unit>, which carries Chunk<O> between operators. When you call map(f) on a Rill<O>, the implementation is:

Rill<O2> map<O2>(Function1<O, O2> f) => mapChunks((c) => c.map(f));

The closure f is applied to the whole chunk in one go — not once per element in userspace.


Observing chunks inside a Rill

chunks() converts Rill<O> into Rill<Chunk<O>>, surfacing the raw batches as stream elements. This is primarily a diagnostic and low-level tool; most pipelines never need to call it directly.

// chunks() exposes the raw Rill<Chunk<O>>, revealing how elements are batched.

IO<Unit> chunkObserve() {
  // emits() packs the whole List into a single chunk
  final singleChunk = Rill.emits([1, 2, 3, 4, 5]).chunks().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'emits → ${cs.length} chunk, sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // emits → 1 chunk, sizes: IList(5)

  // range() emits in chunks of 64 by default; 200 elements → [64, 64, 72]
  final multiChunk = Rill.range(0, 200).chunks().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'range → ${cs.length} chunks, sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // range → 3 chunks, sizes: IList(64, 64, 72)

  return IO.both(singleChunk, multiChunk).voided();
}

The chunk boundaries depend entirely on how the Rill was constructed:

  • Rill.emits(list) packs the whole list into one chunk.
  • Rill.range(start, stop) emits chunks of 64 elements by default (configurable via the chunkSize parameter).
  • Rill.fromIterator(it) uses a default chunk size of 64.
  • Rill.eval(io) produces a single-element chunk.

Upstream operators preserve chunk boundaries where possible; downstream operators may split or merge them.
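The single-element chunk from Rill.eval can be confirmed with chunks(), in the same style as the example above (IO.pure is assumed available as the usual pure constructor):

```dart
// Rill.eval wraps one effect, so its output arrives as one chunk of size 1.
IO<Unit> evalChunk() => Rill.eval(IO.pure(42)).chunks().compile.toIList.flatMap(
      (IList<Chunk<int>> cs) => IO.print(
        'eval → ${cs.length} chunk, sizes: ${cs.map((Chunk<int> c) => c.size)}',
      ),
    ); // eval → 1 chunk, sizes: IList(1)
```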


Reshaping chunks

Sometimes the natural chunk boundaries do not match the needs of downstream code. A database bulk-insert might require exactly 500 rows per batch; a network protocol might impose a maximum frame size. The reshaping combinators let you re-batch without changing the logical stream of elements.
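The bulk-insert scenario can be sketched with chunkN plus an effectful per-batch step. This is a hypothetical sketch: the `Row` type, `insertBatch` function, and the `evalMap` and `compile.drain` combinators are assumed names, not verified against the API:

```dart
// Hypothetical sketch: batch rows into groups of 500 and insert each
// batch in one round trip. `Row`, `insertBatch`, `evalMap`, and
// `compile.drain` are assumptions.
IO<Unit> bulkInsert(
  Rill<Row> rows,
  Function1<Chunk<Row>, IO<Unit>> insertBatch,
) =>
    rows
        .chunkN(500) // Rill<Chunk<Row>>: exactly 500 per batch, last may be smaller
        .evalMap(insertBatch) // one effect per batch
        .compile
        .drain;
```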

IO<Unit> chunkReshape() {
  // chunkN(n): re-batch into fixed-size chunks, last chunk may be smaller
  final fixedSize = Rill.range(0, 25)
      .chunkN(10)
      .compile
      .toIList
      .flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkN(10) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(10, 10, 5)

  // chunkAll(): accumulate every element into one chunk — use with care on large streams
  final allInOne = Rill.range(0, 25).chunkAll().compile.toIList.flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkAll sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(25)

  // chunkLimit(n): cap chunk size at n without merging smaller chunks
  final capped = Rill.emits([1, 2, 3, 4, 5, 6, 7])
      .chunkLimit(3)
      .compile
      .toIList
      .flatMap(
        (IList<Chunk<int>> cs) => IO.print(
          'chunkLimit(3) sizes: ${cs.map((Chunk<int> c) => c.size)}',
        ),
      ); // IList(3, 3, 1)

  return fixedSize.productR(() => allInOne).productR(() => capped);
}
Combinator    | Returns        | Description
--------------|----------------|--------------------------------------------------------
chunkN(n)     | Rill<Chunk<O>> | Re-batch into chunks of exactly n (last may be smaller)
chunkAll()    | Rill<Chunk<O>> | Accumulate every element into one chunk
chunkLimit(n) | Rill<Chunk<O>> | Cap chunk size at n; does not merge smaller chunks
chunkMin(n)   | Rill<Chunk<O>> | Guarantee chunks of at least n elements
buffer(n)     | Rill<O>        | Re-batch transparently — elements unchanged, boundaries reset to n
Caution: chunkAll() accumulates the entire stream in memory before emitting. Only use it when you know the stream is bounded and fits comfortably in RAM.
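buffer(n) from the table above performs the same re-batching as chunkN(n) but keeps the stream typed as Rill<O>. A minimal sketch, built only from combinators shown elsewhere on this page:

```dart
// buffer(10) resets chunk boundaries to 10 while leaving elements untouched;
// chunks() then reveals the new batching.
IO<Unit> chunkBuffer() => Rill.range(0, 25)
    .buffer(10)
    .chunks()
    .compile
    .toIList
    .flatMap(
      (IList<Chunk<int>> cs) => IO.print(
        'buffer(10) sizes: ${cs.map((Chunk<int> c) => c.size)}',
      ),
    );
```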


Bulk transforms with mapChunks

mapChunks(f) applies a function to each Chunk as a whole and emits the results as a flat stream of elements. It is the lowest-overhead way to transform a stream: there is no dispatch per element, and the chunk's internal array is passed directly to f.

// mapChunks transforms each chunk as a whole, returning Rill<O2>.
// This is the same mechanism used internally by filter, map, collect, etc.

IO<Unit> chunkMapChunks() => Rill.range(0, 20)
    // Apply filter and map at chunk granularity — no per-element overhead
    .mapChunks(
      (Chunk<int> c) => c.filter((int n) => n.isEven).map((int n) => n * 10),
    )
    .compile
    .toIList
    .flatMap((IList<int> xs) => IO.print('evens x10: $xs'));

mapChunks is how the built-in combinators are implemented:

// These are the actual implementations inside Rill
Rill<O> filter(pred) => mapChunks((c) => c.filter(pred));
Rill<O2> map(f) => mapChunks((c) => c.map(f));
Rill<O2> collect(f) => mapChunks((c) => c.collect(f));

Use mapChunks directly when you want to apply multiple chunk-level operations in a single pass, or when you need access to the chunk's structural properties (its size, splitAt, and so on) before emitting.
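As one illustration of using a chunk's structural properties, here is a sketch that tags each element with the size of the batch it arrived in. The record return type assumes Dart 3 record syntax; only the mapChunks, map, and size calls shown above are relied on:

```dart
// Each element is paired with the size of its enclosing chunk,
// computed once per chunk rather than once per element.
Rill<(int, int)> tagWithBatchSize(Rill<int> r) =>
    r.mapChunks((Chunk<int> c) => c.map((int n) => (c.size, n)));
```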