Production & Infra

LLM Streaming

Deliver LLM responses token-by-token for responsive UIs. SSE, async iterators, and real-time client updates.

SSE
Protocol
Token-by-Token
Delivery
~10×
Perceived Latency Improvement

Table of Contents

SECTION 01

Why Streaming Matters

LLMs are slow. A typical request takes 2-5 seconds to return the first token (TTFT), then 30-60+ seconds more to complete a full response. Users hate waiting. Streaming fixes perceived latency by showing progress.

TTFT vs TTLT vs TTC

With Streaming vs Without

Streaming doesn't reduce absolute latency, but it reduces perceived latency by ~10×. Users feel the response immediately.
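To make that concrete, here is a back-of-the-envelope sketch. The 2 s TTFT and 25 tokens/s generation rate are illustrative assumptions, not measured values:

```python
def completion_times(ttft_s: float, tokens: int, tokens_per_s: float):
    """Return (time to first token, time to completion) in seconds."""
    return ttft_s, ttft_s + tokens / tokens_per_s

# Illustrative numbers: 2 s to first token, 500 tokens at 25 tokens/s.
ttft, ttc = completion_times(ttft_s=2.0, tokens=500, tokens_per_s=25.0)

# Without streaming the user stares at a blank screen for the full `ttc`;
# with streaming they start reading at `ttft`.
print(f"First token at {ttft:.0f}s, full response at {ttc:.0f}s")
print(f"Blank-screen wait shrinks by {ttc / ttft:.0f}x")
```

With these numbers the blank-screen wait drops from 22 s to 2 s, which is where the "~10×" figure comes from.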

When NOT to Stream

Streaming adds little when no human is watching the output arrive: batch pipelines, machine-to-machine calls that must validate the complete response (e.g. strict JSON), and very short completions where the first token is nearly the whole answer.

Core Insight: Streaming doesn't make LLMs faster—it makes users feel like they're faster. The moment a user sees tokens appearing, they stop watching the clock and start reading.
SECTION 02

Server-Sent Events (SSE)

SSE (Server-Sent Events) is the web standard for one-directional server-to-client streams over plain HTTP. Simple, no WebSocket complexity, built-in browser support via the EventSource API.

SSE Format

A plain HTTP response whose body is a series of `data:` lines, each event terminated by a blank line:

```http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"content":"Hello"}

data: {"content":" there"}

data: {"content":"."}

data: [DONE]
```
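A minimal sketch of how a server frames tokens on the wire (the function name `sse_frames` is illustrative): each event is a `data:` line followed by a blank line, and the stream ends with a `[DONE]` sentinel.

```python
import json

def sse_frames(tokens):
    """Yield SSE-formatted frames for a sequence of content tokens."""
    for token in tokens:
        # Each event: a "data: " line terminated by a blank line.
        yield f"data: {json.dumps({'content': token})}\n\n"
    # Sentinel so clients know the stream is complete.
    yield "data: [DONE]\n\n"

wire = "".join(sse_frames(["Hello", " there", "."]))
print(wire)
```

The `\n\n` terminator is what separates events; a bare `\n` inside an event would be treated as another field line, not an event boundary.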

Anatomy of SSE Events

OpenAI Streaming Format

```text
data: {"choices":[{"delta":{"content":"Hello"}}]}

data: {"choices":[{"delta":{"content":" world"}}]}

data: [DONE]

# Notes:
# 1. Each event line starts with "data: "
# 2. Delta contains only the new content token
# 3. Last event is the [DONE] sentinel (required for client parsers)
```
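On the receiving side, the same format can be decoded with a small line parser. This sketch (names are illustrative) extracts the delta content from OpenAI-style events and stops at the `[DONE]` sentinel:

```python
import json

def parse_sse(raw: str):
    """Yield content tokens from an OpenAI-style SSE payload."""
    for line in raw.splitlines():
        if not line.startswith("data: "):
            continue  # Skip blank separator lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return  # Sentinel: stream is complete
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            yield delta["content"]

raw = (
    'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'
    'data: {"choices":[{"delta":{"content":" world"}}]}\n\n'
    "data: [DONE]\n\n"
)
print("".join(parse_sse(raw)))  # → Hello world
```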

Connection Management

SSE auto-reconnects on disconnect (browser native):

```javascript
const eventSource = new EventSource("/stream")

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  console.log(data.content)
}

eventSource.onerror = () => {
  console.log("Connection lost, retrying...")
  // Browser automatically retries (configurable)
}

// Manual close
eventSource.close()
```
Why SSE for LLMs: One-way (server → client) is perfect for LLM streams. No bidirectional messaging needed. Browser EventSource API has built-in reconnect. Far simpler than WebSocket.
SECTION 03

Streaming with OpenAI SDK

OpenAI SDK supports streaming with a simple flag and async iterators.

Synchronous Streaming

```python
from openai import OpenAI

client = OpenAI(api_key="sk-...")

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a poem"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```

Async Streaming

```python
import asyncio
from openai import AsyncOpenAI

async def stream_poem():
    client = AsyncOpenAI(api_key="sk-...")
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Poem"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

asyncio.run(stream_poem())
```

Getting Full Response While Streaming

The raw streaming API does not return a full message object, so accumulate the deltas yourself; pass `stream_options={"include_usage": True}` to receive token usage in the final chunk:

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
    stream_options={"include_usage": True}
)

full_text = ""
usage = None
for chunk in stream:
    # The final usage chunk has an empty choices list, so guard for it.
    if chunk.choices and chunk.choices[0].delta.content:
        full_text += chunk.choices[0].delta.content
    if chunk.usage:  # Present only on the final chunk
        usage = chunk.usage

print(f"Full text: {full_text}")
print(f"Usage: {usage}")
```

With FastAPI Response Streaming

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.get("/stream")
async def stream_endpoint(query: str):
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
Chunk Structure: Each chunk has `.choices[0].delta` which is minimal—only includes the new content token. Use `if chunk.choices[0].delta.content:` to check for actual text (ignores None deltas).
SECTION 04

Streaming with Anthropic SDK

Anthropic's streaming uses a context manager pattern and is just as simple as OpenAI's.

Basic Streaming

```python
from anthropic import Anthropic

client = Anthropic(api_key="sk-ant-...")

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a short story"}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

Async Streaming

```python
import asyncio
from anthropic import AsyncAnthropic

async def stream_with_anthropic():
    client = AsyncAnthropic(api_key="sk-ant-...")
    # Note: the async client requires `async with`
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Story"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="")

asyncio.run(stream_with_anthropic())
```

Accessing Final Message While Streaming

```python
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[...]
) as stream:
    full_message = stream.get_final_message()  # Consumes the rest of the stream

print(f"Text: {full_message.content[0].text}")
print(f"Stop reason: {full_message.stop_reason}")
print(f"Usage: {full_message.usage}")
```

With FastAPI

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic

app = FastAPI()
client = Anthropic()

@app.get("/stream")
async def stream_endpoint(query: str):
    def generate():
        with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": query}]
        ) as stream:
            for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
Anthropic vs OpenAI API: Anthropic's `.text_stream` directly yields text chunks (simpler than OpenAI's `.delta.content` extraction). Both are equally valid; pick whichever feels more natural.
SECTION 05

Streaming in FastAPI

FastAPI makes it trivial to serve SSE streams to browsers with StreamingResponse.

SSE Endpoint Template

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.get("/stream-chat")
async def stream_chat(query: str):
    def generate():
        """Generator that yields SSE-formatted chunks."""
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                # Yield in SSE format: "data: {json}\n\n"
                content = chunk.choices[0].delta.content
                yield f"data: {content}\n\n"
        # Signal completion
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # Nginx: don't buffer
        }
    )
```

CORS Headers for Browser Clients

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    # Browsers reject a wildcard origin when credentials are allowed;
    # list your domains explicitly.
    allow_origins=["https://example.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/stream-chat")
async def stream_chat(query: str):
    ...  # streaming logic
```

JSON-formatted Streams

If you want to send structured data alongside text:

```python
import json

@app.get("/stream-with-meta")
async def stream_with_metadata(query: str):
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        token_count = 0
        for chunk in stream:
            if chunk.choices[0].delta.content:
                token_count += 1
                event_data = {
                    "type": "token",
                    "content": chunk.choices[0].delta.content,
                    "token_num": token_count
                }
                yield f"data: {json.dumps(event_data)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
Production Tip: Add `X-Accel-Buffering: no` header to prevent reverse proxies (Nginx, CloudFlare) from buffering SSE streams. Without this, users see large chunks at once instead of smooth token-by-token delivery.
SECTION 06

Tool Call Streaming

Instead of (or alongside) text, a streamed LLM response can contain tool calls, whose name and arguments arrive in fragments. Streaming tool calls requires special handling.

The Challenge

When an LLM calls a tool, it returns JSON with tool name and arguments. But during streaming, the JSON arrives incomplete: `{"name": "get_weather"` → `{"name": "get_weather", "arguments": "{"location":"` → `... full JSON`.
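The fragments only become parseable once the last one arrives. A small simulation (the fragments below are made up) shows the buffer-and-retry approach:

```python
import json

# Simulated argument fragments as they might arrive across chunks.
fragments = ['{"loca', 'tion": "Par', 'is", "unit": "celsius"}']

buffer = ""
parsed = None
for fragment in fragments:
    buffer += fragment
    try:
        parsed = json.loads(buffer)  # Succeeds only once the JSON is complete
    except json.JSONDecodeError:
        continue  # Incomplete; keep accumulating

print(parsed)  # → {'location': 'Paris', 'unit': 'celsius'}
```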

Streaming Agent with Tool Calls

```python
import json

from openai import OpenAI

client = OpenAI()
tools = [...]  # Your tool definitions

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather?"}],
    tools=tools,
    stream=True
)

# Tool call deltas arrive in fragments; accumulate them by index.
tool_calls = {}
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        for tc in delta.tool_calls:
            call = tool_calls.setdefault(tc.index, {"name": "", "arguments": ""})
            if tc.function.name:
                call["name"] = tc.function.name  # Name arrives in the first fragment
            if tc.function.arguments:
                call["arguments"] += tc.function.arguments  # JSON arrives in pieces
    elif delta.content:
        # Regular text response
        print(delta.content, end="", flush=True)

# After streaming, execute all tool calls
for call in tool_calls.values():
    arguments = json.loads(call["arguments"])
    result = execute_tool(call["name"], arguments)
```

Accumulating Partial JSON Arguments

Tool argument streaming arrives in pieces. Accumulate until complete:

```python
import json

tool_call_args = ""

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.arguments:
            tool_call_args += tool_call.function.arguments
            # Try to parse (in case we have enough)
            try:
                parsed = json.loads(tool_call_args)
                print(f"Tool ready: {parsed}")
            except json.JSONDecodeError:
                # Still incomplete, wait for more
                pass
```
Tool Streaming Complexity: Unlike text streaming (where each chunk is instantly readable), tool call streaming requires buffering and JSON parsing. For better UX, consider not streaming tool calls—wait for the full response, then execute. Stream only for text responses.
SECTION 07

Client-Side Implementation

Browser-side JavaScript to consume SSE streams and update UI in real-time.

Fetch with ReadableStream (Modern Approach)

```javascript
async function streamResponse(query) {
  const response = await fetch(`/stream-chat?query=${encodeURIComponent(query)}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split("\n");
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);  // Remove "data: "
        if (data === "[DONE]") {
          console.log("Stream complete");
          return;
        }
        // Update UI with token
        const element = document.getElementById("response");
        element.textContent += data;
      }
    }
  }
}

// Usage
document.getElementById("send-btn").addEventListener("click", () => {
  const query = document.getElementById("input").value;
  streamResponse(query);
});
```

EventSource API (Simpler)

```javascript
function streamWithEventSource(query) {
  const eventSource = new EventSource(`/stream-chat?query=${encodeURIComponent(query)}`);
  const element = document.getElementById("response");
  element.textContent = "";

  eventSource.onmessage = (event) => {
    if (event.data === "[DONE]") {
      eventSource.close();
      return;
    }
    element.textContent += event.data;
  };

  eventSource.onerror = () => {
    console.error("Stream error");
    eventSource.close();
  };
}
```

React Hook for Streaming

```javascript
import { useState } from "react";

function useStreamChat() {
  const [response, setResponse] = useState("");
  const [loading, setLoading] = useState(false);

  const stream = (query) => {
    setLoading(true);
    setResponse("");
    const eventSource = new EventSource(
      `/stream-chat?query=${encodeURIComponent(query)}`
    );
    eventSource.onmessage = (event) => {
      if (event.data === "[DONE]") {
        eventSource.close();
        setLoading(false);
        return;
      }
      setResponse(prev => prev + event.data);
    };
    eventSource.onerror = () => {
      eventSource.close();
      setLoading(false);
    };
  };

  return { response, loading, stream };
}

// Usage
function ChatApp() {
  const { response, loading, stream } = useStreamChat();
  return (
    <div>
      <div>{response}</div>
      {loading && <div>Streaming...</div>}
      <button onClick={() => stream("Hello")}>Send</button>
    </div>
  );
}
```

Error Handling & Reconnect

```javascript
function robustStream(query, maxRetries = 3) {
  let retries = 0;

  function connect() {
    const eventSource = new EventSource(`/stream?query=${encodeURIComponent(query)}`);

    eventSource.onmessage = (event) => {
      if (event.data === "[DONE]") {
        eventSource.close();
        retries = 0;  // Reset on success
      } else {
        updateUI(event.data);
      }
    };

    eventSource.onerror = () => {
      eventSource.close();
      if (retries < maxRetries) {
        retries++;
        console.log(`Retry ${retries}/${maxRetries}`);
        setTimeout(connect, 1000 * retries);  // Linear backoff: 1s, 2s, 3s
      } else {
        console.error("Stream failed after retries");
      }
    };
  }

  connect();
}
```
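The retry delays above grow linearly; if you prefer exponential backoff, the schedule is easy to compute. A sketch (the function name and 30 s cap are illustrative):

```python
def backoff_delays(max_retries: int, base_s: float = 1.0, cap_s: float = 30.0):
    """Exponential backoff schedule: base * 2^n seconds, clamped to a cap."""
    return [min(base_s * 2 ** n, cap_s) for n in range(max_retries)]

print(backoff_delays(5))  # → [1.0, 2.0, 4.0, 8.0, 16.0]
```

In practice you would also add random jitter to each delay so many clients disconnected at once do not all reconnect in lockstep.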
EventSource vs Fetch: EventSource is simpler and has built-in reconnect logic. Use it for basic streaming. Use Fetch with ReadableStream when you need more control (custom headers, request body, binary data).
SECTION 08

Streaming Architecture Patterns

Choosing the right streaming architecture depends on your latency requirements, client diversity, and infrastructure constraints. The table below compares the four main patterns.

| Pattern | Transport | TTFT Impact | Best For | Limitation |
| --- | --- | --- | --- | --- |
| Direct SSE | HTTP/1.1 SSE | Lowest (~100ms) | Web browsers, simple APIs | No multiplexing; one connection per stream |
| WebSocket | Full-duplex TCP | Low | Interactive chat, real-time tools | Stateful; harder to load-balance |
| gRPC streaming | HTTP/2 | Low | Service-to-service, mobile | Requires gRPC client; browser support via proxy |
| Buffered proxy | Any | Highest (full latency) | Caching, rate-limiting, logging | Defeats purpose of streaming; only use for middleware |

For public-facing APIs, SSE is the default choice — it works over plain HTTP, survives proxies and CDNs, and needs no special client code. For mobile apps where battery and connection stability matter, a WebSocket with heartbeats is more reliable. Avoid buffering proxies in the hot path; if you need to log completions, write to a queue asynchronously after forwarding each chunk, not before.
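The "log after forwarding" advice can be sketched as a generator that tees each chunk into a queue drained by a background worker, so logging never delays delivery (the names here are illustrative, and a real system would use a durable queue rather than an in-process one):

```python
import queue
import threading

log_queue: "queue.Queue" = queue.Queue()
completion_log = []

def log_worker():
    """Drain the queue off the hot path."""
    while True:
        chunk = log_queue.get()
        if chunk is None:  # Shutdown sentinel
            break
        completion_log.append(chunk)

def tee_stream(chunks):
    """Forward each chunk to the client first, then enqueue it for logging."""
    for chunk in chunks:
        yield chunk            # Client gets the token immediately
        log_queue.put(chunk)   # Logging happens asynchronously
    log_queue.put(None)

worker = threading.Thread(target=log_worker)
worker.start()
delivered = list(tee_stream(["Hel", "lo", "!"]))
worker.join()
print("".join(delivered), "".join(completion_log))  # → Hello! Hello!
```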