01 — Economics
Cost Anatomy: Understanding LLM Pricing
LLM pricing is per-token, with separate rates for prompt tokens (input) and completion tokens (output). Completion tokens typically cost 3-5× more than prompt tokens. As a rule of thumb, 1,000 tokens ≈ 750 English words. Always estimate input size before calling the API.
Pricing Models (March 2026)
| Model | Prompt | Completion | Ratio |
|---|---|---|---|
| GPT-4o | $5/1M | $15/1M | 1:3 |
| Claude 3.5 Sonnet | $3/1M | $15/1M | 1:5 |
| Gemini 2.0 | $0.075/1M | $0.30/1M | 1:4 |
| Llama 3.3 (via API) | $0.60/1M | $2.40/1M | 1:4 |
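The table above can be turned into a small lookup for quick comparisons. The prices below simply mirror the table and will drift over time; treat them as placeholders to be updated from the provider's pricing page.

```python
# Per-million-token prices mirroring the table above (update as pricing changes)
PRICES = {
    "gpt-4o": (5.00, 15.00),
    "claude-3.5-sonnet": (3.00, 15.00),
    "gemini-2.0": (0.075, 0.30),
    "llama-3.3": (0.60, 2.40),
}

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Return estimated USD cost for one request."""
    in_rate, out_rate = PRICES[model]
    return (prompt_tokens * in_rate + completion_tokens * out_rate) / 1_000_000

# Example: a 2,000-token prompt with a 500-token answer on each model
for model in PRICES:
    print(f"{model}: ${estimate_cost(model, 2000, 500):.6f}")
```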
💡
Cost scales linearly with tokens: doubling input size doubles input cost, and doubling output size doubles output cost. In multi-turn chat, however, the full history is resent every turn, so conversation cost grows faster than linearly. Context windows are expensive — watch input length carefully.
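The compounding effect of resending history can be sketched numerically. The rates match GPT-4o's $5/$15 per 1M from the table; the per-turn token counts are illustrative assumptions.

```python
# Cumulative cost of a chat where the full history is resent every turn.
# Assumes each turn adds 200 user tokens and 300 completion tokens.
IN_RATE, OUT_RATE = 5.00, 15.00  # USD per 1M tokens (GPT-4o)

def conversation_cost(turns: int, user_tokens: int = 200, reply_tokens: int = 300) -> float:
    history = 0
    total = 0.0
    for _ in range(turns):
        prompt = history + user_tokens        # prior history is resent as input
        total += (prompt * IN_RATE + reply_tokens * OUT_RATE) / 1_000_000
        history = prompt + reply_tokens       # the reply joins the history
    return total

print(f"5 turns:  ${conversation_cost(5):.4f}")
print(f"20 turns: ${conversation_cost(20):.4f}")
```

Twenty turns cost far more than four times five turns, because every turn pays again for all earlier tokens.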
Real-World Calculations
# Estimate cost before calling API
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o")
prompt = "Your long prompt here..."
prompt_tokens = len(enc.encode(prompt))
# Max output tokens
max_completion = 500
completion_tokens = max_completion
# Cost calculation
prompt_cost = (prompt_tokens / 1_000_000) * 5 # $5 per 1M
completion_cost = (completion_tokens / 1_000_000) * 15 # $15 per 1M
total_cost = prompt_cost + completion_cost
print(f"Estimated cost: ${total_cost:.6f}")
02 — Budgets
Token Budgeting
Set per-request token limits via max_tokens. Track token consumption in production. Implement alerts when monthly spend exceeds budget. Use dynamic budgets based on task priority or user tier.
# Dynamic token budgeting
def get_max_tokens(user_tier: str, is_urgent: bool) -> int:
    base = {
        "free": 200,
        "pro": 1000,
        "enterprise": 4000,
    }
    tokens = base.get(user_tier, 200)
    # Urgent requests get more budget
    if is_urgent:
        tokens = int(tokens * 1.5)
    return tokens

# In API call
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    max_tokens=get_max_tokens(user_tier, is_urgent)
)

# Track usage (gpt-4o-mini: $0.15/1M prompt, $0.60/1M completion)
usage = response.usage
cost = (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
print(f"Tokens: {usage.total_tokens}, Cost: ${cost:.6f}")
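The budget alerting mentioned above can be sketched as a simple spend tracker. The threshold and the alert mechanism (email, Slack, etc.) are placeholders for whatever your stack uses.

```python
# Minimal monthly spend tracker that fires a one-time alert at a threshold.
class SpendTracker:
    def __init__(self, monthly_budget_usd: float, alert_at: float = 0.8):
        self.budget = monthly_budget_usd
        self.alert_at = alert_at   # alert when 80% of budget is consumed
        self.spent = 0.0
        self.alerted = False

    def record(self, cost_usd: float) -> bool:
        """Add one request's cost; return True the first time the alert fires."""
        self.spent += cost_usd
        if not self.alerted and self.spent >= self.budget * self.alert_at:
            self.alerted = True
            return True  # caller sends the actual alert
        return False

tracker = SpendTracker(monthly_budget_usd=100.0)
tracker.record(79.0)          # below threshold, no alert
fired = tracker.record(2.0)   # crosses 80% of budget
print(f"Alert fired: {fired}, spent: ${tracker.spent:.2f}")
```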
Conversation History Trimming
Keep only recent messages in conversation history. Trim older messages to stay within token budgets while preserving context. Use rolling window or summary-based approaches.
# Trim conversation to max tokens
import tiktoken

def trim_messages(messages, max_tokens=2000):
    enc = tiktoken.encoding_for_model("gpt-4o")
    # Keep system message
    result = [m for m in messages if m["role"] == "system"]
    tokens = sum(len(enc.encode(m["content"])) for m in result)
    # Add recent messages (newest first) until the budget is reached
    recent = [m for m in messages if m["role"] != "system"]
    recent.reverse()
    for msg in recent:
        msg_tokens = len(enc.encode(msg["content"]))
        if tokens + msg_tokens < max_tokens:
            result.insert(1, msg)  # insert after system; keeps chronological order
            tokens += msg_tokens
    return result
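For the summary-based approach mentioned above, older messages can be collapsed into a single synthetic message. The `summarize` callable is a placeholder for an LLM call; it is injected here so the sketch stays self-contained.

```python
# Summary-based trimming: keep the last `keep` messages verbatim,
# replace everything older with one summary message.
def trim_with_summary(messages, summarize, keep=4):
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    if len(rest) <= keep:
        return system + rest
    older, recent = rest[:-keep], rest[-keep:]
    summary = summarize("\n".join(m["content"] for m in older))
    return system + [
        {"role": "system", "content": f"Summary of earlier turns: {summary}"}
    ] + recent

# Usage with a stand-in summarizer (real code would call an LLM here)
msgs = [{"role": "system", "content": "Be brief."}] + [
    {"role": "user", "content": f"msg {i}"} for i in range(10)
]
trimmed = trim_with_summary(msgs, summarize=lambda text: text[:40], keep=4)
print(len(trimmed))  # system + summary + 4 recent = 6
```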
03 — Repetition
Prompt Caching
Both OpenAI and Anthropic support prompt caching, but differently: OpenAI caches long repeated prompt prefixes automatically and bills cached input tokens at a discount (roughly half price), while Anthropic requires explicit cache_control markers and bills cache reads at roughly 10% of the normal input rate. Essential for RAG systems and fixed context that repeats across requests.
# OpenAI prompt caching is automatic for prompts above ~1,024 tokens:
# put static content at the START of the prompt so the prefix can be reused.
from openai import OpenAI

client = OpenAI(api_key="...")
documents = """Large context (e.g., entire codebase, book)..."""

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        # Static content first -> cacheable prefix
        {"role": "system", "content": documents},
        # Variable content last
        {"role": "user", "content": "Question about the docs: ..."}
    ]
)

# First call populates the cache; later calls with the same prefix reuse it
print(response.usage.prompt_tokens_details.cached_tokens)  # 0 on the first call
Anthropic Prompt Caching
# Anthropic claude-3-5-sonnet with cache_control
from anthropic import Anthropic

client = Anthropic()
context = "Large reusable context..."

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": context,
            "cache_control": {"type": "ephemeral"}
        }
    ],
    messages=[
        {"role": "user", "content": "Question: ..."}
    ]
)

print(f"Cache creation: {response.usage.cache_creation_input_tokens}")
print(f"Cache read: {response.usage.cache_read_input_tokens}")
# Cache reads are billed at ~10% of the base input rate, i.e. ~90% savings
# on every token served from cache
04 — Similarity
Semantic Caching
Cache based on semantic similarity, not exact string matching. If a new query is similar to a cached query, reuse the cached response. Tools like GPTCache and LangChain SemanticCache implement this. Typical savings: 20-40% on repeated similar queries.
# GPTCache semantic caching (exact init API varies by GPTCache version)
from gptcache.adapter import openai
from gptcache.adapter.api import init_similar_cache

# Initialize an embedding-based similar-query cache (stored under ./cache)
init_similar_cache(data_dir="./cache")

# Use the cached OpenAI adapter in place of the OpenAI client
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "What is machine learning?"}
    ]
)

# A semantically similar query can be served from cache
response2 = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "Explain machine learning"}
    ]
)
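Under the hood, a semantic cache is just stored (embedding, response) pairs plus a cosine-similarity lookup. A library-free sketch, with the embedding function injected; the toy bag-of-letters embedding below is only for exercising the cache, real code would use an embedding model.

```python
import math

# Minimal semantic cache: reuse a stored response when a new query's
# embedding is close enough to a cached one.
class SemanticCache:
    def __init__(self, embed, threshold=0.8):
        self.embed = embed          # callable: str -> list[float]
        self.threshold = threshold
        self.entries = []           # list of (embedding, response)

    @staticmethod
    def _cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norm if norm else 0.0

    def get(self, query):
        q = self.embed(query)
        best = max(self.entries, key=lambda e: self._cosine(q, e[0]), default=None)
        if best and self._cosine(q, best[0]) >= self.threshold:
            return best[1]
        return None  # cache miss -> caller hits the API and calls put()

    def put(self, query, response):
        self.entries.append((self.embed(query), response))

# Toy embedding (letter counts) just to exercise the cache
def toy_embed(text):
    return [text.lower().count(c) for c in "abcdefghijklmnopqrstuvwxyz"]

c = SemanticCache(toy_embed, threshold=0.9)
c.put("what is machine learning", "ML is ...")
print(c.get("what is machine learning?"))  # near-identical query -> cache hit
```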
Cache Invalidation
Set TTL (time-to-live) on cached responses. Refresh when source documents change. Implement versioning to detect stale cached responses.
# Semantic cache with TTL and versioning
import hashlib
import time

cache = {}  # simple in-memory store: key -> {"response", "timestamp"}

def is_expired(timestamp, ttl):
    return time.time() - timestamp > ttl

def get_cached_response(query, doc_content, cache_ttl=3600):
    # Version the key on document content so edits invalidate old entries
    doc_version = hashlib.md5(doc_content.encode()).hexdigest()
    cache_key = f"{query}_{doc_version}"
    # Check cache
    cached = cache.get(cache_key)
    if cached and not is_expired(cached["timestamp"], cache_ttl):
        return cached["response"]
    # Call API and cache
    response = client.chat.completions.create(...)
    cache[cache_key] = {
        "response": response,
        "timestamp": time.time(),
    }
    return response
05 — Optimization
Model Routing by Cost and Quality
Route requests to different models based on cost, latency, and quality requirements. Use cheaper models for simple tasks, expensive models for complex reasoning. Implement quality thresholds and fallbacks.
# Rule-based model routing
def choose_model(task_type: str) -> str:
    if task_type == "classification":
        return "gpt-4o-mini"  # Fast, cheap
    elif task_type == "summarization":
        return "gpt-4o-mini"  # Good for summaries
    elif task_type in ("reasoning", "coding"):
        return "gpt-4o"       # Complex reasoning, advanced code
    else:
        return "gpt-4o-mini"

# Cost-aware fallback: try the cheap model first, escalate if quality is low
MODEL_COSTS = {
    "gpt-4o-mini": 0.00015,  # USD per 1K prompt tokens
    "gpt-4o": 0.005,
}

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    temperature=0.7
)

# score_response is an application-defined quality check (the API returns
# no quality score); if it falls below threshold, retry with a better model
if score_response(response) < 0.7:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
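The try-cheap-then-escalate pattern generalizes to a cascade over any model list. A sketch with the model-calling and scoring functions injected; both are placeholders for real API calls and a real quality metric.

```python
# Model cascade: walk models cheapest-first, stop at the first answer
# that clears the quality threshold; the last model's answer is final.
def cascade(models, call, score, threshold=0.7):
    answer = None
    for model in models:
        answer = call(model)
        if score(answer) >= threshold:
            break  # good enough, stop escalating
    return answer

# Usage with stand-ins: the cheap model scores 0.5, the big one 0.9
answers = {"gpt-4o-mini": "short answer", "gpt-4o": "thorough answer"}
scores = {"short answer": 0.5, "thorough answer": 0.9}
result = cascade(
    ["gpt-4o-mini", "gpt-4o"],
    call=lambda m: answers[m],
    score=lambda a: scores[a],
)
print(result)  # escalates past the cheap model: "thorough answer"
```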
LiteLLM Router
# LiteLLM for model routing
from litellm import Router

router = Router(
    model_list=[
        {"model_name": "cheap", "litellm_params": {"model": "gpt-3.5-turbo"}},
        {"model_name": "smart", "litellm_params": {"model": "gpt-4o"}},
    ],
    fallbacks=[{"cheap": ["smart"]}],  # retry on "smart" if "cheap" fails
)

# Route by deployment name; configured fallbacks apply automatically
response = router.completion(
    model="cheap",
    messages=[...],
)
06 — Reliability
Rate Limit Handling and Backoff
Implement exponential backoff with jitter to handle rate limits gracefully. Respect Retry-After headers. Track remaining quota to avoid hitting limits.
# Exponential backoff with jitter
import time
import random
from openai import RateLimitError

def api_call_with_backoff(messages, max_retries=5):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o",
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            # Prefer the server's Retry-After header when present
            retry_after = e.response.headers.get("retry-after")
            wait_time = float(retry_after) if retry_after else (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)

# Quota headers require the raw response (the parsed object has no .headers)
raw = client.chat.completions.with_raw_response.create(model="gpt-4o", messages=messages)
print(f"Requests remaining: {raw.headers.get('x-ratelimit-remaining-requests')}")
print(f"Reset at: {raw.headers.get('x-ratelimit-reset-requests')}")
Token Quota Management
Monitor token-per-minute (TPM) limits. Pre-compute request tokens before dispatching. Implement token-weighted queuing to stay within limits.
# Token quota tracking
class TokenQuotaManager:
    def __init__(self, tokens_per_minute=90000):
        self.tpm_limit = tokens_per_minute
        self.tokens_used = 0
        self.window_start = time.time()

    def can_send(self, prompt_tokens):
        elapsed = time.time() - self.window_start
        # Reset window if 60 seconds passed
        if elapsed >= 60:
            self.tokens_used = 0
            self.window_start = time.time()
            elapsed = 0
        if self.tokens_used + prompt_tokens > self.tpm_limit:
            return False, max(0, 60 - elapsed)
        self.tokens_used += prompt_tokens
        return True, 0

# Usage (count_tokens: a tiktoken-based helper like the one in section 01)
quota = TokenQuotaManager()
prompt_tokens = count_tokens(prompt, model="gpt-4o")
can_send, wait = quota.can_send(prompt_tokens)
if not can_send:
    time.sleep(wait)
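The token-weighted queuing mentioned above can be sketched as a priority queue where each request carries its token cost; a dispatcher drains it in priority order until the minute's budget runs out. A greedy single-window sketch; a production version would loop over windows and sleep between them.

```python
import heapq

# Token-weighted priority queue: send highest-priority requests first,
# deferring any request that would exceed the per-minute token budget.
def dispatch(requests, tpm_budget):
    """requests: list of (priority, tokens, payload); lower priority = sooner."""
    heap = list(requests)
    heapq.heapify(heap)
    sent, deferred = [], []
    used = 0
    while heap:
        priority, tokens, payload = heapq.heappop(heap)
        if used + tokens <= tpm_budget:
            used += tokens
            sent.append(payload)
        else:
            deferred.append(payload)  # wait for the next window
    return sent, deferred

sent, deferred = dispatch(
    [(1, 4000, "urgent"), (2, 5000, "normal"), (3, 3000, "batch")],
    tpm_budget=8000,
)
print(sent, deferred)  # ['urgent', 'batch'] ['normal']
```

Note the greedy choice: the 5,000-token "normal" request is deferred, but the smaller "batch" request still fits in the remaining budget.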
07 — Monitoring
Cost Attribution and Analytics
Track costs per user, feature, and request. Use structured logging to correlate API calls with business outcomes. Implement dashboards for cost visibility and anomaly detection.
# Cost attribution logging
import logging
import json
from datetime import datetime

class CostLogger:
    def __init__(self):
        self.logger = logging.getLogger("api_costs")

    def log_api_call(self, user_id, feature, model, prompt_tokens, completion_tokens, cost):
        total = prompt_tokens + completion_tokens
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "feature": feature,
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total,
            "cost_usd": cost,
            "cost_per_token": cost / total if total else 0.0,
        }
        self.logger.info(json.dumps(log_entry))

cost_logger = CostLogger()

# After API call (gpt-4o: $5/1M prompt, $15/1M completion)
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
cost = (prompt_tokens * 5 + completion_tokens * 15) / 1_000_000
cost_logger.log_api_call(
    user_id="user123",
    feature="search_summarization",
    model="gpt-4o",
    prompt_tokens=prompt_tokens,
    completion_tokens=completion_tokens,
    cost=cost
)
Integration with Langfuse
Use Langfuse for production-grade cost tracking with trace-level visibility. Correlate costs with latency, user actions, and model quality metrics.
# Langfuse cost tracking (v2-style SDK; check your version's docs)
from langfuse import Langfuse

langfuse = Langfuse(
    public_key="pk_...",
    secret_key="sk_..."
)

trace = langfuse.trace(
    name="search_feature",
    user_id="user123",
    session_id="session_456"
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages
)

# Log the call as a generation; Langfuse derives cost from model + usage
trace.generation(
    name="api_call_completed",
    model="gpt-4o",
    input=messages,
    output=response.choices[0].message.content,
    usage={
        "input": response.usage.prompt_tokens,
        "output": response.usage.completion_tokens,
    }
)

# Costs then appear per-trace in the Langfuse dashboard
References
Learn More
OpenAI Pricing
Real-time pricing for all OpenAI models. Compare token costs across GPT-4o, GPT-4o mini, and legacy models.
OpenAI Chat API
Full API reference including cache_control, max_tokens, and response format parameters.
LiteLLM
Open-source library for multi-provider LLM routing with unified interface and cost tracking.
GPTCache
Semantic caching library with vector similarity and TTL-based invalidation strategies.
Langfuse
Production observability platform with cost attribution, trace-level debugging, and performance analytics.
tiktoken
Official tokenizer library for OpenAI models. Accurate token counting for budget estimation.