Import
import { StreamingHelper } from "bytekit/streaming";
What it does
StreamingHelper provides static methods for consuming server-sent events (SSE), newline-delimited JSON (NDJSON) streams, and file downloads with progress tracking. The new fetchSSE method uses the Fetch API instead of EventSource, unlocking POST requests, custom headers, AbortSignal, and typed multi-event streams.
Methods
fetchSSE<T>(endpoint, options?) ⭐
const ac = new AbortController();
for await (const ev of StreamingHelper.fetchSSE<ChatChunk>("/bff/ai/stream", {
method: "POST",
body: { prompt: "Explain monads" },
headers: { Authorization: `Bearer ${token}` },
signal: ac.signal,
eventTypes: ["delta", "done"],
})) {
if (ev.event === "delta") appendChunk(ev.data);
if (ev.event === "done") finish();
}
Returns an AsyncGenerator<SSEEvent<T>> that you consume with for await. The connection closes automatically when the server ends the stream or when you abort via signal.
| Parameter | Type | Description |
|---|---|---|
| endpoint | string | URL of the SSE endpoint. |
| options | FetchSSEOptions | Optional configuration (see below). |
fetchSSE is the recommended approach for modern SSE consumption. It supports any HTTP method, request bodies, and AbortSignal-based cancellation, none of which the browser-native EventSource API offers.
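To make the fetch-based transport less opaque, the following is a minimal sketch of how SSE text read from a ReadableStream could be split into events. It is not bytekit's implementation: `parseSSEFrames` and `ParsedSSEEvent` are hypothetical names, and the sketch assumes `\n` or `\r\n` line endings only.

```typescript
// Hypothetical event shape, loosely mirroring the SSEEvent<T> table in this document.
interface ParsedSSEEvent {
  event: string;
  data: string;
  id?: string;
  retry?: number;
}

// Splits buffered SSE text into complete events plus the unfinished tail.
// The caller is expected to prepend `rest` to the next chunk it receives.
function parseSSEFrames(buffer: string): { events: ParsedSSEEvent[]; rest: string } {
  const frames = buffer.replace(/\r\n/g, "\n").split("\n\n");
  const rest = frames.pop() ?? ""; // last piece has no terminating blank line yet
  const events: ParsedSSEEvent[] = [];

  for (const frame of frames) {
    let event = "message"; // default event name
    let id: string | undefined;
    let retry: number | undefined;
    const dataLines: string[] = [];

    for (const line of frame.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // strip one leading space

      if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
      else if (field === "id") id = value;
      else if (field === "retry" && /^\d+$/.test(value)) retry = Number(value);
    }

    if (dataLines.length === 0) continue; // frames without data are not dispatched
    const parsed: ParsedSSEEvent = { event, data: dataLines.join("\n") };
    if (id !== undefined) parsed.id = id;
    if (retry !== undefined) parsed.retry = retry;
    events.push(parsed);
  }

  return { events, rest };
}
```

In a real reader loop, each decoded chunk would be appended to the previous `rest` before calling the parser again, so an event split across two network chunks is still delivered whole.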
streamSSE<T>(endpoint, options?)
const sse = StreamingHelper.streamSSE<NotificationEvent>("/events", {
headers: { Authorization: `Bearer ${token}` },
});
sse.subscribe((event) => console.log(event));
// Later…
sse.close();
Legacy EventSource-based SSE. Returns { subscribe, close }.
| Parameter | Type | Description |
|---|---|---|
| endpoint | string | URL of the SSE endpoint. |
| options | StreamOptions | Optional configuration. |
streamJsonLines<T>(endpoint, options?)
const response = await StreamingHelper.streamJsonLines<LogEntry>("/logs/stream", {
onChunk: (entry) => appendLog(entry),
timeout: 30_000,
});
console.log(response.data); // LogEntry[]
console.log(response.complete); // true
Consumes a newline-delimited JSON (NDJSON) stream. Returns a Promise<StreamResponse<T>> once the stream completes.
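As an illustration of what consuming NDJSON involves, here is a small sketch of line buffering: chunks can end in the middle of a JSON object, so only complete lines are parsed. `NdjsonLineBuffer` is a hypothetical helper written for this page, not part of bytekit.

```typescript
// Accumulates text chunks and emits one parsed value per complete line.
class NdjsonLineBuffer<T> {
  private pending = "";

  // Feed a decoded text chunk; returns the items completed by this chunk.
  push(chunk: string): T[] {
    this.pending += chunk;
    const lines = this.pending.split("\n");
    this.pending = lines.pop() ?? ""; // keep the unterminated tail for later
    return lines
      .map((line) => line.trim())
      .filter((line) => line.length > 0) // skip blank lines
      .map((line) => JSON.parse(line) as T);
  }

  // Call once the stream ends to parse a final line that lacks a trailing newline.
  flush(): T[] {
    const last = this.pending.trim();
    this.pending = "";
    return last.length > 0 ? [JSON.parse(last) as T] : [];
  }
}
```

Each value returned by `push` corresponds to what an `onChunk` callback would receive, and the concatenation of all returned arrays corresponds to the collected `data` array.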
downloadStream(endpoint, options?)
const blob = await StreamingHelper.downloadStream("/files/report.pdf", {
onChunk: (chunk) => updateProgress(chunk),
});
saveAs(blob, "report.pdf");
Downloads a binary resource as a Blob with optional progress tracking.
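One way to turn per-chunk callbacks into a percentage is sketched below. It assumes each chunk is a `Uint8Array` and that the total size comes from a `Content-Length` header; neither assumption is confirmed by this page, and `DownloadProgress` is a hypothetical helper.

```typescript
// Tracks received bytes and reports a percentage when the total size is known.
class DownloadProgress {
  private received = 0;
  private readonly total: number | null;

  // `contentLength` is the raw header value, or null when the header is absent.
  constructor(contentLength: string | null) {
    const parsed = contentLength === null ? NaN : Number(contentLength);
    this.total = Number.isFinite(parsed) && parsed > 0 ? parsed : null;
  }

  // Records a chunk; returns 0-100, or null when the total size is unknown.
  add(chunk: Uint8Array): number | null {
    this.received += chunk.byteLength;
    if (this.total === null) return null;
    return Math.min(100, Math.round((this.received / this.total) * 100));
  }

  get bytesReceived(): number {
    return this.received;
  }
}
```

When the server omits `Content-Length` (for example with chunked transfer encoding), only `bytesReceived` is meaningful, so an indeterminate progress indicator is the safer choice.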
Types
SSEEvent<T>
| Property | Type | Description |
|---|---|---|
| event | string | Event name (defaults to "message"). |
| data | T \| string | Parsed payload (JSON) or raw string when raw: true. |
| id | string | Optional event ID sent by the server. |
| retry | number | Optional reconnect interval hint (ms). |
FetchSSEOptions
| Property | Type | Default | Description |
|---|---|---|---|
| method | string | "GET" | HTTP method. |
| body | unknown | — | Request body (automatically serialized to JSON). |
| headers | Record<string, string> | — | Extra request headers. |
| signal | AbortSignal | — | Abort signal for cancellation. |
| eventTypes | string[] | — | Only yield events whose event field is in this list. |
| raw | boolean | false | When true, data is returned as a raw string without JSON parsing. |
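The effect of `eventTypes` and `raw` described in the table can be sketched as a filter followed by an optional JSON parse. This is an illustration under assumptions, not the library's code: `decodeEvents` and its interfaces are hypothetical, and the sketch simply lets `JSON.parse` throw on a malformed payload, which may differ from what bytekit does.

```typescript
interface RawEvent {
  event: string;
  data: string;
}

interface DecodeOptions {
  eventTypes?: string[];
  raw?: boolean;
}

interface DecodedEvent<T> {
  event: string;
  data: T | string;
}

// Keeps only the requested event types, then parses data unless raw mode is on.
function decodeEvents<T>(events: RawEvent[], options: DecodeOptions = {}): DecodedEvent<T>[] {
  const { eventTypes, raw = false } = options;
  return events
    .filter((ev) => eventTypes === undefined || eventTypes.includes(ev.event))
    .map((ev) => ({
      event: ev.event,
      data: raw ? ev.data : (JSON.parse(ev.data) as T),
    }));
}
```

With `eventTypes: ["delta", "done"]`, a server-side `ping` event never reaches the consumer loop, which keeps the loop body free of bookkeeping branches.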
StreamOptions
| Property | Type | Default | Description |
|---|---|---|---|
| timeout | number | — | Connection timeout in ms. |
| headers | Record<string, string> | — | Extra request headers. |
| onChunk | (chunk: T) => void | — | Callback fired for each received item. |
| onError | (error: Error) => void | — | Error handler. |
| onComplete | () => void | — | Called when the stream finishes. |
StreamResponse<T>
| Property | Type | Description |
|---|---|---|
| data | T[] | Collected items. |
| complete | boolean | Whether the stream finished normally. |
| error | Error | Error (if the stream failed). |
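Because a result can carry partial `data` alongside `complete: false` or an `error`, callers may want to check those fields before trusting the array. The helper below is one possible way to do that. `unwrapStream` and `StreamResponseLike` are hypothetical names, and the sketch assumes `error` is absent on success.

```typescript
// Local stand-in for the StreamResponse<T> shape described in the table above.
interface StreamResponseLike<T> {
  data: T[];
  complete: boolean;
  error?: Error;
}

// Returns the collected items only when the stream finished normally.
function unwrapStream<T>(response: StreamResponseLike<T>): T[] {
  if (response.error) throw response.error;
  if (!response.complete) {
    throw new Error(`Stream ended early after ${response.data.length} item(s)`);
  }
  return response.data;
}
```

Throwing here is a design choice for pipelines that cannot use partial results. A log viewer, by contrast, might prefer to render whatever arrived and show a warning.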
streamSSE vs fetchSSE
| Feature | streamSSE | fetchSSE |
|---|---|---|
| Transport | EventSource | fetch + ReadableStream |
| HTTP methods | GET only | Any (GET, POST, …) |
| Request body | ❌ | ✅ |
| Custom headers | Limited | ✅ |
| AbortSignal | ❌ | ✅ |
| Multiple event types | Single handler | Filter with eventTypes |
| Consumer API | subscribe / close | for await … of |
| Browser support | All browsers | Browsers with ReadableStream |
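The comparison above can be turned into a small runtime check. The sketch below is one way to choose a method based on which globals exist and what the request needs. `pickTransport` and its types are hypothetical, and the scope object is passed in explicitly so the logic can be exercised outside a browser.

```typescript
interface RuntimeScope {
  fetch?: unknown;
  ReadableStream?: unknown;
  EventSource?: unknown;
}

interface RequestNeeds {
  method?: string;
  body?: unknown;
  signal?: unknown;
}

type Transport = "fetchSSE" | "streamSSE" | "unsupported";

// Prefers the fetch-based transport and falls back to EventSource only
// when the request is a plain GET with no body and no abort signal.
function pickTransport(scope: RuntimeScope, needs: RequestNeeds = {}): Transport {
  const canFetchStream =
    typeof scope.fetch === "function" && typeof scope.ReadableStream === "function";
  if (canFetchStream) return "fetchSSE";

  const needsFetchOnlyFeature =
    (needs.method ?? "GET").toUpperCase() !== "GET" ||
    needs.body !== undefined ||
    needs.signal !== undefined;
  if (!needsFetchOnlyFeature && typeof scope.EventSource === "function") {
    return "streamSSE";
  }
  return "unsupported";
}
```

In application code the first argument would typically be `globalThis`. Returning `"unsupported"` rather than silently downgrading a POST to a GET keeps failures visible.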
Examples
AI chat streaming with abort
import { StreamingHelper } from "bytekit/streaming";
interface ChatChunk {
text: string;
done: boolean;
}
const controller = new AbortController();
// Wire the cancel button
cancelBtn.onclick = () => controller.abort();
for await (const ev of StreamingHelper.fetchSSE<ChatChunk>("/bff/ai/stream", {
method: "POST",
body: { prompt: userInput, model: "gpt-4" },
signal: controller.signal,
})) {
outputEl.textContent += ev.data.text;
if (ev.data.done) break;
}
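Since the SSEEvent<T> table types `data` as `T | string`, accessing `ev.data.text` directly may need narrowing under strict TypeScript settings. A runtime type guard is one way to handle this. `isChatChunk` below is a hypothetical helper written for this example, not a bytekit export.

```typescript
interface ChatChunk {
  text: string;
  done: boolean;
}

// Narrows an unknown payload to ChatChunk by checking both fields at runtime.
function isChatChunk(data: unknown): data is ChatChunk {
  if (typeof data !== "object" || data === null) return false;
  const candidate = data as Record<string, unknown>;
  return typeof candidate["text"] === "string" && typeof candidate["done"] === "boolean";
}
```

Inside the loop, wrapping the body in `if (isChatChunk(ev.data)) { … }` keeps raw-string payloads and malformed chunks from reaching the DOM update.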
NDJSON log tail
import { StreamingHelper } from "bytekit/streaming";
const { data, complete } = await StreamingHelper.streamJsonLines<LogEntry>(
"/api/logs?follow=true",
{
onChunk: (entry) => renderLogLine(entry),
onError: (err) => console.error("Stream error:", err),
timeout: 60_000,
},
);
File download with progress
import { StreamingHelper } from "bytekit/streaming";
const blob = await StreamingHelper.downloadStream("/exports/data.csv", {
onChunk: () => progressBar.increment(),
});
const url = URL.createObjectURL(blob);
downloadLink.href = url;
// Call URL.revokeObjectURL(url) once the download has started to release the blob.
Use fetchSSE with an AbortController for AI/LLM streaming. Aborting the signal cancels the in-flight request and closes the connection, whereas EventSource accepts no AbortSignal.
streamSSE relies on the EventSource API, which only supports GET requests. If your endpoint requires POST or custom headers, use fetchSSE instead.