# Element streams

Use `connectElementStream` when the source emits one full row per chunk: LLM Responses elements, SSE messages where each event is a complete row, paginated REST results streamed in.

## Signature

```ts
function connectElementStream<TRow extends Record<string, unknown>>(
  grid: GridLike<TRow>,
  stream: AsyncIterable<TRow>,
): StreamConnection;
```

- `grid` — anything implementing `GridLike` (a method `applyTransaction({ add?, update?, remove? })`). The Pretable engine implements this directly; for React state you can supply a thin shim, as shown below.
- `stream` — any `AsyncIterable<TRow>`. The function reads it to completion and applies each yielded row via `applyTransaction({ add: [row] })`, batched per microtask.
- Returns a `StreamConnection` — `{ done: Promise<void>; dispose(): void }`.

## Lifecycle

- `done` resolves when the source iterator finishes; rejects if it throws.
- `dispose()` stops consuming the source. Already-buffered rows still flush; further chunks are ignored.
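To make the contract above concrete, here is a minimal sketch of a function that behaves the way this page describes: rows are buffered and flushed once per microtask, `done` settles with the source, and `dispose()` causes later chunks to be dropped. This is an illustrative stand-in, not the `@pretable/stream-adapter` implementation. The name `sketchConnect`, the trimmed-down `GridLike` (only `add` is modeled), and the choice to resolve `done` after a dispose are all assumptions.

```typescript
// Illustrative stand-in only: NOT the library's implementation.
interface GridLike<TRow> {
  applyTransaction(tx: { add?: TRow[] }): void; // only `add` is modeled here
}

interface StreamConnection {
  done: Promise<void>;
  dispose(): void;
}

function sketchConnect<TRow>(
  grid: GridLike<TRow>,
  stream: AsyncIterable<TRow>,
): StreamConnection {
  let disposed = false;
  let buffer: TRow[] = [];
  let flushScheduled = false;

  // Hand every buffered row to the grid in a single transaction.
  const flush = () => {
    flushScheduled = false;
    if (buffer.length === 0) return;
    const add = buffer;
    buffer = [];
    grid.applyTransaction({ add });
  };

  const done = (async () => {
    for await (const row of stream) {
      if (disposed) break; // chunks arriving after dispose() are ignored
      buffer.push(row);
      if (!flushScheduled) {
        flushScheduled = true;
        queueMicrotask(flush); // at most one flush per microtask
      }
    }
    flush(); // rows buffered before the loop ended still reach the grid
  })();

  return {
    done,
    dispose: () => {
      disposed = true;
    },
  };
}
```

In this sketch a thrown error from the source propagates out of the `for await` loop, so `done` rejects; callers would typically attach a `.catch` to it.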

## Recipe — OpenAI Responses

```tsx
"use client";
import { useEffect, useState } from "react";
import { connectElementStream } from "@pretable/stream-adapter";
import { Pretable } from "@pretable/react";
import { openai } from "./openai-client";

interface ChatRow {
  id: string;
  role: "user" | "assistant";
  content: string;
}

export function ChatGrid({ prompt }: { prompt: string }) {
  const [rows, setRows] = useState<ChatRow[]>([]);
  useEffect(() => {
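    const conn = connectElementStream<ChatRow>(
      {
        // Minimal GridLike shim: append added rows to React state.
        applyTransaction: (tx) => {
          const added = tx.add;
          if (added) setRows((r) => [...r, ...added]);
        },
      },
      // Assumes the local ./openai-client wrapper yields one complete ChatRow per chunk.
      openai.responses.stream({ model: "gpt-5", input: prompt }),
    );
    // `done` rejects if the source throws, so handle it to avoid an unhandled rejection.
    conn.done.catch((err) => console.error("stream failed", err));
    return () => conn.dispose();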
  }, [prompt]);
  return (
    <Pretable
      rows={rows}
      columns={[
        { id: "role", header: "Role", widthPx: 100 },
        { id: "content", header: "Content", widthPx: 480, wrap: true },
      ]}
      getRowId={(r) => r.id}
    />
  );
}
```

## Recipe — SSE / EventSource

```ts
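// `Row` is your row type, and `grid` is any `GridLike<Row>`.
async function* fromEventSource(url: string): AsyncIterable<Row> {
  const es = new EventSource(url);
  const queue: Row[] = [];
  let resolve!: () => void;
  let pending = new Promise<void>((r) => (resolve = r));

  // Each SSE message carries one complete row as JSON.
  es.onmessage = (e) => {
    queue.push(JSON.parse(e.data) as Row);
    resolve(); // wake the consumer loop
    pending = new Promise<void>((r) => (resolve = r));
  };

  try {
    while (true) {
      while (queue.length) yield queue.shift()!;
      await pending;
    }
  } finally {
    // Close the connection once the consumer stops iterating.
    es.close();
  }
}

connectElementStream(grid, fromEventSource("/api/events"));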
```
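Paginated REST results fit the same shape: an async generator can walk the pages and yield one row at a time. The sketch below assumes a cursor-based API. The `Page` shape, the `nextCursor` field, and the injected `fetchPage` callback are hypothetical names chosen for illustration, so adapt them to whatever your endpoint returns.

```typescript
// Hypothetical page shape: a batch of rows plus an optional cursor for the next page.
interface Page<TRow> {
  items: TRow[];
  nextCursor?: string;
}

// `fetchPage` is injected so the sketch does not depend on any particular API.
async function* fromPages<TRow>(
  fetchPage: (cursor?: string) => Promise<Page<TRow>>,
): AsyncGenerator<TRow> {
  let cursor: string | undefined;
  do {
    const page = await fetchPage(cursor);
    // Yield rows one by one so the consumer sees one full row per chunk.
    for (const item of page.items) yield item;
    cursor = page.nextCursor;
  } while (cursor !== undefined);
}
```

The resulting iterable can be passed as the second argument to `connectElementStream`, in the same way as `fromEventSource` above.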

## When you have raw JSON instead of typed values

If your source yields raw JSON text or bytes instead of parsed objects (e.g. `Response.body` from `fetch`), pipe it through `parseElementStream` first:

```ts
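import { connectElementStream, parseElementStream } from "@pretable/stream-adapter";

const res = await fetch("/api/events");
// `res.body` is null for responses without a body, so guard before parsing.
if (!res.body) throw new Error("Response has no body");
connectElementStream(grid, parseElementStream(res.body));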
```

See [Parsers →](/docs/streaming/parsers).
