Skip to main content

Stream JSON Lines

Use StreamingHelper.streamJsonLines when a dataset is too large to load all at once.
import { StreamingHelper } from "bytekit";

interface LogEntry {
  id: string;
  level: "info" | "error";
  message: string;
  timestamp: string;
}

const result = await StreamingHelper.streamJsonLines<LogEntry>(
  "https://api.example.com/logs/stream",
  {
    timeout: 60_000,
    onChunk: (chunk) => {
      console.log("received", chunk);
    },
    onComplete: () => {
      console.log("stream finished");
    },
  }
);
fetchSSE is a fetch-based SSE consumer that supports any HTTP method, request bodies, AbortSignal, multiple event types, and returns an AsyncGenerator so you can use for await … of.
Use fetchSSE instead of streamSSE whenever you need POST requests, request bodies, cancellation, or multiple event types. streamSSE (EventSource-based) remains available for simple GET-only SSE.

POST with body and cancellation

import { StreamingHelper } from "bytekit";

interface TokenChunk {
  token: string;
  done: boolean;
}

const ac = new AbortController();

for await (const ev of StreamingHelper.fetchSSE<TokenChunk>(
  "/bff/ai/orchestrator-stream",
  {
    method: "POST",
    body: { prompt: "Explain DDD", model: "gpt-4o" },
    signal: ac.signal,
    eventTypes: ["data", "log", "heartbeat"],
  }
)) {
  if (ev.event === "data") {
    const chunk = ev.data as TokenChunk;
    process.stdout.write(chunk.token);
    if (chunk.done) break;
  }
}

Filter by event type

Pass eventTypes to restrict which SSE event types are yielded. Omit it to receive all events.
for await (const ev of StreamingHelper.fetchSSE<{ price: number }>(
  "https://api.example.com/stock/stream",
  { eventTypes: ["price-update"] }
)) {
  console.log(`Price: $${(ev.data as { price: number }).price}`);
}

Legacy SSE with streamSSE

For simple GET endpoints where you don’t need POST, body, or for await, the EventSource-based API still works:
import { StreamingHelper } from "bytekit";

const stream = StreamingHelper.streamSSE<{ symbol: string; price: number }>(
  "https://api.example.com/prices",
  {
    onError: (error) => console.error(error),
    onComplete: () => console.log("connection closed"),
  }
);

const unsubscribe = stream.subscribe((data) => {
  console.log(`${data.symbol}: ${data.price}`);
});

// Later:
// unsubscribe();
// stream.close();

Track streamed downloads

import { StreamingHelper } from "bytekit";

const blob = await StreamingHelper.downloadStream(
  "https://example.com/large-file.zip",
  {
    onProgress: (percent) => {
      console.log(`progress: ${percent}%`);
    },
  }
);

When to use which method

NeedMethod
NDJSON / JSON linesstreamJsonLines
SSE with POST, body, abort, multi-eventfetchSSE
SSE with simple GET (legacy)streamSSE
File download with progressdownloadStream

Best practices

  • Use fetchSSE for any SSE endpoint that requires POST or cancellation.
  • Pass an AbortSignal to cancel long-lived streams when components unmount.
  • Set explicit timeouts for long-lived streams.
  • Process chunks incrementally if the dataset is large.
  • Filter with eventTypes to ignore keep-alive or debug events you don’t need.