Import

import {
  sleep,
  timeout,
  withTimeout,
  retry,
  parallel,
  sequential,
  race,
  allSettled,
  debounceAsync,
  throttleAsync,
  PromisePool,
  RequestQueue,
  RequestBatcher,
  TimeoutError,
  AbortError,
  RetryError,
  PoolTimeoutError,
  QueueAbortError,
} from "bytekit/async";

What this module is for

Use bytekit/async when you need control over timing, retries, concurrency, and flow control outside the HTTP client itself. Every export is a standalone function or class — composable and zero-dependency.

Timing

sleep

Delay execution for a given number of milliseconds. Supports an optional AbortSignal.
await sleep(1000);

// With abort support
const controller = new AbortController();
await sleep(5000, controller.signal);

timeout

Add a deadline to any promise. Throws TimeoutError if the promise does not settle in time.
const data = await timeout(fetchReport(), 5000);

// Custom error message
const report = await timeout(fetchReport(), 5000, "Report fetch timed out");
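Conceptually, a deadline like this is a race between the promise and a timer. A minimal standalone sketch of that idea (illustrative only, not bytekit's implementation — bytekit throws its own TimeoutError class):

```typescript
// Sketch of a deadline wrapper via Promise.race. The timer is cleared in
// finally() so a settled promise does not keep the event loop alive.
function withDeadline<T>(promise: Promise<T>, ms: number, message = "Timed out"): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}
```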

withTimeout

Wrap a function so every call has a built-in timeout.
const safeFetch = withTimeout(fetchReport, 5000);
const data = await safeFetch(reportId);

Retry

retry

Retry a failed async operation with configurable backoff.
const data = await retry(() => fetchData(), {
  maxAttempts: 5,
  baseDelay: 1000,
  backoff: "exponential",
});

RetryOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxAttempts | number | 3 | Maximum number of attempts. |
| baseDelay | number | 1000 | Starting delay between retries (ms). |
| maxDelay | number | Infinity | Maximum delay cap (ms). |
| backoff | "exponential" \| "linear" \| (attempt: number) => number | "exponential" | Backoff strategy or custom function. |
| shouldRetry | (error: Error) => boolean | — | Predicate to decide whether to retry. |
| signal | AbortSignal | — | Cancellation signal. |

Backoff strategies

// Exponential (default): 1s → 2s → 4s → 8s ...
await retry(fn, { backoff: "exponential" });

// Linear: 1s → 2s → 3s → 4s ...
await retry(fn, { backoff: "linear" });

// Custom function
await retry(fn, { backoff: (attempt) => attempt * 500 });
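Based on the schedules shown above, the per-attempt delay can be sketched as a pure function — doubling from baseDelay for exponential, a multiple of baseDelay for linear, with maxDelay as a ceiling. This is an assumption drawn from the documented schedules, not bytekit's source:

```typescript
// Illustrative delay schedule: attempt is 1-based (attempt 1 = first retry).
function backoffDelay(
  attempt: number,
  baseDelay: number,
  maxDelay: number,
  strategy: "exponential" | "linear",
): number {
  const raw = strategy === "exponential" ? baseDelay * 2 ** (attempt - 1) : baseDelay * attempt;
  return Math.min(raw, maxDelay); // cap at maxDelay
}
```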

Concurrency and coordination

parallel

Run async tasks with an optional concurrency limit.
const results = await parallel(
  files.map((file) => () => uploadFile(file)),
  { concurrency: 5 }
);

ParallelOptions

| Option | Type | Description |
| --- | --- | --- |
| concurrency | number | Maximum number of tasks running at once. |
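The concurrency limit means a new task only starts once an earlier one finishes. One common way to express this — a fixed set of workers pulling tasks off a shared cursor — is sketched below; it is illustrative, not bytekit's implementation:

```typescript
// `limit` workers each loop over the task list, so at most `limit` tasks are
// in flight. Results are written by index, preserving input order.
async function runLimited<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const i = next++; // claim the next task index (single-threaded, so no race)
      results[i] = await tasks[i]();
    }
  }
  await Promise.all(Array.from({ length: Math.min(limit, tasks.length) }, worker));
  return results;
}
```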

sequential

Run async tasks one after another.
const results = await sequential(
  steps.map((s) => () => processStep(s)),
  { continueOnError: true }
);

SequentialOptions

| Option | Type | Description |
| --- | --- | --- |
| continueOnError | boolean | Keep executing after a task failure. |

race

Enhanced race that resolves with the first successful promise. If every promise rejects, throws an AggregateError.
const fastest = await race([
  fetchFromPrimary(),
  fetchFromSecondary(),
  sleep(5000).then(() => { throw new Error("Timeout"); }),
]);

allSettled

Collect all results into fulfilled and rejected buckets with their original indices.
const { fulfilled, rejected } = await allSettled([
  fetchUser(1),
  fetchUser(2),
  fetchUser(999),
]);

// fulfilled: [{ value: User, index: 0 }, { value: User, index: 1 }]
// rejected:  [{ reason: Error, index: 2 }]

AllSettledResult<T>

| Property | Type |
| --- | --- |
| fulfilled | Array<{ value: T; index: number }> |
| rejected | Array<{ reason: unknown; index: number }> |
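The index-preserving buckets can be built on top of the standard Promise.allSettled; a minimal sketch of the equivalent shape (illustrative — bytekit's own implementation may differ):

```typescript
// Partition settlement results into fulfilled/rejected, keeping each
// promise's original position in the input array.
async function settleWithIndex<T>(promises: Array<Promise<T>>) {
  const fulfilled: Array<{ value: T; index: number }> = [];
  const rejected: Array<{ reason: unknown; index: number }> = [];
  const settled = await Promise.allSettled(promises);
  settled.forEach((result, index) => {
    if (result.status === "fulfilled") fulfilled.push({ value: result.value, index });
    else rejected.push({ reason: result.reason, index });
  });
  return { fulfilled, rejected };
}
```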

PromisePool

A reusable, class-based concurrency controller. Unlike parallel(), PromisePool:
  • Persists across multiple run() calls.
  • Isolates task errors via onError — the pool keeps running when one task fails.
  • Supports per-task timeouts that throw PoolTimeoutError.
const pool = new PromisePool({ concurrency: 3, timeout: 5000 });

const results = await pool.run(
  files.map((file) => () => uploadFile(file))
);

// Reuse the same pool for a second batch
const more = await pool.run(
  others.map((file) => () => uploadFile(file))
);

PromisePoolOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| concurrency | number | — | Max tasks running at once. Minimum: 1. |
| timeout | number | — | Per-task timeout in ms. Throws PoolTimeoutError when exceeded. |
| onError | (error: Error, taskIndex: number) => void | — | Called when a task fails. Pool continues executing remaining tasks. |

Error handling with onError

const pool = new PromisePool({
  concurrency: 2,
  timeout: 3000,
  onError(error, taskIndex) {
    if (error instanceof PoolTimeoutError) {
      console.warn(`Task ${taskIndex} timed out`);
    } else {
      console.error(`Task ${taskIndex} failed:`, error.message);
    }
  },
});

const results = await pool.run(
  endpoints.map((url) => () => fetch(url).then((r) => r.json()))
);

ApiClient integration

Pass a pool option to ApiClient to rate-limit all outgoing requests:
const api = new ApiClient({
  baseUrl: "https://api.example.com",
  pool: { concurrency: 2, timeout: 5000 },
});

// All 4 requests share the same pool — at most 2 run at a time
const [users, posts, comments, tags] = await Promise.all([
  api.request("/users"),
  api.request("/posts"),
  api.request("/comments"),
  api.request("/tags"),
]);

Debounce and throttle

debounceAsync

Debounce an async function. Returns a DebouncedFunction with cancel() and flush() methods.
const search = debounceAsync(
  async (query: string) => {
    const res = await fetch(`/api/search?q=${query}`);
    return res.json();
  },
  300,
  { leading: false, trailing: true }
);

// Use it
const results = await search("bytekit");

// Cancel pending execution
search.cancel();

// Force immediate execution
const immediate = await search.flush();

DebounceOptions

| Option | Type | Description |
| --- | --- | --- |
| leading | boolean | Fire on the leading edge. |
| trailing | boolean | Fire on the trailing edge. |
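To make the trailing-edge behaviour concrete: every caller inside the wait window shares one invocation, made with the last arguments seen. A minimal trailing-only sketch (illustrative; bytekit's version also supports the leading edge, cancel(), and flush()):

```typescript
// Trailing-edge async debounce: each call resets the timer; when it finally
// fires, the wrapped fn runs once with the latest args and all waiting
// callers receive that single result.
function debounceTrailing<A extends unknown[], R>(
  fn: (...args: A) => Promise<R>,
  wait: number,
): (...args: A) => Promise<R> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  let pending: Array<{ resolve: (r: R) => void; reject: (e: unknown) => void }> = [];
  return (...args: A) =>
    new Promise<R>((resolve, reject) => {
      pending.push({ resolve, reject });
      clearTimeout(timer);
      timer = setTimeout(() => {
        const waiters = pending;
        pending = [];
        fn(...args).then(
          (r) => waiters.forEach((w) => w.resolve(r)),
          (e) => waiters.forEach((w) => w.reject(e)),
        );
      }, wait);
    });
}
```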

throttleAsync

Throttle an async function. Returns a ThrottledFunction with a cancel() method.
const save = throttleAsync(
  async (data: FormData) => {
    await fetch("/api/save", { method: "POST", body: data });
  },
  2000,
  { trailing: true }
);

save(formData);

// Cancel pending trailing call
save.cancel();

ThrottleOptions

| Option | Type | Description |
| --- | --- | --- |
| trailing | boolean | Fire a trailing call after the interval. |

Error classes

TimeoutError

Thrown by timeout() and withTimeout() when a deadline is exceeded.
| Property | Type | Description |
| --- | --- | --- |
| timeout | number | The timeout value that was exceeded (ms). |

AbortError

Thrown when an operation is cancelled via AbortSignal.

RetryError

Thrown when all retry attempts are exhausted.
| Property | Type | Description |
| --- | --- | --- |
| attempts | number | Total attempts executed. |
| lastError | Error | The error from the final attempt. |

PoolTimeoutError

Thrown by PromisePool when an individual task exceeds its configured timeout.
try {
  await pool.run([() => slowOperation()]);
} catch (error) {
  if (error instanceof PoolTimeoutError) {
    console.error(error.message); // "Task timed out after 5000ms"
  }
}

QueueAbortError

Thrown by RequestQueue when a queued task is cancelled — either via an external AbortSignal or the internal cancel(id) mechanism.
const controller = new AbortController();
queue.add(myTask, { signal: controller.signal }).catch((err) => {
  if (err instanceof QueueAbortError) {
    console.log("Task was cancelled");
  }
});
controller.abort(); // fires QueueAbortError

RequestQueue

A priority-aware, concurrency-limited task queue. At most concurrency tasks run simultaneously. Three priority lanes — high, normal, low — control execution order when multiple tasks are waiting.

Constructor

const queue = new RequestQueue({ concurrency: 3 });
| Option | Type | Required | Description |
| --- | --- | --- | --- |
| concurrency | number | — | Max simultaneous tasks. Must be ≥ 1. |
| onError | (error: Error, id: string) => void | — | Called when a task rejects. Queue continues. |

add(task, options?)

Enqueues a task and returns a Promise<T> that resolves/rejects with the task result.
const result = await queue.add(
  (signal) => fetch("/api/data", { signal }),
  { priority: "high" }
);
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| priority | "high" \| "normal" \| "low" | "normal" | Execution lane. |
| signal | AbortSignal | — | External cancellation. Fires QueueAbortError if aborted before the task starts. |

flush()

Returns a Promise<void> that resolves when all currently queued and running tasks have settled.
queue.add(taskA);
queue.add(taskB);
await queue.flush(); // waits for both to complete

State getters

| Getter | Type | Description |
| --- | --- | --- |
| size | number | Tasks waiting to start (queued, not running). |
| running | number | Tasks currently executing. |
| pending | number | size + running (total active work). |
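The bookkeeping behind these getters can be illustrated with a tiny concurrency limiter — this sketch has no priority lanes or cancellation and is not RequestQueue itself, just the size/running/pending accounting:

```typescript
// Minimal limiter: tasks over the concurrency limit wait in `waiting`;
// size/running/pending mirror the getters documented above.
class MiniQueue {
  private waiting: Array<() => void> = [];
  private active = 0;
  constructor(private concurrency: number) {}
  get size() { return this.waiting.length; }     // queued, not yet started
  get running() { return this.active; }          // currently executing
  get pending() { return this.size + this.running; }
  async add<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.concurrency) {
      // park until a running task finishes and hands us its slot
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    }
    this.active++;
    try {
      return await task();
    } finally {
      this.active--;
      this.waiting.shift()?.(); // wake the next queued task, if any
    }
  }
}
```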

Full example

import { RequestQueue, QueueAbortError } from "bytekit/async";

const queue = new RequestQueue({
  concurrency: 3,
  onError: (err, id) => console.warn(`Task ${id} failed:`, err.message),
});

// Priority ordering
const controller = new AbortController();
const critical = queue.add(
  (signal) => fetch("/api/important", { signal }),
  { priority: "high" }
);
const background = queue.add(
  (signal) => fetch("/api/sync", { signal }),
  { priority: "low", signal: controller.signal }
);

controller.abort(); // cancels `background` with QueueAbortError

await queue.flush();

RequestBatcher

Coalesces same-key HTTP requests within a time window into a single fetcher invocation. All callers sharing the same key receive the same resolved value. The deduplication key defaults to "METHOD:url:body" and is fully customisable.

Constructor

const batcher = new RequestBatcher({ windowMs: 50 });
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| windowMs | number | — | Time window in ms. Must be > 0. |
| maxSize | number | Infinity | Max requests per batch. Flushes early when reached. |
| sliding | boolean | false | Reset the timer on each new request (sliding window). |
| keyFn | (url, init) => string | "METHOD:url:body" | Custom key function for grouping requests. |
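To see why same-key requests coalesce, here is a hypothetical sketch of the documented "METHOD:url:body" default key — the exact formatting inside bytekit may differ, but any two requests producing the same string would share one fetcher call:

```typescript
// Hypothetical default grouping key in the documented "METHOD:url:body" shape.
// Requests with identical keys fall into the same batch bucket.
function defaultKey(url: string, init: { method?: string; body?: string } = {}): string {
  return `${(init.method ?? "GET").toUpperCase()}:${url}:${init.body ?? ""}`;
}
```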

add(url, init, fetcher)

Adds a request to the current window. Returns Promise<T> resolving to the response.
const result = await batcher.add(
  "/api/users",
  { method: "GET" },
  (url, init) => fetch(url, init).then(r => r.json())
);

flush()

Forces immediate dispatch of all pending batches. Useful in tests or when you need results without waiting for the window.
await batcher.flush();

pendingCount

Number of requests waiting across all batch buckets.
batcher.add("/a", { method: "GET" }, fetcher);
batcher.add("/a", { method: "GET" }, fetcher);
console.log(batcher.pendingCount); // 2

Full example

import { RequestBatcher } from "bytekit/async";

const batcher = new RequestBatcher({ windowMs: 50, maxSize: 20 });

// These three calls share the same key → fetcher invoked once
const [a, b, c] = await Promise.all([
  batcher.add("/api/users", { method: "GET" }, fetch),
  batcher.add("/api/users", { method: "GET" }, fetch),
  batcher.add("/api/users", { method: "GET" }, fetch),
]);
// After 50ms (or when flush() is called), fetch fires once and all three resolve

// Different body → different key → dispatched separately
batcher.add("/api/items", { method: "POST", body: '{"id":1}' }, fetch);
batcher.add("/api/items", { method: "POST", body: '{"id":2}' }, fetch);

ApiClient integration

Pass queue or batch options to ApiClient for transparent integration:
import { ApiClient } from "bytekit";

// Concurrency-limited: at most 5 requests in-flight at once
const client = new ApiClient({
  baseUrl: "https://api.example.com",
  queue: { concurrency: 5 },
});

// Deduplication: same-URL GETs within 100ms share one response
const batchedClient = new ApiClient({
  baseUrl: "https://api.example.com",
  batch: { windowMs: 100 },
});

Combined example

import { PromisePool, PoolTimeoutError, retry, timeout, debounceAsync } from "bytekit/async";

// Retry a slow endpoint with a per-call timeout
const data = await retry(
  () => timeout(fetchData(), 5000),
  { maxAttempts: 4, baseDelay: 250 }
);

// Upload files with bounded concurrency and per-task timeout
const pool = new PromisePool({
  concurrency: 3,
  timeout: 10_000,
  onError: (err, i) => console.warn(`Upload ${i} failed:`, err instanceof PoolTimeoutError ? "timed out" : err.message),
});
const uploads = await pool.run(
  files.map((f) => () => uploadFile(f))
);

// Debounced search input
const search = debounceAsync(fetchResults, 300);

All timing values are in milliseconds. Functions that accept an AbortSignal will throw AbortError when the signal fires.