Pipe
A Pipe<I, O> is a stream transformation — a reusable function that turns a Rill<I> into a Rill<O>.
typedef Pipe<I, O> = Function1<Rill<I>, Rill<O>>;
Because it is just a function, every Dart function that takes a Rill<I> and returns a Rill<O> is already a valid Pipe. No wrapping or special constructor is required.
Applying a Pipe
Rill<O>.through(pipe) applies a pipe to a stream. It is equivalent to calling the pipe function directly, but reads more naturally in a chain:
// These two lines are identical:
rill.through(myPipe)
myPipe(rill)

// A Pipe<I, O> is just a type alias for a function from Rill<I> to Rill<O>.
// Define one as an ordinary Dart function or a named variable.
Pipe<int, String> numberToLabel = (rill) => rill.map((int n) => 'item-$n');

IO<Unit> pipeBasic() => Rill.range(1, 4)
    .through(numberToLabel)
    .compile
    .toIList
    .flatMap((IList<String> xs) => IO.print('labels: $xs')); // [item-1, item-2, item-3]

Composing Pipes
Pipes compose by chaining .through calls, or by wrapping multiple pipes in a new function to give the combination a name:
// Compose pipes by applying one after the other with .through, or wrap
// two pipes in a new function to create a named composed pipe.
Pipe<int, String> evensAsLabels =
    (rill) => rill.filter((int n) => n.isEven).through(numberToLabel);

IO<Unit> pipeCompose() => Rill.range(1, 11)
    .through(evensAsLabels)
    .compile
    .toIList
    .flatMap(
      (IList<String> xs) => IO.print('even labels: $xs'),
    ); // [item-2, item-4, item-6, item-8, item-10]

Because Pipe<I, O> is a plain function type, you can also store and pass pipes as first-class values, return them from factories, and parameterise them over runtime configuration, all without any special machinery.
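A pipe factory can be sketched as an ordinary function that returns a Pipe. The names labelWith, multiplesOf, printLabels, and pipeFactory are hypothetical, and the import paths are an assumption because this page does not show them. Only calls that already appear in the examples above (map, filter, through, compile.toIList, IO.print) are used.

```dart
// Assumed import paths; this page does not show them, so adjust to your setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';

/// Hypothetical factory: builds a labelling pipe for a prefix chosen at runtime.
Pipe<int, String> labelWith(String prefix) =>
    (Rill<int> rill) => rill.map((int n) => '$prefix-$n');

/// Hypothetical factory: builds a pipe that keeps only multiples of [k].
Pipe<int, int> multiplesOf(int k) =>
    (Rill<int> rill) => rill.filter((int n) => n % k == 0);

/// Pipes are plain values, so they can be passed as arguments.
IO<Unit> printLabels(Pipe<int, String> label) => Rill.range(1, 7)
    .through(multiplesOf(3))
    .through(label)
    .compile
    .toIList
    .flatMap((IList<String> xs) => IO.print('labels: $xs'));

/// The prefix is ordinary runtime configuration.
IO<Unit> pipeFactory() => printLabels(labelWith('row'));
```

Since Rill.range(1, 7) covers 1 through 6 (the end is exclusive, as in the earlier examples), this should emit the labels row-3 and row-6.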
Built-in Pipes
Pipes.text provides pipes for common text and binary encoding tasks:
UTF-8
| Pipe | Type | Description |
|---|---|---|
| Pipes.text.utf8.decode | Pipe<int, String> | Decode raw bytes as UTF-8 |
| Pipes.text.utf8.encode | Pipe<String, int> | Encode strings to UTF-8 bytes |
Lines
| Pipe | Type | Description |
|---|---|---|
| Pipes.text.lines | Pipe<String, String> | Split a string stream at \n / \r\n |
| Pipes.text.linesLimited(n) | Pipe<String, String> | Same, but raises an error if any line exceeds n characters |
Base64
| Pipe | Type | Description |
|---|---|---|
| Pipes.text.base64.decode | Pipe<String, int> | Decode base64 text to bytes |
| Pipes.text.base64.encode | Pipe<int, String> | Encode bytes to base64 text |
| Pipes.text.base64.decodeWithAlphabet(a) | Pipe<String, int> | Decode with a custom Base64Alphabet |
| Pipes.text.base64.encodeWithAlphabet(a) | Pipe<int, String> | Encode with a custom Base64Alphabet |
Hex
| Pipe | Type | Description |
|---|---|---|
| Pipes.text.hex.decode | Pipe<String, int> | Decode hex text to bytes (strips leading 0x/0X) |
| Pipes.text.hex.encode | Pipe<int, String> | Encode bytes to lowercase hex text |
| Pipes.text.hex.decodeWithAlphabet(a) | Pipe<String, int> | Decode with a custom HexAlphabet |
| Pipes.text.hex.encodeWithAlphabet(a) | Pipe<int, String> | Encode with a custom HexAlphabet |
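Because the encode and decode pipes have mirrored types, they can be chained into a round trip. The sketch below uses only the hex pipes listed in the table; hexRoundTrip and pipeHexExample are hypothetical names, and the import paths are an assumption.

```dart
// Assumed import paths; this page does not show them, so adjust to your setup.
import 'package:ribs_core/ribs_core.dart';
import 'package:ribs_effect/ribs_effect.dart';
import 'package:ribs_rill/ribs_rill.dart';

/// Bytes -> lowercase hex text -> bytes again.
Rill<int> hexRoundTrip(Rill<int> bytes) =>
    bytes.through(Pipes.text.hex.encode).through(Pipes.text.hex.decode);

IO<Unit> pipeHexExample() {
  // Four arbitrary sample bytes.
  final bytes = Rill.emits([0xde, 0xad, 0xbe, 0xef]);

  return hexRoundTrip(bytes).compile.toIList.flatMap(
    (IList<int> xs) => IO.print('bytes: $xs'),
  );
}
```

If the two pipes are exact inverses, as the table descriptions suggest, the printed list should contain the same four bytes that went in.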
Text processing pipeline
The most common use of Pipes.text is decoding a raw byte stream from a file or network socket into lines of text. utf8.decode followed by lines is the standard pattern:
/// Decode a raw byte stream to newline-delimited strings.
///
/// This is the typical shape for reading a text file:
/// bytes → UTF-8 decode → split by line
Rill<String> decodeLines(Rill<int> bytes) =>
    bytes.through(Pipes.text.utf8.decode).through(Pipes.text.lines);

/// Encode a `Rill<String>` to UTF-8 bytes then to a base64 string stream.
Rill<String> encodeBase64(Rill<String> text) =>
    text.through(Pipes.text.utf8.encode).through(Pipes.text.base64.encode);

IO<Unit> pipeTextExample() {
  // Simulate a raw byte stream for "hello\nworld" encoded as UTF-8.
  // codeUnits yields UTF-16 code units, which equal the UTF-8 bytes here
  // only because the text is pure ASCII.
  final bytes = Rill.emits('hello\nworld'.codeUnits.toList());

  return decodeLines(bytes).compile.toIList.flatMap(
    (IList<String> lines) => IO.print('lines: $lines'),
  ); // [hello, world]
}

The same composability applies to encoding: chain utf8.encode into base64.encode to produce a base64 representation of any text stream.
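As a non-streaming reference point, the same two encoding steps can be written with Dart's standard dart:convert library. This is only a cross-check sketch: it assumes the default Pipes.text.base64 alphabet is the standard padded one, and that concatenating the strings emitted by the streaming pipeline gives the same text. The helper names toBase64 and fromBase64 are hypothetical.

```dart
import 'dart:convert';

/// UTF-8 encode, then base64 encode, in one eager step.
String toBase64(String text) => base64.encode(utf8.encode(text));

/// Reverse both steps: base64 decode, then UTF-8 decode.
String fromBase64(String encoded) => utf8.decode(base64.decode(encoded));

void main() {
  final encoded = toBase64('hello');
  print(encoded); // aGVsbG8=
  print(fromBase64(encoded)); // hello
}
```

Comparing the streaming output against a helper like this on small inputs is a cheap way to confirm that a composed pipe does what its name says.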