LLMs are slow. A typical request takes 2-5 seconds (TTFB) to return the first token, then 30-60+ seconds more to complete a full response. Users hate waiting. Streaming fixes perceived latency by showing progress.
TTFB vs TTLT vs TTC
TTFB (Time to First Byte): Latency until first token arrives. ~2s for GPT-4. Without streaming, user sees blank screen until TTLT completes.
TTLT (Time to Last Token): Total time for full response. ~45s for a 500-token response at 11 tokens/sec.
TTC (Time to Comprehension): How long until user understands the answer. Streaming dramatically reduces this because users start reading at TTFB, not TTLT.
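The relationship between these numbers is simple arithmetic. A quick sketch using the illustrative figures above (2s TTFB, 500 tokens at 11 tokens/sec):

```python
# Illustrative figures from the definitions above
ttfb = 2.0              # seconds until the first token arrives
tokens, rate = 500, 11  # response length and decode speed (tokens/sec)

ttlt = tokens / rate    # time to last token: ~45.5s of decoding

# Without streaming the user waits for TTLT before seeing anything;
# with streaming, reading starts at TTFB
wait_without_streaming = ttlt
wait_with_streaming = ttfb

print(f"TTLT ~{ttlt:.0f}s; first visible output: "
      f"{wait_without_streaming:.0f}s vs {wait_with_streaming:.0f}s")
```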
With Streaming vs Without
Without streaming: Wait 2s (TTFB) → still waiting → 45s (TTLT) → finally see full response
With streaming: Wait 2s (TTFB) → see first token → read while tokens arrive → 45s (TTLT)
Streaming doesn't reduce absolute latency, but it reduces perceived latency by ~10×. Users feel the response immediately.
When NOT to Stream
Batch processing (no user watching): Stream adds overhead without benefit
Very short responses: Streaming overhead > latency saved
Structured outputs only (JSON): Partial JSON isn't renderable, so showing it mid-generation adds nothing; even if you stream internally, you can only parse once the output is complete
Tool calls that need full output: Can't call tools on partial JSON
Core Insight: Streaming doesn't make LLMs faster—it makes users feel like they're faster. The moment a user sees tokens appearing, they stop watching the clock and start reading.
SECTION 02
Server-Sent Events (SSE)
SSE is the web standard for one-directional server-to-client streams over plain HTTP. Simple, no WebSocket complexity, and supported natively by browsers via the EventSource API.
SSE Format
A plain HTTP response with `Content-Type: text/event-stream`. Each event is one or more `field: value` lines followed by a blank line; for LLM APIs the data payload is typically JSON. The fields:
data: The event payload. Repeatable; multiple data lines in one event join with newlines
event: Optional. A named event type for the client to dispatch on
id: Optional. Event ID, echoed back as Last-Event-ID on reconnect
retry: Optional. Milliseconds to wait before reconnect
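On the wire, each event is one or more `field: value` lines ended by a blank line. A minimal sketch of a formatter; `sse_event` is a hypothetical helper, not part of any SDK:

```python
import json

def sse_event(data, event_id=None, retry_ms=None):
    """Format one Server-Sent Event: field lines terminated by a blank line."""
    lines = []
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")   # client reconnect delay in ms
    if event_id is not None:
        lines.append(f"id: {event_id}")      # lets clients resume via Last-Event-ID
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"         # blank line ends the event

print(sse_event({"content": "Hello"}, event_id=1, retry_ms=3000), end="")
```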
OpenAI Streaming Format
data: {"choices":[{"delta":{"content":"Hello"}}]}
data: {"choices":[{"delta":{"content":" world"}}]}
data: [DONE]
# Notes:
# 1. Each line starts with "data: "
# 2. Delta contains only the new content token
# 3. Last event is the [DONE] sentinel: OpenAI's end-of-stream convention
#    (not part of the SSE spec) that client parsers rely on
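Those notes translate into a tiny client-side parser. A sketch assuming chunks follow the OpenAI shape shown above; `parse_sse_lines` is illustrative, not an official client:

```python
import json

def parse_sse_lines(lines):
    """Yield content tokens from OpenAI-style 'data: ...' SSE lines."""
    for line in lines:
        if not line.startswith("data: "):
            continue                      # skip blank lines and comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return                        # sentinel: stream is finished
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            yield delta["content"]

raw = [
    'data: {"choices":[{"delta":{"content":"Hello"}}]}',
    'data: {"choices":[{"delta":{"content":" world"}}]}',
    "data: [DONE]",
]
print("".join(parse_sse_lines(raw)))  # Hello world
```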
Connection Management
Browsers reconnect automatically on disconnect: the native EventSource API retries dropped connections (honoring the server's `retry` field) without any application code.
Why SSE for LLMs: One-way (server → client) is perfect for LLM streams. No bidirectional messaging needed. Browser EventSource API has built-in reconnect. Far simpler than WebSocket.
SECTION 03
Streaming with OpenAI SDK
OpenAI SDK supports streaming with a simple flag and async iterators.
Synchronous Streaming
from openai import OpenAI

client = OpenAI(api_key="sk-...")

with client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a poem"}],
    stream=True
) as stream:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
Async Streaming
import asyncio
from openai import AsyncOpenAI

async def stream_poem():
    client = AsyncOpenAI(api_key="sk-...")
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Poem"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

asyncio.run(stream_poem())
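One payoff of the async variant is fanning out several streams concurrently. A sketch with stand-in generators instead of real API calls; `fake_stream` and `collect` are illustrative:

```python
import asyncio

async def fake_stream(tokens):
    """Stand-in for an async LLM stream: yields one token at a time."""
    for tok in tokens:
        await asyncio.sleep(0)  # simulate waiting on the network
        yield tok

async def collect(stream):
    """Accumulate a stream into the full response text."""
    return "".join([tok async for tok in stream])

async def main():
    # Two requests stream concurrently instead of back-to-back
    first, second = await asyncio.gather(
        collect(fake_stream(["Hel", "lo"])),
        collect(fake_stream(["wor", "ld"])),
    )
    return first, second

first, second = asyncio.run(main())
print(first, second)  # Hello world
```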
Getting Full Response While Streaming
The OpenAI SDK's streaming helper (`client.beta.chat.completions.stream`) accumulates the full response for you; it sets `stream=True` internally:
from openai import OpenAI

client = OpenAI()

with client.beta.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    stream.until_done()                            # consume remaining events
    full_response = stream.get_final_completion()  # accumulated ChatCompletion

print(f"Full text: {full_response.choices[0].message.content}")
print(f"Usage: {full_response.usage}")
Serving the Stream with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.get("/stream")
async def stream_endpoint(query: str):
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
Chunk Structure: Each chunk has `.choices[0].delta` which is minimal—only includes the new content token. Use `if chunk.choices[0].delta.content:` to check for actual text (ignores None deltas).
SECTION 04
Streaming with Anthropic SDK
Anthropic's streaming uses a context manager pattern and is just as simple as OpenAI's.
Basic Streaming
from anthropic import Anthropic

client = Anthropic(api_key="sk-ant-...")

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a short story"}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
Async Streaming
import asyncio
from anthropic import AsyncAnthropic

async def stream_with_anthropic():
    client = AsyncAnthropic(api_key="sk-ant-...")
    async with client.messages.stream(  # async client requires async with
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Story"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="")

asyncio.run(stream_with_anthropic())
Accessing Final Message While Streaming
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[...]
) as stream:
    full_message = stream.get_final_message()

print(f"Text: {full_message.content[0].text}")
print(f"Stop reason: {full_message.stop_reason}")
print(f"Usage: {full_message.usage}")
Serving the Stream with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic

app = FastAPI()
client = Anthropic()

@app.get("/stream")
async def stream_endpoint(query: str):
    def generate():
        with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": query}]
        ) as stream:
            for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
Anthropic vs OpenAI API: Anthropic's `.text_stream` directly yields text chunks (simpler than OpenAI's `.delta.content` extraction). Both are equally valid; pick whichever feels more natural.
SECTION 05
Streaming in FastAPI
FastAPI makes it trivial to serve SSE streams to browsers with StreamingResponse.
SSE Endpoint Template
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.get("/stream-chat")
async def stream_chat(query: str):
    def generate():
        """Generator that yields SSE-formatted chunks."""
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                # Yield in SSE format: "data: {json}\n\n"
                content = chunk.choices[0].delta.content
                yield f"data: {content}\n\n"
        # Signal completion
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # Nginx: don't buffer
        }
    )
CORS Headers for Browser Clients
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://example.com"],  # browsers reject "*" with credentials
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/stream-chat")
async def stream_chat(query: str):
    ...  # streaming logic as above
JSON-formatted Streams
If you want to send structured data alongside text:
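A sketch of one way to do it: wrap each token in a JSON envelope so clients can tell token events from metadata. The `type` field and envelope shape are conventions of this example, not a standard:

```python
import json

def generate_json_events(tokens, model="gpt-4"):
    """Yield SSE events whose data payload is structured JSON."""
    for i, tok in enumerate(tokens):
        payload = {"type": "token", "index": i, "content": tok}
        yield f"data: {json.dumps(payload)}\n\n"
    # Final event carries metadata instead of text
    yield f"data: {json.dumps({'type': 'done', 'model': model})}\n\n"

for event in generate_json_events(["Hi", " there"]):
    print(event, end="")
```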
Production Tip: Add the `X-Accel-Buffering: no` header to prevent reverse proxies (Nginx, Cloudflare) from buffering SSE streams. Without it, users see large chunks at once instead of smooth token-by-token delivery.
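On the Nginx side, the same effect can come from config rather than a response header. A hedged sketch; the location path and upstream name are illustrative:

```nginx
location /stream-chat {
    proxy_pass http://app_upstream;  # illustrative upstream name
    proxy_buffering off;             # forward upstream bytes immediately
    proxy_cache off;                 # never cache a live stream
    proxy_read_timeout 300s;         # LLM responses can take minutes
}
```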
SECTION 06
Tool Call Streaming
Mid-generation, an LLM can switch from emitting text to emitting a tool call. Streaming tool calls requires special handling.
The Challenge
When an LLM calls a tool, it returns JSON with the tool name and arguments. But during streaming, that JSON arrives incomplete: `{"name": "get_weather"` → `{"name": "get_weather", "arguments": "{\"location\":"` → full JSON.
Streaming Agent with Tool Calls
import json
from openai import OpenAI

client = OpenAI()
tools = [...]  # Your tool definitions

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather?"}],
    tools=tools,
    stream=True
)

tool_calls = {}  # index -> accumulated {"name", "arguments"}

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        for tc in delta.tool_calls:
            # The first chunk for an index carries the name; later chunks
            # carry argument fragments that must be concatenated
            call = tool_calls.setdefault(tc.index, {"name": "", "arguments": ""})
            if tc.function.name:
                call["name"] = tc.function.name
            if tc.function.arguments:
                call["arguments"] += tc.function.arguments
    elif delta.content:
        # Regular text response
        print(delta.content, end="", flush=True)

# After streaming, execute all tool calls
for call in tool_calls.values():
    result = execute_tool(call["name"], json.loads(call["arguments"]))
Accumulating Partial JSON Arguments
Tool argument streaming arrives in pieces. Accumulate until complete:
import json

tool_call_args = ""

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function and tool_call.function.arguments:
            tool_call_args += tool_call.function.arguments
            # Try to parse (in case we have enough)
            try:
                parsed = json.loads(tool_call_args)
                print(f"Tool ready: {parsed}")
            except json.JSONDecodeError:
                # Still incomplete, wait for more
                pass
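The same buffering loop, run against simulated argument fragments (the values are illustrative):

```python
import json

fragments = ['{"loc', 'ation": "San ', 'Francisco"}']  # how arguments arrive
buffer = ""
parsed = None

for piece in fragments:
    buffer += piece
    try:
        parsed = json.loads(buffer)  # succeeds only once the JSON is complete
    except json.JSONDecodeError:
        continue                     # incomplete: keep accumulating

print(parsed)  # {'location': 'San Francisco'}
```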
Tool Streaming Complexity: Unlike text streaming (where each chunk is instantly readable), tool call streaming requires buffering and JSON parsing. For better UX, consider not streaming tool calls—wait for the full response, then execute. Stream only for text responses.
SECTION 07
Client-Side Implementation
Browser-side JavaScript to consume SSE streams and update UI in real-time.
Fetch with ReadableStream (Modern Approach)
async function streamResponse(query) {
  const response = await fetch(`/stream-chat?query=${encodeURIComponent(query)}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // { stream: true } handles multi-byte characters split across chunks
    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split("\n");

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6); // Remove "data: "
        if (data === "[DONE]") {
          console.log("Stream complete");
          return;
        }
        // Update UI with token
        const element = document.getElementById("response");
        element.textContent += data;
      }
    }
  }
}

// Usage
document.getElementById("send-btn").addEventListener("click", () => {
  const query = document.getElementById("input").value;
  streamResponse(query);
});
EventSource with Retry Logic
function robustStream(query, maxRetries = 3) {
  let retries = 0;

  function connect() {
    const eventSource = new EventSource(`/stream?query=${encodeURIComponent(query)}`);

    eventSource.onmessage = (event) => {
      if (event.data === "[DONE]") {
        eventSource.close();
        retries = 0; // Reset on success
      } else {
        updateUI(event.data);
      }
    };

    eventSource.onerror = () => {
      eventSource.close();
      if (retries < maxRetries) {
        retries++;
        console.log(`Retry ${retries}/${maxRetries}`);
        setTimeout(connect, 1000 * retries); // Linear backoff: 1s, 2s, 3s
      } else {
        console.error("Stream failed after retries");
      }
    };
  }

  connect();
}
EventSource vs Fetch: EventSource is simpler and has built-in reconnect logic. Use it for basic streaming. Use Fetch with ReadableStream when you need more control (custom headers, request body, binary data).
SECTION 08
Streaming Architecture Patterns
Choosing the right streaming architecture depends on your latency requirements, client diversity, and infrastructure constraints. The table below compares the four main patterns.
Pattern        | Transport        | TTFT Impact            | Best For                          | Limitation
---------------|------------------|------------------------|-----------------------------------|------------------------------------------------
Direct SSE     | HTTP/1.1 SSE     | Lowest (~100ms)        | Web browsers, simple APIs         | No multiplexing; one connection per stream
WebSocket      | Full-duplex TCP  | Low                    | Interactive chat, real-time tools | Stateful; harder to load-balance
gRPC streaming | HTTP/2           | Low                    | Service-to-service, mobile        | Requires gRPC client; browser support via proxy
Buffered proxy | Any              | Highest (full latency) | Caching, rate-limiting, logging   | Defeats purpose of streaming; middleware only
For public-facing APIs, SSE is the default choice — it works over plain HTTP, survives proxies and CDNs, and needs no special client code. For mobile apps where battery and connection stability matter, a WebSocket with heartbeats is more reliable. Avoid buffering proxies in the hot path; if you need to log completions, write to a queue asynchronously after forwarding each chunk, not before.
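That last point (forward first, log later) can be sketched with an asyncio.Queue; the function names here are illustrative:

```python
import asyncio

async def forward_and_log(chunks, send, log_queue):
    """Forward each chunk to the client first, then enqueue it for logging."""
    for chunk in chunks:
        await send(chunk)            # client sees the token immediately
        log_queue.put_nowait(chunk)  # logging never blocks the hot path
    log_queue.put_nowait(None)       # sentinel: stream finished

async def log_worker(log_queue, sink):
    """Drain the queue; in production this would batch-write to storage."""
    while (chunk := await log_queue.get()) is not None:
        sink.append(chunk)

async def main():
    sent, logged = [], []

    async def send(chunk):
        sent.append(chunk)  # stand-in for writing to the SSE response

    q = asyncio.Queue()
    await asyncio.gather(
        forward_and_log(["Hel", "lo"], send, q),
        log_worker(q, logged),
    )
    return sent, logged

sent, logged = asyncio.run(main())
print(sent, logged)
```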