Production · Architecture

LLM Execution Models

Synchronous, async, streaming, and batch execution patterns — designing responsive and cost-efficient LLM applications

4 Patterns · 7 Sections · Python-first Code Examples
In this guide
  1. Pattern Overview
  2. Synchronous
  3. Async
  4. Streaming
  5. Batch
  6. Request Queuing
  7. Error Handling
01 — Fundamentals

Execution Pattern Overview

LLM applications require different execution patterns depending on latency, throughput, and cost requirements. Synchronous execution is simple but blocks on every call. Async execution multiplexes requests. Streaming delivers tokens incrementally for better UX. Batch processing trades latency for cost — up to 50% cheaper but with SLA tradeoffs. Choose based on your use case.

Pattern Comparison Table

Pattern   | Latency                 | Throughput | Cost | Complexity | Best For
Sync      | 1-3s                    | Low        | 100% | Low        | Simple scripts
Async     | 1-3s                    | High       | 100% | Medium     | Web services
Streaming | 100-300ms (first token) | High       | 100% | Medium     | Chat apps
Batch     | Up to 24 hours          | Very High  | 50%  | Low        | Bulk processing
💡 Hybrid approach: Use streaming for interactive features, batch for backend processing, async for internal APIs.
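The tradeoffs in the table can be condensed into a small decision helper. This is an illustrative sketch only; the function name, its parameters, and the thresholds (such as the 100-request cutoff) are assumptions, not fixed rules:

```python
def choose_pattern(interactive: bool, deadline_hours: float, volume: int) -> str:
    """Map rough requirements onto the four execution patterns above."""
    if deadline_hours >= 24:
        return "batch"      # 50% cheaper, results within a 24-hour window
    if interactive:
        return "streaming"  # users see tokens as they arrive
    if volume > 100:
        return "async"      # multiplex many concurrent requests
    return "sync"           # simplest option for low-traffic scripts

print(choose_pattern(interactive=False, deadline_hours=48, volume=10_000))  # batch
print(choose_pattern(interactive=True, deadline_hours=0, volume=1))         # streaming
```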
02 — Simple Pattern

Synchronous Execution

The simplest pattern: call the LLM API and wait for the response. Suitable for scripts, offline tools, and low-traffic applications. Always include retry logic and timeouts.

from openai import OpenAI
import time

client = OpenAI(api_key="...")

# Simple blocking call
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain AI"}],
    max_tokens=500,
    timeout=30.0  # 30 second timeout
)
print(response.choices[0].message.content)

# Retry with exponential backoff
def call_with_retry(prompt, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0
            )
            return response.choices[0].message.content
        except Exception as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Attempt {attempt+1} failed, retry in {wait}s")
                time.sleep(wait)
            else:
                raise
03 — Concurrent

Async Execution

A non-blocking pattern built on asyncio: many in-flight requests share one event loop, which makes it ideal for web services and high-concurrency workloads. Use the AsyncOpenAI client.

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="...")

async def call_llm(prompt):
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30.0
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing",
        "What is machine learning?",
        "Describe neural networks"
    ]
    # Concurrent execution
    tasks = [call_llm(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    for p, r in zip(prompts, results):
        print(f"Q: {p}\nA: {r}\n")

asyncio.run(main())

Rate Limiting with Semaphores

# Control concurrency with asyncio.Semaphore
async def call_with_limit(prompt, sem):
    async with sem:
        return await call_llm(prompt)

async def main():
    sem = asyncio.Semaphore(5)  # Max 5 concurrent
    prompts = [...100 prompts...]
    tasks = [call_with_limit(p, sem) for p in prompts]
    results = await asyncio.gather(*tasks)

FastAPI Integration

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI(api_key="...")

@app.post("/complete")
async def complete(prompt: str):
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30.0
    )
    return {"response": response.choices[0].message.content}
04 — Token-by-Token

Streaming Execution

Stream tokens as they arrive instead of waiting for the full response. Dramatically improves perceived latency — users see content appearing in real-time. Essential for chat applications.

from openai import OpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

client = OpenAI(api_key="...")

# Direct streaming: with stream=True the SDK returns an iterator of chunks
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain AI"}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

# FastAPI streaming endpoint; a sync generator is fine here,
# FastAPI runs it in a threadpool
app = FastAPI()

def generate_content(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

@app.post("/stream")
async def stream_endpoint(prompt: str):
    return StreamingResponse(
        generate_content(prompt),
        media_type="text/event-stream"
    )
05 — Cost-Optimized

Batch Execution

Submit many requests at once; the Batch API completes them within a 24-hour window at roughly 50% lower cost per token. Ideal for bulk evaluation, annotation, and non-urgent analysis.

from openai import OpenAI
import json

client = OpenAI(api_key="...")

# Prepare JSONL batch
requests = []
for i in range(10):
    requests.append({
        "custom_id": f"req-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": f"Q{i}"}],
            "max_tokens": 500
        }
    })

with open("batch.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

# Upload and submit
with open("batch.jsonl", "rb") as f:
    batch_file = client.files.create(file=f, purpose="batch")

batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"  # currently the only supported window
)
print(f"Batch: {batch.id} - Status: {batch.status}")
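Submitting is only half the workflow: you also need to poll for completion and parse the output file. A sketch of that step; `wait_for_batch` and `parse_batch_output` are hypothetical helpers, and `client` is assumed to be an OpenAI client like the one created above:

```python
import json
import time

def parse_batch_output(jsonl_text):
    """Map each custom_id to its assistant message from a batch output JSONL file."""
    results = {}
    for line in jsonl_text.splitlines():
        record = json.loads(line)
        body = record["response"]["body"]
        results[record["custom_id"]] = body["choices"][0]["message"]["content"]
    return results

def wait_for_batch(client, batch_id, poll_seconds=60):
    """Poll the Batch API until a terminal status, then download and parse results."""
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            output = client.files.content(batch.output_file_id)
            return parse_batch_output(output.text)
        if batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch ended with status: {batch.status}")
        time.sleep(poll_seconds)
```

Results arrive in arbitrary order, which is why each request carries a `custom_id` for matching output back to input.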
06 — Managing Load

Request Queuing

Queue requests to prevent overwhelming the API. Use Redis + Celery or simple in-memory queues. Implement priority levels for urgent vs standard requests.

from celery import Celery
from openai import OpenAI

app = Celery('tasks', broker='redis://localhost')
client = OpenAI(api_key="...")

@app.task
def call_llm(prompt):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Priority is set per message via apply_async, not on the task
# definition; semantics are broker-dependent (see the Celery docs)
result = call_llm.apply_async(args=["standard"], priority=5)
urgent = call_llm.apply_async(args=["important"], priority=9)
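For lighter deployments, the "simple in-memory queue" option can be sketched with asyncio.PriorityQueue, where lower numbers dequeue first. The worker below upper-cases the prompt as a stand-in for a real LLM call; all names here are illustrative:

```python
import asyncio

async def worker(queue, results):
    # Drain (priority, prompt) items; lower priority numbers run first
    while True:
        priority, prompt = await queue.get()
        # Stand-in for an LLM call; swap in a real client here
        results.append((priority, prompt.upper()))
        queue.task_done()

async def main():
    queue = asyncio.PriorityQueue()
    results = []
    await queue.put((10, "standard request"))
    await queue.put((1, "urgent request"))
    task = asyncio.create_task(worker(queue, results))
    await queue.join()  # block until every queued item is processed
    task.cancel()
    return results

processed = asyncio.run(main())
print(processed)  # the urgent request is handled first
```

A single worker keeps ordering strict; adding workers raises throughput at the cost of strict priority ordering.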
07 — Reliability

Timeout and Error Handling

Handle rate limits, timeouts, network errors, and API errors gracefully. Implement circuit breakers to fail fast and prevent cascading failures.

import random
import time
from openai import OpenAI, RateLimitError, APIError

client = OpenAI(api_key="...")

def call_with_backoff(prompt, max_retries=5):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0
            ).choices[0].message.content
        except RateLimitError:
            # Exponential backoff with jitter for rate limits
            wait = 2 ** attempt + (random.random() * 0.1)
            print(f"Rate limited, waiting {wait:.1f}s")
            time.sleep(wait)
        except APIError as e:
            print(f"API error: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
    raise RuntimeError("Exhausted retries after repeated rate limits")
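The circuit breaker mentioned above can be a small state machine: open after N consecutive failures, reject calls during a cooldown, then allow a trial request. A minimal sketch; the CircuitBreaker class and its thresholds are illustrative, not a standard API:

```python
import time

class CircuitBreaker:
    """Fail fast after repeated errors; half-open again after a cooldown."""

    def __init__(self, failure_threshold=5, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self):
        if self.opened_at is None:
            return True
        # After the cooldown, permit one trial request (half-open state)
        return time.monotonic() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

In production you would check `breaker.allow()` before each call to something like `call_with_backoff` and record the outcome afterwards, so a failing upstream stops consuming retries across the whole service.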
Tools

Execution Pattern Tools

Framework
FastAPI
Modern async web framework for building streaming and async LLM endpoints with minimal boilerplate.
Async
asyncio
Python's built-in async framework. Core for concurrent LLM request handling in web services.
Queue
Celery
Distributed task queue for managing LLM requests with priorities and retry logic.
HTTP
httpx
Async HTTP client. Alternative to aiohttp for making LLM API calls.
Streaming
Server-Sent Events
Protocol for streaming data from server to client. Perfect for token-by-token delivery.
Batching
OpenAI Batch API
Official batch processing API. 50% cost savings with 24-hour SLA.
References

Further Reading

Official Documentation
Practitioner Guides