Token bucket and sliding window algorithms for per-user and per-tenant quota enforcement — protecting upstream LLM APIs from overuse and distributing limited TPM/RPM capacity fairly across your users.
LLM APIs enforce per-account rate limits (tokens-per-minute and requests-per-minute). Without your own rate limiting, a single heavy user can exhaust your entire quota, causing 429 errors for all other users. At scale, rate limiting also controls costs — a user who submits 1000 requests in a loop could burn through your monthly budget in minutes.
Three levels of rate limiting for LLM applications:

- Per-user rate limiting: each user gets their own quota (e.g., 100 requests/hour). Prevents individual abuse. Implemented in your API layer.
- Per-tenant rate limiting: for B2B SaaS, each customer organisation gets a quota, usually tied to its subscription tier (starter: 1,000 req/day; pro: 10,000; enterprise: unlimited). Coarser-grained than per-user.
- Global budget guards: a hard ceiling on total spend across all users, implemented as a daily/monthly token counter. When the budget is hit, all requests are blocked until the next period. Prevents runaway costs from bugs or attacks.
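A minimal in-memory sketch of such a budget guard (the class name is illustrative; a production version would keep the counter in Redis or a database so it survives restarts and is shared across processes):

```python
import time

class TokenBudgetGuard:
    """Hard ceiling on total tokens spent per period, across all users.
    Single-threaded sketch -- add a lock or move to Redis for real use."""

    def __init__(self, budget_tokens: int, period_seconds: float = 86_400):
        self.budget = budget_tokens
        self.period = period_seconds
        self.spent = 0
        self.period_start = time.time()

    def try_spend(self, tokens: int) -> bool:
        # Reset the counter when a new period begins
        now = time.time()
        if now - self.period_start >= self.period:
            self.spent = 0
            self.period_start = now
        if self.spent + tokens > self.budget:
            return False  # budget exhausted: block until the next period
        self.spent += tokens
        return True

guard = TokenBudgetGuard(budget_tokens=1_000_000)  # 1M tokens/day
print(guard.try_spend(400_000))  # True
print(guard.try_spend(700_000))  # False: would exceed the daily budget
```

Checking the budget before the request and recording actual usage after it keeps the guard simple; the trade-off is that one in-flight request can still slightly overshoot the ceiling.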
The classic token bucket, suitable for in-process per-user limits:

```python
import time
import threading

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        # capacity: max tokens in the bucket (burst allowance)
        # refill_rate: tokens added per second
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity  # start full
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True   # allowed
            return False      # rate limited

# Example: 10 requests/second sustained, burst up to 50
bucket = TokenBucket(capacity=50, refill_rate=10)
for i in range(120):
    allowed = bucket.consume(1)
    print(f"Request {i}: {'OK' if allowed else 'RATE LIMITED'}")
    # 20 req/s against a 10/s refill: net drain of 10 tokens/s,
    # so the 50-token burst runs out after ~5s (around request 100)
    time.sleep(0.05)
```
Token bucket naturally handles bursts — a user who was idle accumulates tokens and can then burst briefly above the sustained rate. Good for interactive applications where users may send a few quick follow-up messages.
A sliding window log keeps the exact timestamps of recent requests:

```python
import threading
import time
from collections import deque

class SlidingWindowCounter:
    def __init__(self, limit: int, window_seconds: float):
        # limit: max requests in any window_seconds interval
        self.limit = limit
        self.window = window_seconds
        self.requests = deque()  # timestamps of recent requests
        self._lock = threading.Lock()

    def is_allowed(self) -> bool:
        with self._lock:
            now = time.time()
            cutoff = now - self.window
            # Drop timestamps that have aged out of the window
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()
            if len(self.requests) < self.limit:
                self.requests.append(now)
                return True
            return False

    @property
    def retry_after(self) -> float:
        # Time until the oldest request falls out of the window
        if not self.requests:
            return 0.0
        return max(0.0, self.requests[0] + self.window - time.time())

# Example: 100 requests per minute
sw = SlidingWindowCounter(limit=100, window_seconds=60)
for i in range(105):
    if sw.is_allowed():
        print(f"Request {i}: OK")
    else:
        print(f"Request {i}: RATE LIMITED (retry after {sw.retry_after:.1f}s)")
```
In-process counters don't survive restarts and can't be shared across workers. For multi-process deployments, move the state into Redis:

```python
import time
import uuid

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def check_rate_limit(user_id: str, limit: int, window: int) -> dict:
    # Sliding window log using a Redis sorted set
    # Key: rate_limit:{user_id}; members: unique request IDs scored by timestamp
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - window
    member = f"{now}:{uuid.uuid4().hex}"  # unique even if timestamps collide

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # remove expired requests
    pipe.zcard(key)                              # count requests still in window
    pipe.zadd(key, {member: now})                # record this request
    pipe.expire(key, window + 1)                 # let idle keys expire
    results = pipe.execute()

    current_count = results[1]
    allowed = current_count < limit
    retry_after = 0
    if not allowed:
        # Roll back the optimistic zadd -- this request was rejected
        r.zrem(key, member)
        oldest = r.zrange(key, 0, 0, withscores=True)
        retry_after = (oldest[0][1] + window - now) if oldest else 0
    return {
        "allowed": allowed,
        "count": current_count,
        "limit": limit,
        "retry_after": retry_after,
    }

# Usage
result = check_rate_limit("user_123", limit=100, window=3600)
if not result["allowed"]:
    print(f"Rate limited. Retry in {result['retry_after']:.0f}s")
```
Wiring the Redis check into a FastAPI middleware:

```python
import time

import redis
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
r = redis.Redis(host="localhost", port=6379)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Identify the caller (from JWT, API key, or IP as a last resort)
    user_id = request.headers.get("X-User-ID") or request.client.host

    result = check_rate_limit(user_id, limit=60, window=60)  # 60 req/min
    if not result["allowed"]:
        return JSONResponse(
            status_code=429,
            content={
                "error": "Rate limit exceeded",
                "retry_after": result["retry_after"],
            },
            headers={
                "Retry-After": str(int(result["retry_after"])),
                "X-RateLimit-Limit": str(result["limit"]),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(time.time() + result["retry_after"])),
            },
        )

    # Add rate limit headers to successful responses
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(result["limit"])
    response.headers["X-RateLimit-Remaining"] = str(result["limit"] - result["count"] - 1)
    return response
```
Tier-based limits tied to subscription plans:

```python
import time
from dataclasses import dataclass
from enum import Enum

class Plan(Enum):
    FREE = "free"
    STARTER = "starter"
    PRO = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class RateLimitTier:
    requests_per_minute: int
    requests_per_day: int
    tokens_per_minute: int   # maps to upstream LLM API limits
    max_context_tokens: int  # cap on prompt size

TIERS = {
    Plan.FREE: RateLimitTier(10, 100, 10_000, 4_096),
    Plan.STARTER: RateLimitTier(60, 1_000, 100_000, 32_768),
    Plan.PRO: RateLimitTier(300, 10_000, 500_000, 100_000),
    Plan.ENTERPRISE: RateLimitTier(3_000, 100_000, 2_000_000, 200_000),
}

def check_token_budget(key: str, tokens: int, limit: int, window: int) -> bool:
    # Fixed-window token counter: INCRBY by the token count, not by 1
    count = r.incrby(key, tokens)
    if count == tokens:  # first write in this window -- set the TTL
        r.expire(key, window)
    return count <= limit

def enforce_limits(user_id: str, plan: Plan, token_count: int) -> bool:
    tier = TIERS[plan]
    # Reject prompts that exceed the tier's context cap outright
    if token_count > tier.max_context_tokens:
        raise ValueError(f"Prompt too long: {token_count} > {tier.max_context_tokens}")
    # Requests per minute
    if not check_rate_limit(f"{user_id}:rpm", tier.requests_per_minute, 60)["allowed"]:
        return False
    # Requests per day
    if not check_rate_limit(f"{user_id}:rpd", tier.requests_per_day, 86_400)["allowed"]:
        return False
    # Tokens per minute: increment by token_count, keyed to the current minute
    tpm_key = f"{user_id}:tpm:{int(time.time() // 60)}"
    return check_token_budget(tpm_key, token_count, tier.tokens_per_minute, 60)
```
Rate limiting by IP is easy to bypass and unfair. Behind a corporate NAT or shared WiFi, hundreds of users share one IP. Limiting by IP either over-limits legitimate users or provides too easy a bypass (just switch networks). Always rate limit by authenticated user ID or API key once users are logged in.
Sliding window vs. fixed window: a fixed-window counter (increment a counter each minute, reset on the minute) needs only O(1) memory, but allows up to double the limit at window boundaries (a burst at the end of minute 1 plus another at the start of minute 2). A sliding window closes that hole, at the cost of storing a timestamp per request. For most applications, a sliding window with Redis TTL-based cleanup is the right choice.
Count tokens, not just requests, for LLM rate limiting. A single request with a 100K-token prompt uses 1000× the upstream API quota of a 100-token request. Counting requests equally is unfair and ineffective at protecting your budget. Track token usage per user alongside request counts, especially if you expose long-context capabilities.
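One way to weight requests by tokens is a sliding-window log that charges each request its token count instead of 1 (a sketch; the class name and limits are illustrative):

```python
import time
from collections import deque

class TokenWeightedLimiter:
    """Sliding-window limiter where each request costs its token count."""

    def __init__(self, tokens_per_window: int, window_seconds: float = 60):
        self.limit = tokens_per_window
        self.window = window_seconds
        self.entries = deque()  # (timestamp, tokens) per accepted request

    def is_allowed(self, token_count: int) -> bool:
        now = time.time()
        cutoff = now - self.window
        # Drop entries that have aged out of the window
        while self.entries and self.entries[0][0] < cutoff:
            self.entries.popleft()
        used = sum(tokens for _, tokens in self.entries)
        if used + token_count <= self.limit:
            self.entries.append((now, token_count))
            return True
        return False

limiter = TokenWeightedLimiter(tokens_per_window=100_000)
print(limiter.is_allowed(90_000))  # True: big request fits an empty window
print(limiter.is_allowed(20_000))  # False: would exceed 100K tokens/min
print(limiter.is_allowed(5_000))   # True: small request still fits
```

Note how one large request consumes most of the window while small requests still get through, which is exactly the fairness property a request-only counter lacks.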
| Algorithm | Burst Handling | Memory | Best For |
|---|---|---|---|
| Fixed window | Poor (resets at boundary) | O(1) | Simple per-minute caps |
| Sliding window log | Excellent | O(n) per user | Strict fairness, low traffic |
| Sliding window counter | Good approximation | O(1) | High-traffic APIs |
| Token bucket | Excellent (allows bursts) | O(1) | APIs with bursty clients |
| Leaky bucket | None (smooths traffic) | O(1) | Upstream LLM API protection |
For LLM applications specifically, rate-limit on tokens, not just requests: a single 100K-token request consumes as much upstream quota as a thousand 100-token ones. Track estimated input + output tokens per request in your rate-limit counter, use the actual usage figures your provider returns with each response to correct those estimates, and prefer deriving thresholds from observed usage over hardcoding them.
Token-based rate limiting requires estimating output token counts before the request completes. Use a local tokenizer or your provider's token-counting endpoint (where one is offered) for exact input counts, and maintain a rolling average of the output-to-input ratio from recent requests to estimate total usage. Update the counter optimistically before the request, then correct it once actual usage arrives in the response.
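One way to maintain that rolling estimate is an exponential moving average (a sketch; the class name, starting ratio, and smoothing factor are all illustrative choices):

```python
class OutputRatioEstimator:
    """Exponential moving average of the output/input token ratio,
    used to predict total usage before a request completes."""

    def __init__(self, initial_ratio: float = 1.0, alpha: float = 0.2):
        self.ratio = initial_ratio  # starting guess before any observations
        self.alpha = alpha          # EMA smoothing factor

    def estimate_total(self, input_tokens: int) -> int:
        # Predicted input + output tokens for the upcoming request
        return int(input_tokens * (1 + self.ratio))

    def record(self, input_tokens: int, output_tokens: int) -> None:
        # Fold the observed ratio into the rolling average
        observed = output_tokens / max(input_tokens, 1)
        self.ratio = (1 - self.alpha) * self.ratio + self.alpha * observed

est = OutputRatioEstimator()
print(est.estimate_total(1_000))  # 2000 with the initial 1.0 ratio
est.record(input_tokens=1_000, output_tokens=400)  # observed ratio 0.4
est.record(input_tokens=2_000, output_tokens=600)  # observed ratio 0.3
print(est.estimate_total(1_000))  # estimate shrinks toward observed usage
```

Charge `estimate_total(...)` against the counter before the call, then adjust by the difference once the real usage comes back.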
A global token budget across all users can reuse the same Redis sorted-set layout as the per-user limiter: store (request ID, tokens_used) entries scored by timestamp under a single shared key, and sum the token counts within the past 60 seconds to get total recent usage. At high concurrency, though, the pipeline-plus-rollback pattern shown above leaves a small race window in which two parallel handlers both pass the check and overshoot the budget; wrapping the check-and-record step in a Lua script closes it, because Redis executes scripts atomically.
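A sketch of that atomic variant, assuming a redis-py client and the same sorted-set layout as above (the script and function names are illustrative):

```python
import time
import uuid

# Redis executes Lua scripts atomically, so the count-and-record step
# cannot race between concurrent request handlers.
SLIDING_WINDOW_LUA = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local member = ARGV[4]
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
    redis.call('ZADD', key, now, member)
    redis.call('EXPIRE', key, window + 1)
    return 1
end
return 0
"""

def check_rate_limit_atomic(r, user_id: str, limit: int, window: int) -> bool:
    # r: a redis.Redis client (assumed; not constructed here)
    now = time.time()
    member = f"{now}:{uuid.uuid4().hex}"
    allowed = r.eval(SLIDING_WINDOW_LUA, 1, f"rate_limit:{user_id}",
                     now, window, limit, member)
    return bool(allowed)
```

No rollback is needed here: the script only writes the entry when the request is under the limit.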