Import
import {
sleep,
timeout,
withTimeout,
retry,
parallel,
sequential,
race,
allSettled,
debounceAsync,
throttleAsync,
PromisePool,
RequestQueue,
RequestBatcher,
TimeoutError,
AbortError,
RetryError,
PoolTimeoutError,
QueueAbortError,
} from "bytekit/async";
What this module is for
Use bytekit/async when you need control over timing, retries, concurrency, and flow control outside the HTTP client itself. Every export is a standalone function or class — composable and zero-dependency.
Timing
sleep
Delay execution for a given number of milliseconds. Supports an optional AbortSignal.
await sleep(1000);
// With abort support
const controller = new AbortController();
await sleep(5000, controller.signal);
timeout
Add a deadline to any promise. Throws TimeoutError if the promise does not settle in time.
const data = await timeout(fetchReport(), 5000);
// Custom error message
const data = await timeout(fetchReport(), 5000, "Report fetch timed out");
withTimeout
Wrap a function so every call has a built-in timeout.
const safeFetch = withTimeout(fetchReport, 5000);
const data = await safeFetch(reportId);
Retry
retry
Retry a failed async operation with configurable backoff.
const data = await retry(() => fetchData(), {
maxAttempts: 5,
baseDelay: 1000,
backoff: "exponential",
});
RetryOptions
| Option | Type | Default | Description |
|---|
maxAttempts | number | 3 | Maximum number of attempts. |
baseDelay | number | 1000 | Starting delay between retries (ms). |
maxDelay | number | Infinity | Maximum delay cap (ms). |
backoff | "exponential" | "linear" | (attempt: number) => number | "exponential" | Backoff strategy or custom function. |
shouldRetry | (error: Error) => boolean | — | Predicate to decide whether to retry. |
signal | AbortSignal | — | Cancellation signal. |
Backoff strategies
// Exponential (default): 1s → 2s → 4s → 8s ...
await retry(fn, { backoff: "exponential" });
// Linear: 1s → 2s → 3s → 4s ...
await retry(fn, { backoff: "linear" });
// Custom function
await retry(fn, { backoff: (attempt) => attempt * 500 });
Concurrency and coordination
parallel
Run async tasks with an optional concurrency limit.
const results = await parallel(
files.map((file) => () => uploadFile(file)),
{ concurrency: 5 }
);
ParallelOptions
| Option | Type | Description |
|---|
concurrency | number | Maximum number of tasks running at once. |
sequential
Run async tasks one after another.
const results = await sequential(
steps.map((s) => () => processStep(s)),
{ continueOnError: true }
);
SequentialOptions
| Option | Type | Description |
|---|
continueOnError | boolean | Keep executing after a task failure. |
race
Enhanced race that resolves with the first successful promise. If every promise rejects, throws an AggregateError.
const fastest = await race([
fetchFromPrimary(),
fetchFromSecondary(),
sleep(5000).then(() => { throw new Error("Timeout"); }),
]);
allSettled
Collect all results into fulfilled and rejected buckets with their original indices.
const { fulfilled, rejected } = await allSettled([
fetchUser(1),
fetchUser(2),
fetchUser(999),
]);
// fulfilled: [{ value: User, index: 0 }, { value: User, index: 1 }]
// rejected: [{ reason: Error, index: 2 }]
AllSettledResult<T>
| Property | Type |
|---|
fulfilled | Array<{ value: T; index: number }> |
rejected | Array<{ reason: unknown; index: number }> |
PromisePool
A reusable, class-based concurrency controller. Unlike parallel(), PromisePool:
- Persists across multiple
run() calls.
- Isolates task errors via
onError — the pool keeps running when one task fails.
- Supports per-task timeouts that throw
PoolTimeoutError.
const pool = new PromisePool({ concurrency: 3, timeout: 5000 });
const results = await pool.run(
files.map((file) => () => uploadFile(file))
);
// Reuse the same pool for a second batch
const more = await pool.run(
others.map((file) => () => uploadFile(file))
);
PromisePoolOptions
| Option | Type | Default | Description |
|---|
concurrency | number | — | Max tasks running at once. Minimum: 1. |
timeout | number | — | Per-task timeout in ms. Throws PoolTimeoutError when exceeded. |
onError | (error: Error, taskIndex: number) => void | — | Called when a task fails. Pool continues executing remaining tasks. |
Error handling with onError
const pool = new PromisePool({
concurrency: 2,
timeout: 3000,
onError(error, taskIndex) {
if (error instanceof PoolTimeoutError) {
console.warn(`Task ${taskIndex} timed out`);
} else {
console.error(`Task ${taskIndex} failed:`, error.message);
}
},
});
const results = await pool.run(
endpoints.map((url) => () => fetch(url).then((r) => r.json()))
);
ApiClient integration
Pass a pool option to ApiClient to rate-limit all outgoing requests:
const api = new ApiClient({
baseUrl: "https://api.example.com",
pool: { concurrency: 2, timeout: 5000 },
});
// All 4 requests share the same pool — at most 2 run at a time
const [users, posts, comments, tags] = await Promise.all([
api.request("/users"),
api.request("/posts"),
api.request("/comments"),
api.request("/tags"),
]);
Debounce and throttle
debounceAsync
Debounce an async function. Returns a DebouncedFunction with cancel() and flush() methods.
const search = debounceAsync(
async (query: string) => {
const res = await fetch(`/api/search?q=${query}`);
return res.json();
},
300,
{ leading: false, trailing: true }
);
// Use it
const results = await search("bytekit");
// Cancel pending execution
search.cancel();
// Force immediate execution
const immediate = await search.flush();
DebounceOptions
| Option | Type | Description |
|---|
leading | boolean | Fire on the leading edge. |
trailing | boolean | Fire on the trailing edge. |
throttleAsync
Throttle an async function. Returns a ThrottledFunction with a cancel() method.
const save = throttleAsync(
async (data: FormData) => {
await fetch("/api/save", { method: "POST", body: data });
},
2000,
{ trailing: true }
);
save(formData);
// Cancel pending trailing call
save.cancel();
ThrottleOptions
| Option | Type | Description |
|---|
trailing | boolean | Fire a trailing call after the interval. |
Error classes
TimeoutError
Thrown by timeout() and withTimeout() when a deadline is exceeded.
| Property | Type | Description |
|---|
timeout | number | The timeout value that was exceeded (ms). |
AbortError
Thrown when an operation is cancelled via AbortSignal.
RetryError
Thrown when all retry attempts are exhausted.
| Property | Type | Description |
|---|
attempts | number | Total attempts executed. |
lastError | Error | The error from the final attempt. |
PoolTimeoutError
Thrown by PromisePool when an individual task exceeds its configured timeout.
try {
await pool.run([() => slowOperation()]);
} catch (error) {
if (error instanceof PoolTimeoutError) {
console.error(error.message); // "Task timed out after 5000ms"
}
}
QueueAbortError
Thrown by RequestQueue when a queued task is cancelled — either via an external AbortSignal or the internal cancel(id) mechanism.
const controller = new AbortController();
queue.add(myTask, { signal: controller.signal }).catch((err) => {
if (err instanceof QueueAbortError) {
console.log("Task was cancelled");
}
});
controller.abort(); // fires QueueAbortError
RequestQueue
A priority-aware, concurrency-limited task queue. At most concurrency tasks run simultaneously. Three priority lanes — high, normal, low — control execution order when multiple tasks are waiting.
Constructor
const queue = new RequestQueue({ concurrency: 3 });
| Option | Type | Required | Description |
|---|
concurrency | number | ✅ | Max simultaneous tasks. Must be ≥ 1. |
onError | (error: Error, id: string) => void | — | Called when a task rejects. Queue continues. |
add(task, options?)
Enqueues a task and returns a Promise<T> that resolves/rejects with the task result.
const result = await queue.add(
(signal) => fetch("/api/data", { signal }),
{ priority: "high" }
);
| Option | Type | Default | Description |
|---|
priority | "high" | "normal" | "low" | "normal" | Execution lane. |
signal | AbortSignal | — | External cancellation. Fires QueueAbortError if aborted before the task starts. |
flush()
Returns a Promise<void> that resolves when all currently queued and running tasks have settled.
queue.add(taskA);
queue.add(taskB);
await queue.flush(); // waits for both to complete
State getters
| Getter | Type | Description |
|---|
size | number | Tasks waiting to start (queued, not running). |
running | number | Tasks currently executing. |
pending | number | size + running — total active work. |
Full example
import { RequestQueue, QueueAbortError } from "bytekit/async";
const queue = new RequestQueue({
concurrency: 3,
onError: (err, id) => console.warn(`Task ${id} failed:`, err.message),
});
// Priority ordering
const controller = new AbortController();
const critical = queue.add(
(signal) => fetch("/api/important", { signal }),
{ priority: "high" }
);
const background = queue.add(
(signal) => fetch("/api/sync", { signal }),
{ priority: "low", signal: controller.signal }
);
controller.abort(); // cancels `background` with QueueAbortError
await queue.flush();
RequestBatcher
Coalesces same-key HTTP requests within a time window into a single fetcher invocation. All callers sharing the same key receive the same resolved value.
The deduplication key defaults to "METHOD:url:body" and is fully customisable.
Constructor
const batcher = new RequestBatcher({ windowMs: 50 });
| Option | Type | Default | Description |
|---|
windowMs | number | — | Time window in ms. Must be > 0. |
maxSize | number | Infinity | Max requests per batch. Flushes early when reached. |
sliding | boolean | false | Reset the timer on each new request (sliding window). |
keyFn | (url, init) => string | "METHOD:url:body" | Custom key function for grouping requests. |
add(url, init, fetcher)
Adds a request to the current window. Returns Promise<T> resolving to the response.
const result = await batcher.add(
"/api/users",
{ method: "GET" },
(url, init) => fetch(url, init).then(r => r.json())
);
flush()
Forces immediate dispatch of all pending batches. Useful in tests or when you need results without waiting for the window.
pendingCount
Number of requests waiting across all batch buckets.
batcher.add("/a", { method: "GET" }, fetcher);
batcher.add("/a", { method: "GET" }, fetcher);
console.log(batcher.pendingCount); // 2
Full example
import { RequestBatcher } from "bytekit/async";
const batcher = new RequestBatcher({ windowMs: 50, maxSize: 20 });
// These three calls share the same key → fetcher invoked once
const [a, b, c] = await Promise.all([
batcher.add("/api/users", { method: "GET" }, fetch),
batcher.add("/api/users", { method: "GET" }, fetch),
batcher.add("/api/users", { method: "GET" }, fetch),
]);
// After 50ms (or when flush() is called), fetch fires once and all three resolve
// Different body → different key → dispatched separately
batcher.add("/api/items", { method: "POST", body: '{"id":1}' }, fetch);
batcher.add("/api/items", { method: "POST", body: '{"id":2}' }, fetch);
ApiClient integration
Pass queue or batch options to ApiClient for transparent integration:
import { ApiClient } from "bytekit";
// Concurrency-limited: at most 5 requests in-flight at once
const client = new ApiClient({
baseUrl: "https://api.example.com",
queue: { concurrency: 5 },
});
// Deduplication: same-URL GETs within 100ms share one response
const batchedClient = new ApiClient({
baseUrl: "https://api.example.com",
batch: { windowMs: 100 },
});
Combined example
import { PromisePool, PoolTimeoutError, retry, timeout, debounceAsync } from "bytekit/async";
// Retry a slow endpoint with a per-call timeout
const data = await retry(
() => timeout(fetchData(), 5000),
{ maxAttempts: 4, baseDelay: 250 }
);
// Upload files with bounded concurrency and per-task timeout
const pool = new PromisePool({
concurrency: 3,
timeout: 10_000,
onError: (err, i) => console.warn(`Upload ${i} failed:`, err instanceof PoolTimeoutError ? "timed out" : err.message),
});
const uploads = await pool.run(
files.map((f) => () => uploadFile(f))
);
// Debounced search input
const search = debounceAsync(fetchResults, 300);
All timing values are in milliseconds. Functions that accept an AbortSignal will throw AbortError when the signal fires.