Element streams

connectElementStream consumes a source that emits one full row per chunk and adds each row to a grid.

Use connectElementStream when the source emits one full row per chunk: LLM Responses elements, SSE messages where each event is a complete row, or paginated REST results streamed in.

Signature

ts
function connectElementStream<TRow extends Record<string, unknown>>(
  grid: GridLike<TRow>,
  stream: AsyncIterable<TRow>,
): StreamConnection;

  • grid: anything implementing GridLike, that is, an object with a method applyTransaction({ add?, update?, remove? }). The Pretable engine implements this directly; for React state you can supply a thin shim, as shown in the OpenAI Responses recipe below.
  • stream: any AsyncIterable<TRow>. The function reads it to completion and applies each yielded row via applyTransaction({ add: [row] }), batched per microtask.
  • Returns a StreamConnection: { done: Promise<void>; dispose(): void }.
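
Outside React, a GridLike shim can be a plain object over an array. The sketch below is illustrative only. The source confirms just the add: [row] shape. The shapes of update and remove (arrays of rows matched by id), the local Transaction and GridLike declarations, and the createArrayGrid helper are all assumptions, not the library's definitions.

```ts
// Assumed transaction shape: only `add` as an array of rows is confirmed by the docs.
interface Transaction<TRow> {
  add?: TRow[];
  update?: TRow[]; // assumption: full replacement rows, matched by id
  remove?: TRow[]; // assumption: rows to drop, matched by id
}

// Local stand-in for the library's GridLike type.
interface GridLike<TRow> {
  applyTransaction(tx: Transaction<TRow>): void;
}

// Hypothetical helper: an array-backed grid keyed by a caller-supplied id function.
function createArrayGrid<TRow>(getId: (row: TRow) => string) {
  let rows: TRow[] = [];
  const grid: GridLike<TRow> = {
    applyTransaction(tx) {
      if (tx.remove) {
        const gone = new Set(tx.remove.map(getId));
        rows = rows.filter((r) => !gone.has(getId(r)));
      }
      if (tx.update) {
        const next = new Map(tx.update.map((r) => [getId(r), r] as const));
        rows = rows.map((r) => next.get(getId(r)) ?? r);
      }
      if (tx.add) rows = [...rows, ...tx.add];
    },
  };
  // getRows exposes the current snapshot for rendering or inspection.
  return { grid, getRows: () => rows };
}
```

Under these assumptions, the returned grid could be passed as the first argument to connectElementStream, with getRows() read whenever the view needs the current rows.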

Lifecycle

  • done resolves when the source iterator finishes; rejects if it throws.
  • dispose() stops consuming the source. Already-buffered rows still flush; further chunks are ignored.
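
One way to use this lifecycle is to race done against a timeout and call dispose() if the source takes too long. The sketch below depends only on the StreamConnection shape given under Signature, redeclared locally so the block stands alone. The settle helper and its result labels are hypothetical names, not part of the library.

```ts
// Local copy of the StreamConnection shape described under Signature.
interface StreamConnection {
  done: Promise<void>;
  dispose(): void;
}

type Outcome = "finished" | "failed" | "timed-out";

// Hypothetical helper: wait for the stream to end, or stop it after timeoutMs.
async function settle(conn: StreamConnection, timeoutMs: number): Promise<Outcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timed-out">((resolve) => {
    timer = setTimeout(() => resolve("timed-out"), timeoutMs);
  });
  try {
    const outcome = await Promise.race([
      // done resolves when the source finishes and rejects if it throws.
      conn.done.then(() => "finished" as const, () => "failed" as const),
      timeout,
    ]);
    // On timeout, stop consuming the source.
    if (outcome === "timed-out") conn.dispose();
    return outcome;
  } finally {
    clearTimeout(timer);
  }
}
```

A caller might write const outcome = await settle(conn, 30_000) and show an error state when the result is "failed" or "timed-out".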

Recipe — OpenAI Responses

tsx
"use client";

import { useEffect, useState } from "react";
import { connectElementStream } from "@pretable/stream-adapter";
import { Pretable } from "@pretable/react";
// Local client module; this recipe assumes its stream yields complete ChatRow objects.
import { openai } from "./openai-client";

// A type alias rather than an interface, so ChatRow satisfies the
// Record<string, unknown> constraint on TRow.
type ChatRow = {
  id: string;
  role: "user" | "assistant";
  content: string;
};

export function ChatGrid({ prompt }: { prompt: string }) {
  const [rows, setRows] = useState<ChatRow[]>([]);

  useEffect(() => {
    const conn = connectElementStream<ChatRow>(
      {
        // Thin GridLike shim: append added rows to React state.
        applyTransaction: (tx) => {
          const added = tx.add;
          if (added) setRows((r) => [...r, ...added]);
        },
      },
      openai.responses.stream({ model: "gpt-5", input: prompt }),
    );
    // Stop consuming when the prompt changes or the component unmounts.
    return () => conn.dispose();
  }, [prompt]);

  return (
    <Pretable
      rows={rows}
      columns={[
        { id: "role", header: "Role", widthPx: 100 },
        { id: "content", header: "Content", widthPx: 480, wrap: true },
      ]}
      getRowId={(r) => r.id}
    />
  );
}

Recipe — SSE / EventSource

ts
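import { connectElementStream } from "@pretable/stream-adapter";

// Placeholder row shape; replace with your own type.
type Row = Record<string, unknown>;

async function* fromEventSource(url: string): AsyncIterable<Row> {
  const es = new EventSource(url);
  const queue: Row[] = [];
  let resolve!: () => void;
  let pending = new Promise<void>((r) => (resolve = r));

  es.onmessage = (e) => {
    queue.push(JSON.parse(e.data));
    resolve(); // wake the generator
    pending = new Promise<void>((r) => (resolve = r));
  };

  try {
    while (true) {
      // Drain everything buffered so far, then wait for the next message.
      while (queue.length) yield queue.shift()!;
      await pending;
    }
  } finally {
    // Close the connection if the consumer ends iteration.
    es.close();
  }
}

// grid is your Pretable engine or any GridLike shim.
connectElementStream(grid, fromEventSource("/api/events"));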
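
The paginated REST case mentioned at the top of this page can be wrapped in the same way, with an async generator that yields rows one page at a time. The following is a minimal sketch. The Page shape, the nextCursor field, and the fromPages and fetchPage names are assumptions for illustration; adapt them to whatever your API actually returns.

```ts
// Assumed page shape: a batch of items plus an optional cursor for the next page.
interface Page<T> {
  items: T[];
  nextCursor?: string;
}

// Caller-supplied function that loads one page; undefined cursor means "first page".
type FetchPage<T> = (cursor?: string) => Promise<Page<T>>;

// Hypothetical helper: flatten pages into a stream of single rows.
async function* fromPages<T>(fetchPage: FetchPage<T>): AsyncGenerator<T> {
  let cursor: string | undefined;
  do {
    const page = await fetchPage(cursor);
    yield* page.items; // emit each row of the page individually
    cursor = page.nextCursor;
  } while (cursor !== undefined);
}
```

The resulting iterable yields one full row per chunk, so it could be passed as the stream argument to connectElementStream. The next page is only requested once the consumer has pulled every row of the current one.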

When you have raw JSON instead of typed values

If your source yields raw, unparsed JSON rather than row objects (e.g. Response.body from fetch), pipe it through parseElementStream first:

ts
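import { connectElementStream, parseElementStream } from "@pretable/stream-adapter";

const res = await fetch("/api/events");
// Response.body is null for responses without a body.
if (!res.body) throw new Error("Response has no body");
connectElementStream(grid, parseElementStream(res.body));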
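
To show what such a parsing step involves, the sketch below turns text chunks into row objects, assuming the payload is newline-delimited JSON. It is not how parseElementStream is implemented, and the wire format it expects is not stated here. The parseNdjson name is hypothetical.

```ts
// Hypothetical helper: split incoming text on newlines and parse each line as JSON.
// Assumes newline-delimited JSON; chunk boundaries may fall mid-line.
async function* parseNdjson<T>(chunks: AsyncIterable<string>): AsyncGenerator<T> {
  let buffer = "";
  for await (const chunk of chunks) {
    buffer += chunk;
    let newline: number;
    while ((newline = buffer.indexOf("\n")) >= 0) {
      const line = buffer.slice(0, newline).trim();
      buffer = buffer.slice(newline + 1);
      if (line) yield JSON.parse(line) as T; // skip blank lines
    }
  }
  // Flush a final line that had no trailing newline.
  const tail = buffer.trim();
  if (tail) yield JSON.parse(tail) as T;
}
```

The buffer carries partial lines across chunks, so a row is only emitted once its full line has arrived.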

See Parsers →.