01 — Fundamentals
Execution Pattern Overview
LLM applications require different execution patterns depending on latency, throughput, and cost requirements. Synchronous execution is simple but blocks on every call. Async execution multiplexes requests. Streaming delivers tokens incrementally for better UX. Batch processing trades latency for cost — up to 50% cheaper but with SLA tradeoffs. Choose based on your use case.
Pattern Comparison Table
| Pattern | Latency | Throughput | Cost | Complexity | Best For |
| --- | --- | --- | --- | --- | --- |
| Sync | 1-3s | Low | 100% | Low | Simple scripts |
| Async | 1-3s | High | 100% | Medium | Web services |
| Streaming | 100ms-300ms | High | 100% | Medium | Chat apps |
| Batch | Up to 24 hours | Very High | ~50% | Low | Bulk processing |
💡
Hybrid approach: Use streaming for interactive features, batch for backend processing, async for internal APIs.
02 — Simple Pattern
Synchronous Execution
The simplest pattern: call the LLM API and wait for the response. Suitable for scripts, offline tools, and low-traffic applications. Always include retry logic and timeouts.
from openai import OpenAI
import time

client = OpenAI(api_key="...")

# Blocking request/response: the script waits until the whole
# completion is available before the next statement runs.
result = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain AI"}],
    max_tokens=500,
    timeout=30.0  # give up after 30 seconds
)
print(result.choices[0].message.content)
# Retry with exponential backoff
def call_with_retry(prompt, max_retries=3):
    """Call the chat completion API, retrying with exponential backoff.

    Args:
        prompt: User message sent to the model.
        max_retries: Total number of attempts before giving up.

    Returns:
        The assistant's reply text.

    Raises:
        Exception: Re-raises the last API error once all attempts fail.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0
            )
            return response.choices[0].message.content
        except Exception as e:
            # NOTE(review): this retries *every* exception, including
            # non-retryable ones (e.g. auth errors) — consider narrowing
            # to RateLimitError/APIError as in the error-handling section.
            if attempt < max_retries - 1:
                wait = 2 ** attempt  # 1s, 2s, 4s, ...
                # was: the caught exception was bound but never logged,
                # so failures were invisible; include it in the message.
                print(f"Attempt {attempt+1} failed ({e}), retry in {wait}s")
                time.sleep(wait)
            else:
                raise
03 — Concurrent
Async Execution
Non-blocking async pattern using asyncio. Multiplexes many requests, ideal for web services and high concurrency. Use AsyncOpenAI client.
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="...")

async def call_llm(prompt):
    """Send one prompt to the model and return the reply text."""
    completion = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30.0
    )
    return completion.choices[0].message.content
async def main():
    """Fire several prompts concurrently and print each Q/A pair."""
    prompts = [
        "Explain quantum computing",
        "What is machine learning?",
        "Describe neural networks"
    ]
    # gather() schedules every call on the event loop at once, so the
    # requests overlap instead of running one after another.
    results = await asyncio.gather(*(call_llm(p) for p in prompts))
    for p, r in zip(prompts, results):
        print(f"Q: {p}\nA: {r}\n")

asyncio.run(main())
Rate Limiting with Semaphores
# Bound concurrency with asyncio.Semaphore
async def call_with_limit(prompt, sem):
    """Run call_llm(prompt) while holding one slot of *sem*."""
    async with sem:
        return await call_llm(prompt)
async def main():
    """Fan out many prompts while capping in-flight requests at 5."""
    sem = asyncio.Semaphore(5)  # max 5 concurrent API calls
    # was: the placeholder "[...100 prompts...]" is not valid Python and
    # made the snippet unrunnable; generate concrete prompts instead.
    prompts = [f"Prompt {i}" for i in range(100)]
    tasks = [call_with_limit(p, sem) for p in prompts]
    results = await asyncio.gather(*tasks)
    # was: results were computed but discarded; hand them to the caller.
    return results
FastAPI Integration
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI(api_key="...")

@app.post("/complete")
async def complete(prompt: str):
    """Completion endpoint: awaiting the LLM call keeps the event loop
    free to serve other requests in the meantime."""
    completion = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        timeout=30.0
    )
    return {"response": completion.choices[0].message.content}
04 — Token-by-Token
Streaming Execution
Stream tokens as they arrive instead of waiting for the full response. Dramatically improves perceived latency — users see content appearing in real-time. Essential for chat applications.
from openai import OpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

client = OpenAI(api_key="...")

# Direct streaming: with stream=True the SDK returns an iterator of
# chunks rather than one final response.
# was: ``with ... as stream: for text in stream.text_stream`` — that is
# the Anthropic SDK's streaming API; the OpenAI chat-completions stream
# is consumed by iterating chunks and reading ``delta.content``.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain AI"}],
    stream=True
)
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:  # some chunks (e.g. the final one) carry no text
        print(content, end="", flush=True)
# FastAPI streaming endpoint
app = FastAPI()

async def generate_content(prompt: str):
    """Yield completion text piece by piece as the API produces it."""
    # NOTE(review): this iterates a *sync* stream inside an async
    # generator, which blocks the event loop between chunks — confirm
    # whether AsyncOpenAI should be used here instead.
    with client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    ) as stream:
        for chunk in stream:
            piece = chunk.choices[0].delta.content
            if piece:
                yield piece
@app.post("/stream")
async def stream_endpoint(prompt: str):
    """Serve the completion to the client as server-sent events."""
    token_generator = generate_content(prompt)
    return StreamingResponse(token_generator, media_type="text/event-stream")
05 — Cost-Optimized
Batch Execution
Submit many requests for processing within 24 hours. Costs ~50% less per token. Ideal for bulk evaluation, annotation, non-urgent analysis.
from openai import OpenAI
import json

client = OpenAI(api_key="...")

# Build the JSONL payload: one request object per line, each tagged
# with a custom_id so results can be matched back to inputs later.
requests = [
    {
        "custom_id": f"req-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": f"Q{i}"}],
            "max_tokens": 500
        }
    }
    for i in range(10)
]

with open("batch.jsonl", "w") as f:
    f.writelines(json.dumps(req) + "\n" for req in requests)
# Upload the JSONL file, then submit the batch job.
with open("batch.jsonl", "rb") as f:
    batch_file = client.files.create(file=f, purpose="batch")

batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    # was: ``timeout_minutes=1440`` — not a Batch API parameter (the SDK
    # would raise TypeError). The API takes ``completion_window``,
    # currently "24h" is the only supported value.
    completion_window="24h"
)
print(f"Batch: {batch.id} - Status: {batch.status}")
06 — Managing Load
Request Queuing
Queue requests to prevent overwhelming the API. Use Redis + Celery or simple in-memory queues. Implement priority levels for urgent vs standard requests.
from celery import Celery
from openai import OpenAI

app = Celery('tasks', broker='redis://localhost')
client = OpenAI(api_key="...")

@app.task(priority=5)
def call_llm(prompt):
    """Standard-priority Celery task: one blocking chat completion."""
    result = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return result.choices[0].message.content
@app.task(priority=10)  # Higher priority: jumps ahead of standard work
def urgent_call(prompt):
    """High-priority wrapper delegating to call_llm."""
    return call_llm(prompt)

# Enqueue: .delay() hands the job to the broker and returns
# immediately with an AsyncResult handle.
result = call_llm.delay("standard")
urgent = urgent_call.delay("important")
07 — Reliability
Timeout and Error Handling
Handle rate limits, timeouts, network errors, and API errors gracefully. Implement circuit breakers to fail fast and prevent cascading failures.
from openai import RateLimitError, APIError
import random  # was missing: random.random() below raised NameError
import time

def call_with_backoff(prompt, max_retries=5):
    """Call the chat API, backing off exponentially on transient errors.

    Args:
        prompt: User message sent to the model.
        max_retries: Total number of attempts before giving up.

    Returns:
        The assistant's reply text.

    Raises:
        RateLimitError: still rate-limited after max_retries attempts
            (was: the loop fell through and silently returned None).
        APIError: re-raised after max_retries failed attempts.
        Exception: any unexpected error is re-raised immediately.
    """
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0
            ).choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff plus a little jitter so concurrent
            # workers don't all retry in lockstep.
            wait = 2 ** attempt + (random.random() * 0.1)
            print(f"Rate limited, waiting {wait:.1f}s")
            time.sleep(wait)
        except APIError as e:
            print(f"API error: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
References
Further Reading
Official Documentation
Practitioner Guides