
LLM Traffic & Cost Management

Token budgets, rate limits, caching, model routing, and cost attribution — keeping LLM costs predictable at scale

5 Levers
7 Sections
Python-first Code
In this guide
  1. Cost Anatomy
  2. Token Budgets
  3. Prompt Caching
  4. Semantic Caching
  5. Model Routing
  6. Rate Limits
  7. Cost Attribution
01 — Economics

Cost Anatomy: Understanding LLM Pricing

LLM pricing is per-token: prompt tokens (input) and completion tokens (output). Completion tokens typically cost 3-5× more than prompt tokens. One thousand tokens ≈ 750 words. Always estimate input size before calling the API.

Pricing Models (March 2026)

| Model | Prompt | Completion | Ratio |
|---|---|---|---|
| GPT-4o | $5 / 1M | $15 / 1M | 1:3 |
| Claude 3.5 Sonnet | $3 / 1M | $15 / 1M | 1:5 |
| Gemini 2.0 | $0.075 / 1M | $0.30 / 1M | 1:4 |
| Llama 3.3 (via API) | $0.60 / 1M | $2.40 / 1M | 1:4 |
💡 Cost scales linearly with tokens: doubling input size doubles input cost, and doubling output size doubles output cost. Large context windows are where budgets quietly blow up — watch input length carefully.

Real-World Calculations

```python
# Estimate cost before calling the API
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")
prompt = "Your long prompt here..."
prompt_tokens = len(enc.encode(prompt))

# Assume the worst case: the model uses the full output budget
max_completion = 500
completion_tokens = max_completion

# Cost calculation (GPT-4o: $5 / 1M prompt, $15 / 1M completion)
prompt_cost = (prompt_tokens / 1_000_000) * 5
completion_cost = (completion_tokens / 1_000_000) * 15
total_cost = prompt_cost + completion_cost
print(f"Estimated cost: ${total_cost:.6f}")
```
02 — Budgets

Token Budgeting

Set per-request token limits via max_tokens. Track token consumption in production. Implement alerts when monthly spend exceeds budget. Use dynamic budgets based on task priority or user tier.

```python
# Dynamic token budgeting
def get_max_tokens(user_tier: str, is_urgent: bool) -> int:
    base = {"free": 200, "pro": 1000, "enterprise": 4000}
    tokens = base.get(user_tier, 200)
    # Urgent requests get 50% more budget
    if is_urgent:
        tokens = int(tokens * 1.5)
    return tokens

# In the API call
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    max_tokens=get_max_tokens(user_tier, is_urgent),
)

# Track usage — price prompt and completion tokens at their own rates
# (gpt-4o-mini: $0.15 / 1M prompt, $0.60 / 1M completion)
usage = response.usage
cost = (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
print(f"Tokens: {usage.total_tokens}, Cost: ${cost:.6f}")
```

Conversation History Trimming

Keep only recent messages in conversation history. Trim older messages to stay within token budgets while preserving context. Use rolling window or summary-based approaches.

```python
# Trim conversation to a token budget
import tiktoken

def trim_messages(messages, max_tokens=2000):
    enc = tiktoken.encoding_for_model("gpt-4o")
    # Always keep system messages
    result = [m for m in messages if m["role"] == "system"]
    tokens = sum(len(enc.encode(m["content"])) for m in result)
    head = len(result)  # insertion point: right after the system messages

    # Walk backwards from the newest message, adding until the budget is hit
    recent = [m for m in messages if m["role"] != "system"]
    recent.reverse()
    for msg in recent:
        msg_tokens = len(enc.encode(msg["content"]))
        if tokens + msg_tokens >= max_tokens:
            break  # stop at the first message that doesn't fit, keeping history contiguous
        result.insert(head, msg)  # insert after system, preserving chronological order
        tokens += msg_tokens
    return result
```
03 — Repetition

Prompt Caching

OpenAI and Anthropic support prompt caching: a repeated prompt prefix is cached and re-read at a steep discount (roughly 50% of the input price on OpenAI, 10% on Anthropic). OpenAI applies caching automatically to prompts of 1,024 tokens or more; Anthropic requires explicit cache_control markers. Essential for RAG systems and fixed context that repeats across requests.

```python
# OpenAI prompt caching is automatic for prompts >= 1024 tokens:
# put the stable context first so the prefix can be reused across calls.
from openai import OpenAI

client = OpenAI(api_key="...")
documents = """Large context (e.g., entire codebase, book)..."""

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        # Stable prefix first, variable question last
        {"role": "user", "content": documents + "\n\nQuestion about the docs: ..."},
    ],
)

# First call populates the cache; subsequent calls with the same prefix
# report reused tokens here
print(response.usage.prompt_tokens)
print(response.usage.prompt_tokens_details.cached_tokens)  # > 0 on cache hits
```

Anthropic Prompt Caching

```python
# Anthropic prompt caching with explicit cache_control
from anthropic import Anthropic

client = Anthropic()
context = "Large reusable context..."

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": context,
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": "Question: ..."}],
)

print(f"Cache creation: {response.usage.cache_creation_input_tokens}")
print(f"Cache read: {response.usage.cache_read_input_tokens}")
# Cache reads are billed at ~10% of the base input rate,
# so cache_read_input_tokens represent a ~90% saving on those tokens
```
04 — Similarity

Semantic Caching

Cache based on semantic similarity, not exact string matching. If a new query is similar to a cached query, reuse the cached response. Tools like GPTCache and LangChain SemanticCache implement this. Typical savings: 20-40% on repeated similar queries.

```python
# GPTCache semantic caching (API per the GPTCache docs; check your version)
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

# Initialize with an embedding model, a sqlite + faiss store,
# and distance-based similarity evaluation
onnx = Onnx()
data_manager = get_data_manager(
    CacheBase("sqlite"),
    VectorBase("faiss", dimension=onnx.dimension),
)
cache.init(
    embedding_func=onnx.to_embeddings,
    data_manager=data_manager,
    similarity_evaluation=SearchDistanceEvaluation(),
)
cache.set_openai_key()

# Use the cached OpenAI adapter
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "What is machine learning?"}],
)

# A semantically similar query is served from the cache
response2 = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Explain machine learning"}],
)
```

Cache Invalidation

Set TTL (time-to-live) on cached responses. Refresh when source documents change. Implement versioning to detect stale cached responses.

```python
# Cache with TTL and content-based versioning
import hashlib
import time

cache = {}  # in-memory store: {key: {"response": ..., "timestamp": ...}}

def is_expired(timestamp, ttl):
    return time.time() - timestamp > ttl

def get_cached_response(query, doc_content, cache_ttl=3600):
    # Version the key by document content so edits invalidate the cache
    doc_version = hashlib.md5(doc_content.encode()).hexdigest()
    cache_key = f"{query}_{doc_version}"

    cached = cache.get(cache_key)
    if cached and not is_expired(cached["timestamp"], cache_ttl):
        return cached["response"]

    # Miss: call the API and cache the result
    response = client.chat.completions.create(...)
    cache[cache_key] = {"response": response, "timestamp": time.time()}
    return response
```
05 — Optimization

Model Routing by Cost and Quality

Route requests to different models based on cost, latency, and quality requirements. Use cheaper models for simple tasks, expensive models for complex reasoning. Implement quality thresholds and fallbacks.

```python
# Rule-based model routing
def choose_model(task_type: str) -> str:
    routes = {
        "classification": "gpt-4o-mini",  # Fast, cheap
        "summarization": "gpt-4o-mini",   # Good enough for summaries
        "reasoning": "gpt-4o",            # Complex reasoning
        "coding": "gpt-4o",               # Advanced code
    }
    return routes.get(task_type, "gpt-4o-mini")

# Cost-aware escalation: try the cheap model first, retry with the
# expensive one if quality is too low. score_quality() is a placeholder
# for your own evaluation (e.g., an LLM judge or heuristic checks).
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    temperature=0.7,
)
if score_quality(response) < 0.7:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
```

LiteLLM Router

```python
# LiteLLM Router with fallbacks
from litellm import Router

router = Router(
    model_list=[
        {"model_name": "cheap", "litellm_params": {"model": "gpt-3.5-turbo"}},
        {"model_name": "smart", "litellm_params": {"model": "gpt-4o"}},
    ],
    fallbacks=[{"cheap": ["smart"]}],  # escalate to "smart" if "cheap" fails
)

# Route to the cheap deployment; the router handles fallback automatically
response = router.completion(model="cheap", messages=[...])
```
06 — Reliability

Rate Limit Handling and Backoff

Implement exponential backoff with jitter to handle rate limits gracefully. Respect Retry-After headers. Track remaining quota to avoid hitting limits.

```python
# Exponential backoff with jitter
import random
import time

from openai import RateLimitError

def api_call_with_backoff(messages, max_retries=5):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
            )
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)

# Track remaining quota from rate-limit headers. The SDK hides headers
# by default; use with_raw_response to read them.
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o", messages=messages
)
print(f"Requests remaining: {raw.headers.get('x-ratelimit-remaining-requests')}")
print(f"Reset at: {raw.headers.get('x-ratelimit-reset-requests')}")
```

Token Quota Management

Monitor token-per-minute (TPM) limits. Pre-compute request tokens before dispatching. Implement token-weighted queuing to stay within limits.

```python
# Sliding-window token quota tracking
import time

import tiktoken

def count_tokens(text, model="gpt-4o"):
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

class TokenQuotaManager:
    def __init__(self, tokens_per_minute=90_000):
        self.tpm_limit = tokens_per_minute
        self.tokens_used = 0
        self.window_start = time.time()

    def can_send(self, prompt_tokens):
        elapsed = time.time() - self.window_start
        # Reset the window after 60 seconds
        if elapsed >= 60:
            self.tokens_used = 0
            self.window_start = time.time()
            elapsed = 0
        if self.tokens_used + prompt_tokens > self.tpm_limit:
            return False, 60 - elapsed  # wait until the window resets
        self.tokens_used += prompt_tokens
        return True, 0

# Usage
quota = TokenQuotaManager()
prompt_tokens = count_tokens(prompt)
can_send, wait = quota.can_send(prompt_tokens)
if not can_send:
    time.sleep(wait)
```
07 — Monitoring

Cost Attribution and Analytics

Track costs per user, feature, and request. Use structured logging to correlate API calls with business outcomes. Implement dashboards for cost visibility and anomaly detection.

```python
# Cost attribution logging
import json
import logging
from datetime import datetime, timezone

class CostLogger:
    def __init__(self):
        self.logger = logging.getLogger("api_costs")

    def log_api_call(self, user_id, feature, model,
                     prompt_tokens, completion_tokens, cost):
        total = prompt_tokens + completion_tokens
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": user_id,
            "feature": feature,
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total,
            "cost_usd": cost,
            "cost_per_token": cost / total if total else 0,
        }
        self.logger.info(json.dumps(log_entry))

cost_logger = CostLogger()

# After the API call (GPT-4o: $5 / 1M prompt, $15 / 1M completion)
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
cost = (prompt_tokens * 5 + completion_tokens * 15) / 1_000_000
cost_logger.log_api_call(
    user_id="user123",
    feature="search_summarization",
    model="gpt-4o",
    prompt_tokens=prompt_tokens,
    completion_tokens=completion_tokens,
    cost=cost,
)
```

Integration with Langfuse

Use Langfuse for production-grade cost tracking with trace-level visibility. Correlate costs with latency, user actions, and model quality metrics.

```python
# Langfuse cost tracking (v2-style SDK; check the Langfuse docs for your version)
from langfuse import Langfuse

langfuse = Langfuse(public_key="pk_...", secret_key="sk_...")

trace = langfuse.trace(
    name="search_feature",
    user_id="user123",
    session_id="session_456",
)

response = client.chat.completions.create(model="gpt-4o", messages=messages)

trace.generation(
    name="api_call_completed",
    model="gpt-4o",
    input=messages,
    output=response.choices[0].message.content,
    usage={
        "input": response.usage.prompt_tokens,
        "output": response.usage.completion_tokens,
    },
)
# Langfuse computes cost from model + usage; view it in the dashboard
```
Tools

Implementation Stack

Essential tools for managing LLM traffic, costs, and reliability at scale.

FastAPI

Web framework for building API endpoints with async support. Integrates with OpenAI streaming responses. Built-in OpenAPI documentation.

asyncio

Python's built-in concurrency library for running API calls concurrently. Semaphores enforce concurrency and rate limits. Efficient for I/O-bound workloads.

Celery

Distributed task queue for batch processing. Decouple request handling from LLM API calls. Implement retry logic and exponential backoff.

httpx

Modern HTTP client with both sync and async support and a requests-like API. Timeouts are configurable per-phase; connection retries are available at the transport level.

Server-Sent Events (SSE)

Streaming protocol for progressive token delivery. One-way HTTP streaming supported natively by browsers via EventSource. Lower overhead than WebSockets for simple streaming.

OpenAI Batch API

50% cost savings for non-time-sensitive work. Process 100K+ requests overnight. Asynchronous result retrieval.
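The batch workflow starts from a JSONL file of requests. This sketch builds one; the upload and create calls are shown commented as they appear in OpenAI's Batch API, with placeholder file paths and prompts.

```python
# Build a JSONL input file for the OpenAI Batch API
import json

requests = [
    {
        "custom_id": f"req-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": f"Summarize document {i}"}],
            "max_tokens": 200,
        },
    }
    for i in range(3)
]

with open("batch_input.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

# Then upload the file and create the batch (completes within 24h at ~50% off):
# batch_file = client.files.create(file=open("batch_input.jsonl", "rb"), purpose="batch")
# batch = client.batches.create(input_file_id=batch_file.id,
#                               endpoint="/v1/chat/completions",
#                               completion_window="24h")

lines = open("batch_input.jsonl").read().splitlines()
print(len(lines))
```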

References

Learn More

OpenAI Pricing

Real-time pricing for all OpenAI models. Compare token costs across GPT-4o, GPT-4o mini, and legacy models.

OpenAI Chat API

Full API reference including prompt caching, max_tokens, and response format parameters.

Anthropic Batch API

Anthropic's batch processing documentation with examples for cost optimization.

LiteLLM

Open-source library for multi-provider LLM routing with unified interface and cost tracking.

GPTCache

Semantic caching library with vector similarity and TTL-based invalidation strategies.

Langfuse

Production observability platform with cost attribution, trace-level debugging, and performance analytics.

tiktoken

Official tokenizer library for OpenAI models. Accurate token counting for budget estimation.

FastAPI Docs

High-performance Python web framework with async support and automatic API documentation.