Import

import { StreamingHelper } from "bytekit/streaming";

What it does

StreamingHelper provides static methods for consuming server-sent events (SSE), newline-delimited JSON (NDJSON) streams, and file downloads with progress tracking. The new fetchSSE method uses the Fetch API instead of EventSource, unlocking POST requests, custom headers, AbortSignal, and typed multi-event streams.

Methods

fetchSSE<T>(endpoint, options?)

const ac = new AbortController();

for await (const ev of StreamingHelper.fetchSSE<ChatChunk>("/bff/ai/stream", {
  method: "POST",
  body: { prompt: "Explain monads" },
  headers: { Authorization: `Bearer ${token}` },
  signal: ac.signal,
  eventTypes: ["delta", "done"],
})) {
  if (ev.event === "delta") appendChunk(ev.data);
  if (ev.event === "done") finish();
}

Returns an AsyncGenerator<SSEEvent<T>> that you consume with for await. The connection closes automatically when the server ends the stream or when you abort via signal.

| Parameter | Type | Description |
| --- | --- | --- |
| endpoint | string | URL of the SSE endpoint. |
| options | FetchSSEOptions | Optional configuration (see below). |

fetchSSE is the recommended approach for modern SSE consumption. It supports every HTTP method, request bodies, and AbortSignal-based cancellation, none of which the browser-native EventSource API offers.
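
To make the "fetch + ReadableStream" transport more concrete, here is a minimal sketch of how SSE framing can be parsed from decoded text chunks. It is not bytekit's implementation: the SSEParser class and ParsedSSEEvent type are hypothetical names introduced for illustration, and the sketch assumes lines end in "\n" or "\r\n" only.

```typescript
// Hypothetical sketch of SSE framing; not taken from bytekit.
interface ParsedSSEEvent {
  event: string; // falls back to "message" when no event field is sent
  data: string; // raw payload; multiple data lines are joined with "\n"
  id?: string;
  retry?: number;
}

function parseBlock(block: string): ParsedSSEEvent | null {
  let event = "message";
  const dataLines: string[] = [];
  let id: string | undefined;
  let retry: number | undefined;

  for (const line of block.split("\n")) {
    if (line === "" || line.startsWith(":")) continue; // blank or comment (keep-alive)
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // one optional leading space
    if (field === "event") event = value;
    else if (field === "data") dataLines.push(value);
    else if (field === "id") id = value;
    else if (field === "retry" && /^\d+$/.test(value)) retry = Number(value);
  }

  if (dataLines.length === 0) return null; // blocks without data are not dispatched
  const result: ParsedSSEEvent = { event, data: dataLines.join("\n") };
  if (id !== undefined) result.id = id;
  if (retry !== undefined) result.retry = retry;
  return result;
}

class SSEParser {
  private buffer = "";

  /** Feed decoded text; returns every event completed by this chunk. */
  feed(chunk: string): ParsedSSEEvent[] {
    // Normalizing the whole buffer also handles "\r\n" split across chunks.
    this.buffer = (this.buffer + chunk).replace(/\r\n/g, "\n");
    const events: ParsedSSEEvent[] = [];
    let boundary = this.buffer.indexOf("\n\n");
    while (boundary !== -1) {
      const parsed = parseBlock(this.buffer.slice(0, boundary));
      this.buffer = this.buffer.slice(boundary + 2);
      if (parsed) events.push(parsed);
      boundary = this.buffer.indexOf("\n\n");
    }
    return events;
  }
}
```

A consumer would typically decode each Uint8Array chunk from response.body with a TextDecoder (using the stream: true option) and pass the text to feed. Buffering until a blank line arrives is what lets an event span several network chunks.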

streamSSE<T>(endpoint, options?)

const sse = StreamingHelper.streamSSE<NotificationEvent>("/events", {
  headers: { Authorization: `Bearer ${token}` },
});

sse.subscribe((event) => console.log(event));

// Later…
sse.close();

Legacy EventSource-based SSE. Returns { subscribe, close }.

| Parameter | Type | Description |
| --- | --- | --- |
| endpoint | string | URL of the SSE endpoint. |
| options | StreamOptions | Optional configuration. |

streamJsonLines<T>(endpoint, options?)

const response = await StreamingHelper.streamJsonLines<LogEntry>("/logs/stream", {
  onChunk: (entry) => appendLog(entry),
  timeout: 30_000,
});

console.log(response.data);     // LogEntry[]
console.log(response.complete);  // true

Consumes a newline-delimited JSON (NDJSON) stream. Returns a Promise<StreamResponse<T>> once the stream completes.
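
NDJSON chunks rarely align with line boundaries, so a consumer has to hold back the trailing partial line until the rest arrives. The following is a minimal sketch of that buffering, not bytekit's actual code; NdjsonDecoder is a hypothetical name.

```typescript
// Hypothetical sketch of incremental NDJSON decoding; not taken from bytekit.
class NdjsonDecoder<T> {
  private tail = ""; // partial line carried over between chunks

  /** Feed decoded text; returns the items for every complete line. */
  push(chunk: string): T[] {
    const lines = (this.tail + chunk).split("\n");
    this.tail = lines.pop() ?? ""; // last piece may be incomplete
    return lines
      .map((line) => line.trim()) // also strips a trailing "\r"
      .filter((line) => line !== "") // skip blank lines
      .map((line) => JSON.parse(line) as T);
  }

  /** Call once the stream ends to parse a final line that lacks a newline. */
  flush(): T[] {
    const rest = this.tail.trim();
    this.tail = "";
    return rest === "" ? [] : [JSON.parse(rest) as T];
  }
}
```

In this sketch a malformed line makes JSON.parse throw. Whether streamJsonLines reports such lines through onError or skips them is not stated on this page.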

downloadStream(endpoint, options?)

const blob = await StreamingHelper.downloadStream("/files/report.pdf", {
  onChunk: (chunk) => updateProgress(chunk),
});

saveAs(blob, "report.pdf");

Downloads a binary resource as a Blob with optional progress tracking.
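
One way to turn per-chunk callbacks into a progress figure is to accumulate byte counts against a known total. The sketch below assumes that onChunk receives each chunk as a Uint8Array and that the total size is known up front (for example from a Content-Length header). Neither is confirmed by this page, and createProgressTracker is a hypothetical helper.

```typescript
// Hypothetical helper; assumes chunks arrive as Uint8Array (not confirmed by the docs).
type ProgressReport = (fraction: number | null, receivedBytes: number) => void;

function createProgressTracker(
  totalBytes: number | null,
  report: ProgressReport,
): (chunk: Uint8Array) => void {
  let received = 0;
  return (chunk) => {
    received += chunk.byteLength;
    // Without a usable total, only the running byte count can be reported.
    const fraction =
      totalBytes !== null && totalBytes > 0
        ? Math.min(received / totalBytes, 1)
        : null;
    report(fraction, received);
  };
}
```

Under those assumptions, the returned function could be passed as onChunk, with report updating a progress bar when fraction is a number and showing a byte counter when it is null.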

Types

SSEEvent<T>

| Property | Type | Description |
| --- | --- | --- |
| event | string | Event name (defaults to "message"). |
| data | T \| string | Parsed payload (JSON) or raw string when raw: true. |
| id | string | Optional event ID sent by the server. |
| retry | number | Optional reconnect interval hint (ms). |
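
Because data is typed as T | string, code that reads payload fields needs to narrow it first. The sketch below shows one way to do that with a type guard over a union of payload shapes. The SSEEvent interface is re-declared locally to mirror the table above, and DeltaPayload, DonePayload, isDelta, and summarize are hypothetical names.

```typescript
// Local mirror of the SSEEvent<T> shape documented above.
interface SSEEvent<T> {
  event: string;
  data: T | string;
  id?: string;
  retry?: number;
}

// Hypothetical payloads for a chat stream; not part of bytekit.
interface DeltaPayload { text: string }
interface DonePayload { totalTokens: number }
type ChatPayload = DeltaPayload | DonePayload;

type DeltaEvent = SSEEvent<ChatPayload> & { event: "delta"; data: DeltaPayload };

// Type guard: checks both the event name and the payload shape.
function isDelta(ev: SSEEvent<ChatPayload>): ev is DeltaEvent {
  return (
    ev.event === "delta" &&
    typeof ev.data === "object" &&
    ev.data !== null &&
    "text" in ev.data
  );
}

function summarize(ev: SSEEvent<ChatPayload>): string {
  if (typeof ev.data === "string") return `raw:${ev.data}`; // e.g. raw: true
  if (isDelta(ev)) return `delta:${ev.data.text}`; // data is DeltaPayload here
  return `other:${ev.event}`;
}
```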

FetchSSEOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| method | string | "GET" | HTTP method. |
| body | unknown | | Request body (automatically serialized to JSON). |
| headers | Record<string, string> | | Extra request headers. |
| signal | AbortSignal | | Abort signal for cancellation. |
| eventTypes | string[] | | Only yield events whose event field is in this list. |
| raw | boolean | false | When true, data is returned as a raw string without JSON parsing. |
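
The effect of eventTypes and raw on each yielded event can be sketched as a small decoding step. This is an illustration of the documented behavior rather than bytekit's code: decodeEvent and its types are hypothetical, and falling back to the raw string when the payload is not valid JSON is an assumption, since the page does not say what happens in that case.

```typescript
// Hypothetical sketch of the eventTypes / raw semantics; not taken from bytekit.
interface DecodeOptions {
  eventTypes?: string[];
  raw?: boolean;
}

interface RawEvent { event: string; data: string }
interface DecodedEvent<T> { event: string; data: T | string }

function decodeEvent<T>(ev: RawEvent, opts: DecodeOptions = {}): DecodedEvent<T> | null {
  // eventTypes acts as an allow-list; everything else is dropped.
  if (opts.eventTypes && !opts.eventTypes.includes(ev.event)) return null;
  // raw: true skips JSON parsing entirely.
  if (opts.raw) return { event: ev.event, data: ev.data };
  try {
    return { event: ev.event, data: JSON.parse(ev.data) as T };
  } catch {
    // Assumption: keep the raw string when the payload is not valid JSON.
    return { event: ev.event, data: ev.data };
  }
}
```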

StreamOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | number | | Connection timeout in ms. |
| headers | Record<string, string> | | Extra request headers. |
| onChunk | (chunk: T) => void | | Callback fired for each received item. |
| onError | (error: Error) => void | | Error handler. |
| onComplete | () => void | | Called when the stream finishes. |

StreamResponse<T>

| Property | Type | Description |
| --- | --- | --- |
| data | T[] | Collected items. |
| complete | boolean | Whether the stream finished normally. |
| error | Error | Error (if the stream failed). |

streamSSE vs fetchSSE

| Feature | streamSSE | fetchSSE |
| --- | --- | --- |
| Transport | EventSource | fetch + ReadableStream |
| HTTP methods | GET only | Any (GET, POST, …) |
| Request body | No | Yes |
| Custom headers | Limited | Yes |
| AbortSignal | No | Yes |
| Multiple event types | Single handler | Filter with eventTypes |
| Consumer API | subscribe / close | for await … of |
| Browser support | All browsers | Browsers with ReadableStream |

Examples

AI chat streaming with abort

import { StreamingHelper } from "bytekit/streaming";

interface ChatChunk {
  text: string;
  done: boolean;
}

const controller = new AbortController();

// Wire the cancel button
cancelBtn.onclick = () => controller.abort();

for await (const ev of StreamingHelper.fetchSSE<ChatChunk>("/bff/ai/stream", {
  method: "POST",
  body: { prompt: userInput, model: "gpt-4" },
  signal: controller.signal,
})) {
  outputEl.textContent += ev.data.text;
  if (ev.data.done) break;
}
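
When the signal fires, a fetch-based stream usually rejects with an error named "AbortError". The page does not say whether fetchSSE surfaces that rejection or ends the loop silently, so the following sketch assumes it is rethrown and shows one way to treat a user cancel as a normal outcome. isAbortError and consumeUntilAborted are hypothetical helpers.

```typescript
// Hypothetical helpers; assume an aborted stream throws an error named "AbortError".
function isAbortError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { name?: unknown }).name === "AbortError"
  );
}

async function consumeUntilAborted<T>(
  stream: AsyncIterable<T>,
  onItem: (item: T) => void,
): Promise<"completed" | "aborted"> {
  try {
    for await (const item of stream) onItem(item);
    return "completed"; // server ended the stream
  } catch (err) {
    if (isAbortError(err)) return "aborted"; // user cancelled: not a failure
    throw err; // network or parsing errors still propagate
  }
}
```

With the example above, the generator returned by StreamingHelper.fetchSSE would be passed as stream, and the UI could reset the cancel button on either outcome.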

NDJSON log tail

import { StreamingHelper } from "bytekit/streaming";

const { data, complete } = await StreamingHelper.streamJsonLines<LogEntry>(
  "/api/logs?follow=true",
  {
    onChunk: (entry) => renderLogLine(entry),
    onError: (err) => console.error("Stream error:", err),
    timeout: 60_000,
  },
);

File download with progress

import { StreamingHelper } from "bytekit/streaming";

const blob = await StreamingHelper.downloadStream("/exports/data.csv", {
  onChunk: () => progressBar.increment(),
});

const url = URL.createObjectURL(blob);
downloadLink.href = url;

Use fetchSSE with an AbortController for AI/LLM streaming: it gives you full control over cancellation, unlike EventSource.

streamSSE relies on the EventSource API, which only supports GET requests. If your endpoint requires POST or custom headers, use fetchSSE instead.