StreamingHelper - sebamar88/bytekit GitHub Wiki
EN: Static utilities for streaming JSON lines (NDJSON), Server-Sent Events (SSE), and file downloads with progress tracking. Includes
fetchSSEβ a fetch-based SSE consumer that supports POST, AbortSignal, multiple event types, andfor await(async generator).ES: Utilidades estΓ‘ticas para streaming de lΓneas JSON (NDJSON), Server-Sent Events (SSE) y descargas de archivos con seguimiento de progreso. Incluye
fetchSSEβ un consumidor SSE basado en fetch que soporta POST, AbortSignal, mΓΊltiples tipos de evento yfor await(generador asΓncrono).
ES: Cuando trabajas con endpoints que devuelven datos incrementalmente β como logs en tiempo real, pipelines de IA, o datasets NDJSON grandes β necesitas procesar cada fragmento sin cargar todo en memoria. StreamingHelper provee dos APIs para SSE:
streamSSE (legacy) |
fetchSSE β (nuevo) |
|
|---|---|---|
| Transporte |
EventSource (solo GET) |
fetch + ReadableStream
|
| POST con body | β | β |
AbortSignal |
β | β |
| MΓΊltiples event types | Solo 1 eventType
|
Todos, o filtra con eventTypes[]
|
| API de consumo | Callbacks .subscribe()
|
for await β¦ of (async generator) |
id: / retry: fields |
β | β |
EN: When working with endpoints that return data incrementally β like real-time logs, AI pipelines, or large NDJSON datasets β you need to process each fragment without loading everything into memory. StreamingHelper provides two APIs for SSE:
streamSSE (legacy) |
fetchSSE β (new) |
|
|---|---|---|
| Transport |
EventSource (GET-only) |
fetch + ReadableStream
|
| POST with body | β | β |
AbortSignal |
β | β |
| Multiple event types | Single eventType
|
All, or filter with eventTypes[]
|
| Consumer API | Callbacks .subscribe()
|
for await β¦ of (async generator) |
id: / retry: fields |
β | β |
import { StreamingHelper } from "bytekit/streaming";
// Types (if needed) | Tipos (si es necesario):
import type { SSEEvent, FetchSSEOptions, StreamOptions, StreamResponse } from "bytekit/streaming";| Method / MΓ©todo | Signature / Firma | Description (EN) | DescripciΓ³n (ES) |
|---|---|---|---|
β fetchSSE
|
fetchSSE<T>(endpoint, options?): AsyncGenerator<SSEEvent<T>> |
fetch-based SSE with POST, abort, multi-event, for await
|
SSE basado en fetch con POST, abort, multi-evento, for await
|
streamSSE |
streamSSE<T>(endpoint, options?): { subscribe, close } |
Legacy EventSource-based SSE (GET only) | SSE basado en EventSource (solo GET) |
streamJsonLines |
streamJsonLines<T>(endpoint, options?): Promise<StreamResponse<T>> |
Stream NDJSON lines from an endpoint | Transmite lΓneas NDJSON desde un endpoint |
downloadStream |
downloadStream(endpoint, options?): Promise<Blob> |
Download a file with progress tracking | Descarga un archivo con seguimiento de progreso |
EN: A single parsed SSE event yielded by
fetchSSE. ES: Un evento SSE parseado individual, emitido porfetchSSE.
interface SSEEvent<T = unknown> {
event: string; // Event type (default: "message") | Tipo de evento (default: "message")
data: T | string; // Parsed JSON or raw string | JSON parseado o string crudo
id?: string; // Optional server-sent ID | ID opcional enviado por el servidor
retry?: number; // Optional retry interval (ms) | Intervalo de reintento opcional (ms)
}| Property / Propiedad | Type / Tipo | Default | Description (EN) | DescripciΓ³n (ES) |
|---|---|---|---|---|
method |
string |
"GET" |
HTTP method (POST, PUT, etc.) | MΓ©todo HTTP (POST, PUT, etc.) |
body |
BodyInit | Record<string, unknown> | unknown |
β | Request body. Objects are auto-JSON-stringified | Body de la peticiΓ³n. Los objetos se serializan a JSON automΓ‘ticamente |
headers |
Record<string, string> |
{} |
Custom headers. Accept: text/event-stream is always set |
Headers personalizados. Accept: text/event-stream se envΓa siempre |
signal |
AbortSignal |
β | Cancellation signal | SeΓ±al de cancelaciΓ³n |
eventTypes |
string[] |
β | Filter: only yield these event types. Omit = yield all | Filtro: solo emite estos tipos de evento. Omitir = emitir todos |
raw |
boolean |
false |
If true, data is always a raw string (no JSON parse) |
Si es true, data siempre es string crudo (sin JSON.parse) |
| Property / Propiedad | Type / Tipo | Default | Description (EN) | DescripciΓ³n (ES) |
|---|---|---|---|---|
timeout |
number |
30000 |
Request timeout in milliseconds | Timeout de la peticiΓ³n en milisegundos |
headers |
Record<string, string> |
{} |
Custom request headers | Headers personalizados |
onChunk |
(chunk: string) => void |
β | Called for each successfully parsed chunk | Se llama por cada chunk parseado exitosamente |
onError |
(error: Error) => void |
β | Called when an error occurs | Se llama cuando ocurre un error |
onComplete |
() => void |
β | Called when the stream finishes | Se llama cuando el stream termina |
interface StreamResponse<T> {
data: T[]; // All parsed items collected | Todos los items parseados recolectados
complete: boolean; // Whether stream finished successfully | Si el stream terminΓ³ exitosamente
error?: Error; // Error object if stream failed | Objeto Error si el stream fallΓ³
}EN:
fetchSSE()callsfetch()with the given method, body, and headers. It reads the response as aReadableStream, parses lines following the SSE specification, andyieldsSSEEventobjects. The parser handles multi-linedata:fields,event:,id:,retry:fields, and comment lines (:prefix). Because it returns anAsyncGenerator, you consume it withfor await β¦ of.ES:
fetchSSE()llama afetch()con el mΓ©todo, body y headers dados. Lee la respuesta comoReadableStream, parsea las lΓneas siguiendo la especificaciΓ³n SSE, y haceyieldde objetosSSEEvent. El parser maneja camposdata:multi-lΓnea,event:,id:,retry:, y lΓneas de comentario (prefijo:). Como retorna unAsyncGenerator, se consume confor await β¦ of.
fetchSSE() β fetch + ReadableStream + SSE parser
ββββββββββββββββββββββββββββββββββββββββββββββββββ
fetch(endpoint, { method, body, headers, signal })
β
ReadableStream.getReader()
β
ββββΌββββββββββββββββββββββββββββββββββββ
β Buffer accumulates text chunks β
β β
β Parse SSE fields line by line: β
β event: data β
β data: {"token":"hello"} β
β id: evt-42 β
β retry: 5000 β
β : comment (skipped) β
β β
β On blank line β yield SSEEvent<T> β
ββββ¬ββββββββββββββββββββββββββββββββββββ
β
for await (const ev of ...) { ... }
streamSSE() β EventSource (GET only)
βββββββββββββββββββββββββββββββββββββββββ
new EventSource(endpoint)
β
addEventListener(eventType, handler)
β
ββββΌββββββββββββββββββββββββββββββββββββ
β SSE event β JSON.parse β subscribers β
β SSE event β JSON.parse β subscribers β
ββββ¬ββββββββββββββββββββββββββββββββββββ
β
close() to disconnect
import { StreamingHelper } from "bytekit/streaming";
import type { SSEEvent } from "bytekit/streaming";
// EN: Your critical use case β POST to an AI orchestrator with body + abort + multi-event
// ES: Tu caso de uso crΓtico β POST a un orquestador de IA con body + abort + multi-evento
interface PipelineChunk { token?: string; text?: string; }
const ac = new AbortController();
// Cancel after 30 seconds if needed
setTimeout(() => ac.abort(), 30_000);
for await (const ev of StreamingHelper.fetchSSE<PipelineChunk>(
"/bff/ai/orchestrator-stream",
{
method: "POST",
body: { prompt: "Explain DDD in 3 sentences", model: "gpt-4o" },
signal: ac.signal,
eventTypes: ["data", "log", "heartbeat"],
}
)) {
switch (ev.event) {
case "data":
process.stdout.write(ev.data.token ?? "");
break;
case "log":
console.debug("[server log]", ev.data);
break;
case "heartbeat":
// keep-alive β ignore
break;
}
}import { StreamingHelper } from "bytekit/streaming";
import type { SSEEvent } from "bytekit/streaming";
import { useEffect, useRef, useState } from "react";
interface TokenChunk { token: string; done: boolean; }
function useAIStream(prompt: string) {
const [text, setText] = useState("");
const [loading, setLoading] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const start = async () => {
abortRef.current?.abort();
const ac = new AbortController();
abortRef.current = ac;
setLoading(true);
setText("");
try {
for await (const ev of StreamingHelper.fetchSSE<TokenChunk>(
"/bff/ai/chat-stream",
{
method: "POST",
body: { prompt },
signal: ac.signal,
eventTypes: ["data"],
}
)) {
const chunk = ev.data as TokenChunk;
setText((prev) => prev + chunk.token);
if (chunk.done) break;
}
} catch (err) {
if ((err as Error).name !== "AbortError") throw err;
} finally {
setLoading(false);
}
};
const stop = () => abortRef.current?.abort();
useEffect(() => () => abortRef.current?.abort(), []);
return { text, loading, start, stop };
}// EN: Even for simple GET, fetchSSE gives you for-await
// ES: Incluso para GET simple, fetchSSE te da for-await
for await (const ev of StreamingHelper.fetchSSE<{ price: number }>(
"https://api.example.com/stock/AAPL"
)) {
console.log(`AAPL: $${(ev.data as { price: number }).price}`);
}import { StreamingHelper } from "bytekit/streaming";
interface LogEntry { timestamp: string; level: string; message: string; }
const result = await StreamingHelper.streamJsonLines<LogEntry>(
"https://api.example.com/logs/stream",
{
timeout: 60000,
headers: { Authorization: "Bearer token123" },
onChunk: (raw) => console.log("Received line:", raw),
onComplete: () => console.log("Stream finished!"),
onError: (err) => console.error("Stream error:", err.message),
}
);
console.log(`Received ${result.data.length} log entries`);// EN: Still works for simple GET SSE. Use fetchSSE for anything more.
// ES: Sigue funcionando para SSE GET simple. Usa fetchSSE para cualquier otra cosa.
const stream = StreamingHelper.streamSSE<{ title: string }>(
"https://api.example.com/events",
{ eventType: "notification" }
);
const unsub = stream.subscribe((data) => console.log(data.title));
// Later:
unsub();
stream.close();const blob = await StreamingHelper.downloadStream(
"https://api.example.com/reports/annual.pdf",
{
timeout: 120000,
onProgress: (pct) => console.log(`${pct}%`),
onComplete: () => console.log("Download complete!"),
}
);
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = "annual-report.pdf";
a.click();
URL.revokeObjectURL(url);- ApiClient β HTTP client for standard request/response patterns / Cliente HTTP para patrones estΓ‘ndar
- WebSocketHelper β Bidirectional real-time communication / ComunicaciΓ³n bidireccional en tiempo real
-
Async-Toolkit β
withTimeout,retryβ combine withfetchSSEfor resilience / combina confetchSSEpara resiliencia - FileUploadHelper β Upload files with chunking and progress / Sube archivos con chunking y progreso