Skip to main content

Import

import { pipe, map, filter, reduce, Pipeline } from "bytekit/pipeline";
// or from the root entry:
import { pipe, map, filter, reduce } from "bytekit";

What it does

The pipeline system lets you compose typed transformation operators into a reusable, lazy, immutable pipeline. Nothing executes until you call .process(data). Each .pipe() call returns a new Pipeline instance — the original is never mutated.
  • map — transform each element. Async functions run concurrently (Promise.all).
  • filter — retain elements by predicate. Async predicates run concurrently.
  • reduce — accumulate to a scalar. Reducer steps run sequentially.

PipelineOp<TIn, TOut>

type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;
The atomic unit of a pipeline — a function that transforms a value synchronously or asynchronously.

Pipeline<TIn, TOut>

class Pipeline<TIn, TOut> {
  pipe<TNext>(op: PipelineOp<TOut, TNext>): Pipeline<TIn, TNext>;
  process(data: TIn): Promise<TOut>;
}
MethodDescription
pipe(op)Appends an operator. Returns a new Pipeline. Does not mutate this.
process(data)Executes all operators sequentially. Always returns a Promise.

pipe(...ops)

Creates a typed Pipeline from a sequence of operators. Type inference flows left-to-right for up to 7 operators. Use the escape-hatch overload for dynamic/longer pipelines.
// 1-op — full inference
const p1 = pipe(filter<number>((n) => n > 0));

// 3-op — Pipeline<number[], number> inferred automatically
const p3 = pipe(
  filter<number>((n) => n > 0),
  map<number, string>((n) => n.toFixed(2)),
  reduce<string, number>((acc, s) => acc + s.length, 0)
);

// Escape hatch for dynamic / long pipelines (no type inference):
const dynamic = pipe<number[]>(...ops);

map<T, U>(fn)

function map<T, U>(
  fn: (item: T, index: number) => U | Promise<U>
): PipelineOp<T[], U[]>
ParameterTypeDescription
fn(item: T, index: number) => U | Promise<U>Mapping function. Receives item and its 0-based index.
ReturnsPipelineOp<T[], U[]>All items processed concurrently; order preserved.

filter<T>(fn)

function filter<T>(
  fn: (item: T, index: number) => boolean | Promise<boolean>
): PipelineOp<T[], T[]>
ParameterTypeDescription
fn(item: T, index: number) => boolean | Promise<boolean>Predicate function.
ReturnsPipelineOp<T[], T[]>All predicates evaluated concurrently; retained items preserve original order.

reduce<T, U>(fn, initial)

function reduce<T, U>(
  fn: (acc: U, item: T, index: number) => U | Promise<U>,
  initial: U
): PipelineOp<T[], U>
ParameterTypeDescription
fn(acc: U, item: T, index: number) => U | Promise<U>Reducer function. Runs sequentially — each step awaits the previous.
initialUStarting accumulator value. Returned unchanged for empty arrays.
ReturnsPipelineOp<T[], U>Collapses the array to a single accumulated value.

Behaviour guarantees

GuaranteeDetails
Lazy executionNo operator runs until .process() is called
Immutabilitypipe() and .pipe() never mutate existing instances
Async normalisationSync return values are wrapped in Promise.resolve automatically
Concurrencymap and filter run all items concurrently via Promise.all
Sequential reduceEach reducer step awaits the previous; deterministic output
Error propagationErrors from operators propagate unchanged from .process()
Empty inputmap[], filter[], reduceinitial

Examples

Sync composition

import { pipe, filter, map, reduce } from "bytekit/pipeline";

interface Product {
  id: number;
  name: string;
  priceCents: number;
  inStock: boolean;
}

const totalRevenue = await pipe(
  filter<Product>((p) => p.inStock),
  map<Product, number>((p) => p.priceCents),
  reduce<number, number>((acc, price) => acc + price, 0)
).process(products);
// Type: number

Async map (concurrent enrichment)

import { pipe, map } from "bytekit/pipeline";

const enriched = await pipe(
  map<Order, EnrichedOrder>(async (order) => ({
    ...order,
    userName: await fetchUser(order.userId),
  }))
).process(orders);
// All fetchUser() calls run concurrently — output order is preserved

Immutable builder — reuse and extend

const base = pipe(filter<Product>((p) => p.inStock));

// base is not mutated; each .pipe() returns a new instance
const names  = base.pipe(map<Product, string>((p) => p.name));
const prices = base.pipe(map<Product, number>((p) => p.priceCents / 100));

const [nameList, priceList] = await Promise.all([
  names.process(products),
  prices.process(products),
]);

ApiClient integration

Apply a pipeline to the response body automatically, after parsing and validation.
import { ApiClient, pipe, filter, map } from "bytekit";

interface RawProduct { id: number; price_cents: number; active: boolean }
interface Product    { id: number; price: number }

const client = new ApiClient({ baseUrl: "https://api.example.com" });

const products = await client.get<RawProduct[]>("/products", {
  pipeline: pipe(
    filter<RawProduct>((p) => p.active),
    map<RawProduct, Product>((p) => ({
      id: p.id,
      price: p.price_cents / 100,
    }))
  ),
});
// Type inferred from pipeline output

Error handling

try {
  await pipe(
    map<number, number>((n) => {
      if (n < 0) throw new RangeError("negative");
      return Math.sqrt(n);
    })
  ).process([4, 9, -1]);
} catch (err) {
  // err is the original RangeError — not wrapped
}
All three operator factories accept both sync and async functions — you don’t need to change anything to switch between them.
reduce is always sequential. If you need parallel reduction (e.g., independent batch writes), use map followed by Promise.all outside the pipeline.