
Pipe

A Pipe<I, O> is a stream transformation — a reusable function that turns a Rill<I> into a Rill<O>.

```dart
typedef Pipe<I, O> = Function1<Rill<I>, Rill<O>>;
```

Because it is just a function, every Dart function that takes a Rill<I> and returns a Rill<O> is already a valid Pipe. There is no wrapping or special constructor required.
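As a minimal sketch of that point, the snippet below treats an ordinary top-level function as a pipe. The import path and the names `doubled`, `doubledPipe`, and `doubledRange` are assumptions made for illustration; only `Rill`, `Pipe`, `map`, and `Rill.range` come from this page.

```dart
// Assumed import path; adjust to wherever your project gets Rill and Pipe.
import 'package:ribs_rill/ribs_rill.dart';

// An ordinary top-level function from Rill<int> to Rill<int> (hypothetical name).
Rill<int> doubled(Rill<int> rill) => rill.map((int n) => n * 2);

// It can be assigned to a Pipe-typed variable without any adapter...
final Pipe<int, int> doubledPipe = doubled;

// ...or called directly like any other function.
Rill<int> doubledRange() => doubled(Rill.range(1, 4)); // emits 2, 4, 6
```

Nothing here registers or wraps `doubled`: the assignment type-checks only because the function's signature already matches the `Pipe<int, int>` alias.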

Applying a Pipe

The through method applies a pipe to a stream: given a Rill<I> and a Pipe<I, O>, rill.through(pipe) returns a Rill<O>. It is equivalent to calling the pipe function directly, but reads more naturally in a chain:

```dart
// These two lines are identical:
rill.through(myPipe)
myPipe(rill)
```

```dart
// A Pipe<I, O> is just a type alias for a function from Rill<I> to Rill<O>.
// Define one as an ordinary Dart function or a named variable.

Pipe<int, String> numberToLabel = (rill) => rill.map((int n) => 'item-$n');

IO<Unit> pipeBasic() => Rill.range(1, 4)
    .through(numberToLabel)
    .compile
    .toIList
    .flatMap((IList<String> xs) => IO.print('labels: $xs')); // [item-1, item-2, item-3]
```

Composing Pipes

Pipes compose by chaining .through calls, or by wrapping multiple pipes in a new function to give the combination a name:

```dart
// Compose pipes by applying one after the other with .through, or wrap
// two pipes in a new function to create a named composed pipe.

Pipe<int, String> evensAsLabels =
    (rill) => rill.filter((int n) => n.isEven).through(numberToLabel);

IO<Unit> pipeCompose() => Rill.range(1, 11)
    .through(evensAsLabels)
    .compile
    .toIList
    .flatMap(
      (IList<String> xs) => IO.print('even labels: $xs'),
    ); // [item-2, item-4, item-6, item-8, item-10]
```

Because Pipe<I, O> is a plain function type, you can also store and pass pipes as first-class values, return them from factories, and parameterise them over runtime configuration — all without any special machinery.
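The factory idea can be sketched as follows. The functions `labelWith`, `multiplesOf`, and `pipeFactory` are hypothetical names introduced for this example, and the import paths are assumptions; the stream operations are the same `map`, `filter`, and `through` calls used earlier on this page.

```dart
// Assumed import paths; adjust to match your project's dependencies.
import 'package:ribs_core/ribs_core.dart'; // IList, Unit
import 'package:ribs_effect/ribs_effect.dart'; // IO
import 'package:ribs_rill/ribs_rill.dart'; // Rill, Pipe

// Hypothetical factory: builds a labelling pipe from a runtime prefix.
Pipe<int, String> labelWith(String prefix) =>
    (rill) => rill.map((int n) => '$prefix-$n');

// Hypothetical factory: builds a pipe that keeps only multiples of k.
Pipe<int, int> multiplesOf(int k) =>
    (rill) => rill.filter((int n) => n % k == 0);

// Both pipes are created from plain arguments at the call site.
IO<Unit> pipeFactory() => Rill.range(1, 10)
    .through(multiplesOf(3))
    .through(labelWith('row'))
    .compile
    .toIList
    .flatMap((IList<String> xs) => IO.print('rows: $xs')); // row-3, row-6, row-9
```

Each factory is a normal Dart function that returns a closure, so the configuration values (`prefix`, `k`) are captured once and the resulting pipe can be reused across streams.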

Built-in Pipes

Pipes.text provides pipes for common text and binary encoding tasks:

UTF-8

| Pipe | Type | Description |
| --- | --- | --- |
| `Pipes.text.utf8.decode` | `Pipe<int, String>` | Decode raw bytes as UTF-8 |
| `Pipes.text.utf8.encode` | `Pipe<String, int>` | Encode strings to UTF-8 bytes |

Lines

| Pipe | Type | Description |
| --- | --- | --- |
| `Pipes.text.lines` | `Pipe<String, String>` | Split a string stream at `\n` / `\r\n` |
| `Pipes.text.linesLimited(n)` | `Pipe<String, String>` | Same, but raises an error if any line exceeds `n` characters |

Base64

| Pipe | Type | Description |
| --- | --- | --- |
| `Pipes.text.base64.decode` | `Pipe<String, int>` | Decode base64 text to bytes |
| `Pipes.text.base64.encode` | `Pipe<int, String>` | Encode bytes to base64 text |
| `Pipes.text.base64.decodeWithAlphabet(a)` | `Pipe<String, int>` | Decode with a custom `Base64Alphabet` |
| `Pipes.text.base64.encodeWithAlphabet(a)` | `Pipe<int, String>` | Encode with a custom `Base64Alphabet` |

Hex

| Pipe | Type | Description |
| --- | --- | --- |
| `Pipes.text.hex.decode` | `Pipe<String, int>` | Decode hex text to bytes (strips leading `0x`/`0X`) |
| `Pipes.text.hex.encode` | `Pipe<int, String>` | Encode bytes to lowercase hex text |
| `Pipes.text.hex.decodeWithAlphabet(a)` | `Pipe<String, int>` | Decode with a custom `HexAlphabet` |
| `Pipes.text.hex.encodeWithAlphabet(a)` | `Pipe<int, String>` | Encode with a custom `HexAlphabet` |
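Because each encoder's output type matches its decoder's input type, the paired pipes listed above chain directly. The sketch below shows this for hex; `hexRoundTrip` and `sample` are hypothetical names, the import path is an assumption, and the sketch assumes the pipes have exactly the types given in the table.

```dart
// Assumed import path; adjust to wherever your project gets Rill and Pipes.
import 'package:ribs_rill/ribs_rill.dart';

// Bytes -> lowercase hex text -> bytes again (hypothetical helper).
Rill<int> hexRoundTrip(Rill<int> bytes) =>
    bytes.through(Pipes.text.hex.encode).through(Pipes.text.hex.decode);

// Four sample bytes pushed through the encode/decode pair.
final Rill<int> sample =
    hexRoundTrip(Rill.emits(<int>[0xde, 0xad, 0xbe, 0xef]));
```

The same shape should apply to the base64 pair, since `Pipes.text.base64.encode` and `Pipes.text.base64.decode` have the same `Pipe<int, String>` and `Pipe<String, int>` types.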

Text processing pipeline

The most common use of Pipes.text is decoding a raw byte stream from a file or network socket into lines of text. utf8.decode followed by lines is the standard pattern:

```dart
/// Decode a raw byte stream to newline-delimited strings.
///
/// This is the typical shape for reading a text file:
///   bytes → UTF-8 decode → split by line
Rill<String> decodeLines(Rill<int> bytes) =>
    bytes.through(Pipes.text.utf8.decode).through(Pipes.text.lines);

/// Encode a `Rill<String>` to UTF-8 bytes then to a base64 string stream.
Rill<String> encodeBase64(Rill<String> text) =>
    text.through(Pipes.text.utf8.encode).through(Pipes.text.base64.encode);

IO<Unit> pipeTextExample() {
  // Simulate a raw byte stream for "hello\nworld". The text is pure ASCII,
  // so its code units are the same as its UTF-8 bytes.
  final bytes = Rill.emits('hello\nworld'.codeUnits.toList());

  return decodeLines(bytes).compile.toIList.flatMap(
    (IList<String> lines) => IO.print('lines: $lines'),
  ); // [hello, world]
}
```

The same composability applies to encoding: chain utf8.encode into base64.encode to produce a base64 representation of any text stream.
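A possible way to exercise the encoding direction is sketched below. `encodeBase64` is repeated from the example above so the snippet stands alone; `pipeBase64Example` is a hypothetical name and the import paths are assumptions.

```dart
// Assumed import paths; adjust to match your project's dependencies.
import 'package:ribs_core/ribs_core.dart'; // IList, Unit
import 'package:ribs_effect/ribs_effect.dart'; // IO
import 'package:ribs_rill/ribs_rill.dart'; // Rill, Pipes

/// Encode a `Rill<String>` to UTF-8 bytes then to a base64 string stream.
Rill<String> encodeBase64(Rill<String> text) =>
    text.through(Pipes.text.utf8.encode).through(Pipes.text.base64.encode);

// Hypothetical driver: feed a few string elements in, collect the base64 text.
IO<Unit> pipeBase64Example() =>
    encodeBase64(Rill.emits(<String>['hello', ' ', 'world']))
        .compile
        .toIList
        .flatMap((IList<String> parts) => IO.print('base64: $parts'));
```

How the encoded text is split across output elements depends on how the pipe chunks its input, so the example prints whatever elements arrive rather than assuming a single string.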