## Import

```ts
import { pipe, map, filter, reduce, Pipeline } from "bytekit/pipeline";

// or from the root entry:
import { pipe, map, filter, reduce } from "bytekit";
```
## What it does

The pipeline system lets you compose typed transformation operators into a reusable, lazy, immutable pipeline. Nothing executes until you call `.process(data)`. Each `.pipe()` call returns a new `Pipeline` instance — the original is never mutated.

- `map` — transform each element. Async functions run concurrently (`Promise.all`).
- `filter` — retain elements by predicate. Async predicates run concurrently.
- `reduce` — accumulate to a scalar. Reducer steps run sequentially.
### `PipelineOp<TIn, TOut>`

```ts
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;
```

The atomic unit of a pipeline — a function that transforms a value synchronously or asynchronously.
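Any function with this shape qualifies. For illustration, here are a sync op and an async op side by side; `double` and `stringify` are hypothetical examples, with the alias repeated so the snippet stands alone:

```typescript
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;

// A synchronous op: returns the value directly.
const double: PipelineOp<number, number> = (n) => n * 2;

// An asynchronous op: returns a Promise. Both satisfy the same type.
const stringify: PipelineOp<number, string> = async (n) => n.toFixed(1);
```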
Pipeline<TIn, TOut>
class Pipeline<TIn, TOut> {
pipe<TNext>(op: PipelineOp<TOut, TNext>): Pipeline<TIn, TNext>;
process(data: TIn): Promise<TOut>;
}
| Method | Description |
|---|
pipe(op) | Appends an operator. Returns a new Pipeline. Does not mutate this. |
process(data) | Executes all operators sequentially. Always returns a Promise. |
### `pipe(...ops)`

Creates a typed `Pipeline` from a sequence of operators. Type inference flows left-to-right for up to 7 operators. Use the escape-hatch overload for dynamic/longer pipelines.

```ts
// 1-op — full inference
const p1 = pipe(filter<number>((n) => n > 0));

// 3-op — Pipeline<number[], number> inferred automatically
const p3 = pipe(
  filter<number>((n) => n > 0),
  map<number, string>((n) => n.toFixed(2)),
  reduce<string, number>((acc, s) => acc + s.length, 0)
);

// Escape hatch for dynamic / long pipelines (no type inference):
const dynamic = pipe<number[]>(...ops);
```
### `map<T, U>(fn)`

```ts
function map<T, U>(
  fn: (item: T, index: number) => U | Promise<U>
): PipelineOp<T[], U[]>
```

| Parameter | Type | Description |
|---|---|---|
| `fn` | `(item: T, index: number) => U \| Promise<U>` | Mapping function. Receives item and its 0-based index. |
| Returns | `PipelineOp<T[], U[]>` | All items processed concurrently; order preserved. |
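The concurrent behaviour described above can be sketched in plain TypeScript. This is an illustration of the documented semantics (all calls started up front, awaited together, order preserved), not bytekit's actual implementation:

```typescript
// Sketch of map's documented semantics: kick off every call immediately,
// await them all via Promise.all, and keep the original order.
async function concurrentMap<T, U>(
  items: T[],
  fn: (item: T, index: number) => U | Promise<U>
): Promise<U[]> {
  return Promise.all(items.map((item, i) => fn(item, i)));
}
```

An empty input array simply resolves to `[]`, matching the "Empty input" guarantee below.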
### `filter<T>(fn)`

```ts
function filter<T>(
  fn: (item: T, index: number) => boolean | Promise<boolean>
): PipelineOp<T[], T[]>
```

| Parameter | Type | Description |
|---|---|---|
| `fn` | `(item: T, index: number) => boolean \| Promise<boolean>` | Predicate function. |
| Returns | `PipelineOp<T[], T[]>` | All predicates evaluated concurrently; retained items preserve original order. |
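A sketch of the documented filter semantics, again as an illustration rather than the library's implementation: every predicate is evaluated concurrently, and retained items keep their original relative order.

```typescript
// Sketch of filter's documented semantics: evaluate all predicates
// concurrently, then keep items whose predicate resolved true, in order.
async function concurrentFilter<T>(
  items: T[],
  fn: (item: T, index: number) => boolean | Promise<boolean>
): Promise<T[]> {
  const keep = await Promise.all(items.map((item, i) => fn(item, i)));
  return items.filter((_, i) => keep[i]);
}
```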
### `reduce<T, U>(fn, initial)`

```ts
function reduce<T, U>(
  fn: (acc: U, item: T, index: number) => U | Promise<U>,
  initial: U
): PipelineOp<T[], U>
```

| Parameter | Type | Description |
|---|---|---|
| `fn` | `(acc: U, item: T, index: number) => U \| Promise<U>` | Reducer function. Runs sequentially — each step awaits the previous. |
| `initial` | `U` | Starting accumulator value. Returned unchanged for empty arrays. |
| Returns | `PipelineOp<T[], U>` | Collapses the array to a single accumulated value. |
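The sequential behaviour can be sketched as a plain awaited loop; this illustrates the documented semantics, not bytekit's internals. Note how an empty array falls straight through to `initial`:

```typescript
// Sketch of reduce's documented semantics: strictly sequential; each
// step awaits the previous result before the next item is processed.
async function sequentialReduce<T, U>(
  items: T[],
  fn: (acc: U, item: T, index: number) => U | Promise<U>,
  initial: U
): Promise<U> {
  let acc = initial;
  for (let i = 0; i < items.length; i++) {
    acc = await fn(acc, items[i], i);
  }
  return acc; // for an empty array this is just `initial`
}
```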
## Behaviour guarantees

| Guarantee | Details |
|---|---|
| Lazy execution | No operator runs until `.process()` is called |
| Immutability | `pipe()` and `.pipe()` never mutate existing instances |
| Async normalisation | Sync return values are wrapped in `Promise.resolve` automatically |
| Concurrency | `map` and `filter` run all items concurrently via `Promise.all` |
| Sequential reduce | Each reducer step awaits the previous; deterministic output |
| Error propagation | Errors from operators propagate unchanged from `.process()` |
| Empty input | `map` → `[]`, `filter` → `[]`, `reduce` → `initial` |
## Examples

### Sync composition

```ts
import { pipe, filter, map, reduce } from "bytekit/pipeline";

interface Product {
  id: number;
  name: string;
  priceCents: number;
  inStock: boolean;
}

const totalRevenue = await pipe(
  filter<Product>((p) => p.inStock),
  map<Product, number>((p) => p.priceCents),
  reduce<number, number>((acc, price) => acc + price, 0)
).process(products);
// Type: number
```
### Async map (concurrent enrichment)

```ts
import { pipe, map } from "bytekit/pipeline";

const enriched = await pipe(
  map<Order, EnrichedOrder>(async (order) => ({
    ...order,
    userName: await fetchUser(order.userId),
  }))
).process(orders);
// All fetchUser() calls run concurrently — output order is preserved
```
### Immutable builder — reuse and extend

```ts
const base = pipe(filter<Product>((p) => p.inStock));

// base is not mutated; each .pipe() returns a new instance
const names = base.pipe(map<Product, string>((p) => p.name));
const prices = base.pipe(map<Product, number>((p) => p.priceCents / 100));

const [nameList, priceList] = await Promise.all([
  names.process(products),
  prices.process(products),
]);
```
### ApiClient integration

Apply a pipeline to the response body automatically, after parsing and validation.

```ts
import { ApiClient, pipe, filter, map } from "bytekit";

interface RawProduct { id: number; price_cents: number; active: boolean }
interface Product { id: number; price: number }

const client = new ApiClient({ baseUrl: "https://api.example.com" });

const products = await client.get<RawProduct[]>("/products", {
  pipeline: pipe(
    filter<RawProduct>((p) => p.active),
    map<RawProduct, Product>((p) => ({
      id: p.id,
      price: p.price_cents / 100,
    }))
  ),
});
// Type inferred from pipeline output
```
### Error handling

```ts
try {
  await pipe(
    map<number, number>((n) => {
      if (n < 0) throw new RangeError("negative");
      return Math.sqrt(n);
    })
  ).process([4, 9, -1]);
} catch (err) {
  // err is the original RangeError — not wrapped
}
```
All three operator factories accept both sync and async functions — you don’t need to change anything to switch between them.

`reduce` is always sequential. If you need parallel reduction (e.g., independent batch writes), use `map` followed by `Promise.all` outside the pipeline.
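A sketch of that pattern in plain TypeScript, assuming a hypothetical `writeBatch` helper standing in for an independent batch write: the writes all start concurrently, and the results are combined only after every write settles.

```typescript
// Hypothetical batch writer; resolves with the number of rows written.
async function writeBatch(batch: number[]): Promise<number> {
  return batch.length;
}

// Start every write concurrently, then combine the results afterwards.
// Contrast with reduce, which would await each write before the next.
async function writeAll(batches: number[][]): Promise<number> {
  const written = await Promise.all(batches.map((b) => writeBatch(b)));
  return written.reduce((acc, n) => acc + n, 0);
}
```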