> ## Documentation Index
> Fetch the complete documentation index at: https://bytekit.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipeline

> Typed, lazy, immutable data pipelines — compose map, filter, and reduce with full TypeScript inference across sync and async transformations.

## Import

```ts theme={null}
import { pipe, map, filter, reduce, Pipeline } from "bytekit/pipeline";
// or from the root entry:
import { pipe, map, filter, reduce } from "bytekit";
```

## What it does

The pipeline system lets you compose typed transformation operators into a reusable, **lazy**, **immutable** pipeline. Nothing executes until you call `.process(data)`. Each `.pipe()` call returns a **new** `Pipeline` instance — the original is never mutated.

* **`map`** — transform each element. Async functions run **concurrently** (`Promise.all`).
* **`filter`** — retain elements by predicate. Async predicates run **concurrently**.
* **`reduce`** — accumulate to a scalar. Reducer steps run **sequentially**.

## `PipelineOp<TIn, TOut>`

```ts theme={null}
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;
```

The atomic unit of a pipeline — a function that transforms a value synchronously or asynchronously.

## `Pipeline<TIn, TOut>`

```ts theme={null}
class Pipeline<TIn, TOut> {
  pipe<TNext>(op: PipelineOp<TOut, TNext>): Pipeline<TIn, TNext>;
  process(data: TIn): Promise<TOut>;
}
```

| Method          | Description                                                                |
| --------------- | -------------------------------------------------------------------------- |
| `pipe(op)`      | Appends an operator. Returns a **new** `Pipeline`. Does not mutate `this`. |
| `process(data)` | Executes all operators sequentially. Always returns a `Promise`.           |

## `pipe(...ops)`

Creates a typed `Pipeline` from a sequence of operators. Type inference flows left-to-right for up to 7 operators. Use the escape-hatch overload for dynamic/longer pipelines.

```ts theme={null}
// 1-op — full inference
const p1 = pipe(filter<number>((n) => n > 0));

// 3-op — Pipeline<number[], number> inferred automatically
const p3 = pipe(
  filter<number>((n) => n > 0),
  map<number, string>((n) => n.toFixed(2)),
  reduce<string, number>((acc, s) => acc + s.length, 0)
);

// Escape hatch for dynamic / long pipelines (no type inference):
const dynamic = pipe<number[]>(...ops);
```

## `map<T, U>(fn)`

```ts theme={null}
function map<T, U>(
  fn: (item: T, index: number) => U | Promise<U>
): PipelineOp<T[], U[]>
```

| Parameter | Type                                          | Description                                            |
| --------- | --------------------------------------------- | ------------------------------------------------------ |
| `fn`      | `(item: T, index: number) => U \| Promise<U>` | Mapping function. Receives item and its 0-based index. |
| Returns   | `PipelineOp<T[], U[]>`                        | All items processed concurrently; order preserved.     |

## `filter<T>(fn)`

```ts theme={null}
function filter<T>(
  fn: (item: T, index: number) => boolean | Promise<boolean>
): PipelineOp<T[], T[]>
```

| Parameter | Type                                                      | Description                                                                    |
| --------- | --------------------------------------------------------- | ------------------------------------------------------------------------------ |
| `fn`      | `(item: T, index: number) => boolean \| Promise<boolean>` | Predicate function.                                                            |
| Returns   | `PipelineOp<T[], T[]>`                                    | All predicates evaluated concurrently; retained items preserve original order. |

## `reduce<T, U>(fn, initial)`

```ts theme={null}
function reduce<T, U>(
  fn: (acc: U, item: T, index: number) => U | Promise<U>,
  initial: U
): PipelineOp<T[], U>
```

| Parameter | Type                                                  | Description                                                          |
| --------- | ----------------------------------------------------- | -------------------------------------------------------------------- |
| `fn`      | `(acc: U, item: T, index: number) => U \| Promise<U>` | Reducer function. Runs sequentially — each step awaits the previous. |
| `initial` | `U`                                                   | Starting accumulator value. Returned unchanged for empty arrays.     |
| Returns   | `PipelineOp<T[], U>`                                  | Collapses the array to a single accumulated value.                   |

## Behaviour guarantees

| Guarantee           | Details                                                           |
| ------------------- | ----------------------------------------------------------------- |
| Lazy execution      | No operator runs until `.process()` is called                     |
| Immutability        | `pipe()` and `.pipe()` never mutate existing instances            |
| Async normalisation | Sync return values are wrapped in `Promise.resolve` automatically |
| Concurrency         | `map` and `filter` run all items concurrently via `Promise.all`   |
| Sequential reduce   | Each reducer step awaits the previous; deterministic output       |
| Error propagation   | Errors from operators propagate unchanged from `.process()`       |
| Empty input         | `map` → `[]`, `filter` → `[]`, `reduce` → `initial`               |

## Examples

### Sync composition

```ts theme={null}
import { pipe, filter, map, reduce } from "bytekit/pipeline";

interface Product {
  id: number;
  name: string;
  priceCents: number;
  inStock: boolean;
}

const totalRevenue = await pipe(
  filter<Product>((p) => p.inStock),
  map<Product, number>((p) => p.priceCents),
  reduce<number, number>((acc, price) => acc + price, 0)
).process(products);
// Type: number
```

### Async map (concurrent enrichment)

```ts theme={null}
import { pipe, map } from "bytekit/pipeline";

const enriched = await pipe(
  map<Order, EnrichedOrder>(async (order) => ({
    ...order,
    userName: await fetchUser(order.userId),
  }))
).process(orders);
// All fetchUser() calls run concurrently — output order is preserved
```

### Immutable builder — reuse and extend

```ts theme={null}
const base = pipe(filter<Product>((p) => p.inStock));

// base is not mutated; each .pipe() returns a new instance
const names  = base.pipe(map<Product, string>((p) => p.name));
const prices = base.pipe(map<Product, number>((p) => p.priceCents / 100));

const [nameList, priceList] = await Promise.all([
  names.process(products),
  prices.process(products),
]);
```

### ApiClient integration

Apply a pipeline to the response body automatically, after parsing and validation.

```ts theme={null}
import { ApiClient, pipe, filter, map } from "bytekit";

interface RawProduct { id: number; price_cents: number; active: boolean }
interface Product    { id: number; price: number }

const client = new ApiClient({ baseUrl: "https://api.example.com" });

const products = await client.get<RawProduct[]>("/products", {
  pipeline: pipe(
    filter<RawProduct>((p) => p.active),
    map<RawProduct, Product>((p) => ({
      id: p.id,
      price: p.price_cents / 100,
    }))
  ),
});
// Type inferred from pipeline output
```

### Error handling

```ts theme={null}
try {
  await pipe(
    map<number, number>((n) => {
      if (n < 0) throw new RangeError("negative");
      return Math.sqrt(n);
    })
  ).process([4, 9, -1]);
} catch (err) {
  // err is the original RangeError — not wrapped
}
```

<Tip>
  All three operator factories accept both sync and async functions — you don't need to change anything to switch between them.
</Tip>

<Warning>
  `reduce` is always sequential. If you need parallel reduction (e.g., independent batch writes), use `map` followed by `Promise.all` outside the pipeline.
</Warning>
