Rate Limiting

Token bucket and sliding window algorithms for per-user and per-tenant quota enforcement — protecting upstream LLM APIs from overuse and distributing limited TPM/RPM capacity fairly across your users.


SECTION 01

Why LLM apps need rate limiting

LLM APIs enforce per-account rate limits (tokens-per-minute and requests-per-minute). Without your own rate limiting, a single heavy user can exhaust your entire quota, causing 429 errors for all other users. At scale, rate limiting also controls costs — a user who submits 1000 requests in a loop could burn through your monthly budget in minutes.

Three levels of rate limiting for LLM applications:

Per-user rate limiting: each user gets their own quota (e.g., 100 requests/hour). Prevents individual abuse. Implemented in your API layer.

Per-tenant rate limiting: for B2B SaaS, each customer organisation gets a quota. Usually tied to their subscription tier (starter: 1000 req/day, pro: 10,000, enterprise: unlimited). More coarse-grained than per-user.

Global budget guards: a hard ceiling on total spend across all users. Implemented as a daily/monthly token counter — when the budget is hit, all requests are blocked until the next period. Prevents runaway costs from bugs or attacks.
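A global budget guard can be sketched as a fixed-window counter keyed by the current UTC day. This is an in-memory illustration only (the `DAILY_TOKEN_BUDGET` value is a made-up example; production use would back the counter with Redis or a database so it survives restarts and is shared across processes):

```python
import time
import threading

DAILY_TOKEN_BUDGET = 5_000_000   # example ceiling; tune to your actual spend limit

class GlobalBudgetGuard:
    """Hard ceiling on total tokens spent across all users per UTC day."""
    def __init__(self, budget: int):
        self.budget = budget
        self.day = self._today()
        self.used = 0
        self._lock = threading.Lock()

    def _today(self) -> int:
        return int(time.time() // 86400)   # UTC day number

    def try_spend(self, tokens: int) -> bool:
        with self._lock:
            today = self._today()
            if today != self.day:          # new day: reset the counter
                self.day, self.used = today, 0
            if self.used + tokens > self.budget:
                return False               # budget exhausted — block until tomorrow
            self.used += tokens
            return True

guard = GlobalBudgetGuard(DAILY_TOKEN_BUDGET)
print(guard.try_spend(10_000))   # True — first request fits comfortably
```

Checking the budget before dispatching each upstream request means a runaway loop or attack burns at most one day's budget, never a whole month's.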

SECTION 02

Token bucket algorithm

import time
import threading

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        # capacity: max tokens in the bucket (burst allowance)
        # refill_rate: tokens added per second
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity         # start full
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        added = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True   # allowed
            return False      # rate limited

# Example: 10 requests/second sustained, burst up to 50
bucket = TokenBucket(capacity=50, refill_rate=10)

for i in range(120):
    allowed = bucket.consume(1)
    print(f"Request {i}: {'OK' if allowed else 'RATE LIMITED'}")
    time.sleep(0.05)   # 20 req/s — net drain of 10 tokens/s; the 50-token burst lasts ~5s (~100 requests)

Token bucket naturally handles bursts — a user who was idle accumulates tokens and can then burst briefly above the sustained rate. Good for interactive applications where users may send a few quick follow-up messages.
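In an API layer you typically keep one bucket per user, created lazily on first request. A minimal sketch (a compact bucket is re-declared here so the snippet stands alone; it uses a monotonic clock to avoid jumps from system clock changes):

```python
import time
import threading
from collections import defaultdict

class TokenBucket:
    def __init__(self, capacity: float, refill_rate: float):
        self.capacity, self.refill_rate = capacity, refill_rate
        self.tokens, self.last = capacity, time.monotonic()
        self._lock = threading.Lock()

    def consume(self, n: float = 1) -> bool:
        with self._lock:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.refill_rate)
            self.last = now
            if self.tokens >= n:
                self.tokens -= n
                return True
            return False

# One bucket per user: 10 req/s sustained, burst of 50, per user
buckets: dict[str, TokenBucket] = defaultdict(
    lambda: TokenBucket(capacity=50, refill_rate=10))

def allow(user_id: str) -> bool:
    return buckets[user_id].consume(1)
```

Each user gets an independent burst allowance. Note this state lives in one process; in a multi-process or multi-host deployment it must move to a shared store such as Redis.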

SECTION 03

Sliding window counter

import threading
import time
from collections import deque

class SlidingWindowCounter:
    def __init__(self, limit: int, window_seconds: float):
        # limit: max requests in any window_seconds interval
        self.limit = limit
        self.window = window_seconds
        self.requests = deque()    # timestamps of recent requests
        self._lock = threading.Lock()

    def is_allowed(self) -> bool:
        with self._lock:
            now = time.time()
            cutoff = now - self.window

            # Remove expired timestamps
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()

            if len(self.requests) < self.limit:
                self.requests.append(now)
                return True
            return False

    @property
    def retry_after(self) -> float:
        # How long until the oldest request expires
        if not self.requests:
            return 0.0
        return max(0.0, self.requests[0] + self.window - time.time())

# Example: 100 requests per minute
sw = SlidingWindowCounter(limit=100, window_seconds=60)

# Simulate usage
for i in range(105):
    if sw.is_allowed():
        print(f"Request {i}: OK")
    else:
        print(f"Request {i}: RATE LIMITED (retry after {sw.retry_after:.1f}s)")

SECTION 04

Redis-backed implementation

import redis
import time

r = redis.Redis(host="localhost", port=6379, db=0)

def check_rate_limit(user_id: str, limit: int, window: int) -> dict:
    # Sliding window using Redis sorted sets
    # Key: rate_limit:{user_id}; members: request timestamps (as floats)
    # Note: the count-then-rollback below is approximate under heavy
    # concurrency; wrap it in a Lua script (EVAL) for a fully atomic check.
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - window

    pipe = r.pipeline()
    # Remove expired requests
    pipe.zremrangebyscore(key, 0, window_start)
    # Count current requests in window
    pipe.zcard(key)
    # Add this request
    pipe.zadd(key, {str(now): now})
    # Set expiry on the key
    pipe.expire(key, window + 1)
    results = pipe.execute()

    current_count = results[1]
    allowed = current_count < limit

    if not allowed:
        # Rollback the zadd — remove the request we just added
        r.zrem(key, str(now))

    oldest = r.zrange(key, 0, 0, withscores=True)
    retry_after = (oldest[0][1] + window - now) if oldest else 0

    return {
        "allowed": allowed,
        "count": current_count,
        "limit": limit,
        "retry_after": retry_after if not allowed else 0,
    }

# Usage
result = check_rate_limit("user_123", limit=100, window=3600)
if not result["allowed"]:
    print(f"Rate limited. Retry in {result['retry_after']:.0f}s")

SECTION 05

FastAPI middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import redis
import time

app = FastAPI()
r = redis.Redis(host="localhost", port=6379)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Extract user identifier (from JWT, API key, or IP)
    user_id = request.headers.get("X-User-ID") or request.client.host

    # Check rate limit (check_rate_limit as defined in Section 04)
    result = check_rate_limit(user_id, limit=60, window=60)  # 60 req/min

    if not result["allowed"]:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded",
                     "retry_after": result["retry_after"]},
            headers={
                "Retry-After": str(int(result["retry_after"])),
                "X-RateLimit-Limit": str(result["limit"]),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(time.time() + result["retry_after"])),
            }
        )

    # Add rate limit headers to successful responses
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(result["limit"])
    response.headers["X-RateLimit-Remaining"] = str(result["limit"] - result["count"] - 1)
    return response

SECTION 06

Tiered limits by plan

import time
from enum import Enum
from dataclasses import dataclass

class Plan(Enum):
    FREE    = "free"
    STARTER = "starter"
    PRO     = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class RateLimitTier:
    requests_per_minute: int
    requests_per_day: int
    tokens_per_minute: int    # maps to upstream LLM API limits
    max_context_tokens: int   # cap on prompt size

TIERS = {
    Plan.FREE:       RateLimitTier(10,    100,     10_000,   4_096),
    Plan.STARTER:    RateLimitTier(60,   1_000,   100_000,  32_768),
    Plan.PRO:        RateLimitTier(300,  10_000,  500_000, 100_000),
    Plan.ENTERPRISE: RateLimitTier(3000, 100_000, 2_000_000, 200_000),
}

def enforce_limits(user_id: str, plan: Plan, token_count: int) -> bool:
    tier = TIERS[plan]

    # Check token count against context limit
    if token_count > tier.max_context_tokens:
        raise ValueError(f"Prompt too long: {token_count} > {tier.max_context_tokens}")

    # Check RPM
    rpm_result = check_rate_limit(f"{user_id}:rpm", tier.requests_per_minute, 60)
    if not rpm_result["allowed"]:
        return False

    # Check RPD
    rpd_result = check_rate_limit(f"{user_id}:rpd", tier.requests_per_day, 86400)
    if not rpd_result["allowed"]:
        return False

    # Check TPM — consume token_count from a fixed per-minute counter
    # (the per-minute key mirrors how upstream TPM limits reset)
    tpm_key = f"{user_id}:tpm:{int(time.time() // 60)}"
    used = r.incrby(tpm_key, token_count)
    r.expire(tpm_key, 120)
    return used <= tier.tokens_per_minute

SECTION 07

Gotchas

Rate limiting by IP is easy to bypass and unfair to legitimate users. Behind a corporate NAT or shared WiFi, hundreds of users share one IP, so limiting by IP either throttles them collectively or is trivially bypassed (switch networks, use a VPN). Always rate limit by authenticated user ID or API key once users are logged in.

Sliding window is more accurate than fixed window but uses more memory. Fixed window counters (just increment a counter each minute, reset on the minute) allow double the limit at window boundaries (burst at the end of minute 1 + start of minute 2). Sliding window is more accurate but stores timestamps for every request. For most applications, sliding window with Redis TTL-based cleanup is the right choice.
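The boundary-burst problem is easy to see with a toy fixed-window counter (simulated clock passed in explicitly; the 10-per-minute limit is just an example):

```python
class FixedWindowCounter:
    """Naive fixed-window limiter: the counter resets at each window boundary."""
    def __init__(self, limit: int, window: int):
        self.limit, self.window = limit, window
        self.current_window = None
        self.count = 0

    def is_allowed(self, now: float) -> bool:
        win = int(now // self.window)
        if win != self.current_window:      # crossed a boundary: reset
            self.current_window, self.count = win, 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False

fw = FixedWindowCounter(limit=10, window=60)

# 10 requests at t=59.9s and 10 more at t=60.1s — all 20 allowed,
# even though 20 requests landed within 0.2 seconds of each other.
burst = sum(fw.is_allowed(59.9) for _ in range(10)) + \
        sum(fw.is_allowed(60.1) for _ in range(10))
print(burst)   # 20 — double the per-minute limit
```

A sliding window sees all 20 timestamps inside one 60-second interval and rejects the second batch.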

Count tokens, not just requests, for LLM rate limiting. A single request with a 100K-token prompt uses 1000× the upstream API quota of a 100-token request. Counting requests equally is unfair and ineffective at protecting your budget. Track token usage per user alongside request counts, especially if you expose long-context capabilities.

SECTION 08

Rate Limit Architecture Patterns

Algorithm                Burst handling               Memory          Best for
Fixed window             Poor (resets at boundary)    O(1)            Simple per-minute caps
Sliding window log       Excellent                    O(n) per user   Strict fairness, low traffic
Sliding window counter   Good approximation           O(1)            High-traffic APIs
Token bucket             Excellent (allows bursts)    O(1)            APIs with bursty clients
Leaky bucket             None (smooths traffic)       O(1)            Upstream LLM API protection

For LLM applications specifically, rate-limit on tokens, not just requests — a single 100K-token request consumes as much upstream quota as a thousand 100-token requests. Track estimated input + output tokens per request in your rate-limit counter, use the actual usage your provider returns in response headers to update rolling averages, and adjust your thresholds dynamically rather than hardcoding them.

Token-based rate limiting requires estimating output token counts before the request completes. Use the provider token counting endpoint for exact input counts, and maintain a rolling average of output-to-input ratio from recent requests for total usage estimation. Update the counter optimistically before the request, then correct it when actual usage arrives in response headers.
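A sketch of that estimate-then-correct flow (in-memory; the 1.0 starting ratio and 50-sample history are assumptions, and exact input counts would come from your provider's tokenizer or token-counting endpoint):

```python
from collections import deque

class OutputRatioEstimator:
    """Rolling average of the output/input token ratio from recent requests."""
    def __init__(self, history: int = 50, initial_ratio: float = 1.0):
        self.samples = deque(maxlen=history)
        self.initial = initial_ratio

    @property
    def ratio(self) -> float:
        return sum(self.samples) / len(self.samples) if self.samples else self.initial

    def estimate_total(self, input_tokens: int) -> int:
        # Optimistic pre-request estimate: input + predicted output
        return input_tokens + int(input_tokens * self.ratio)

    def record(self, input_tokens: int, output_tokens: int) -> None:
        # Refine the ratio once actual usage arrives in response headers
        self.samples.append(output_tokens / max(input_tokens, 1))

est = OutputRatioEstimator()
estimate = est.estimate_total(1_000)     # charge the rate-limit counter with this
# ... request completes; provider reports 1_000 input, 400 output tokens ...
est.record(1_000, 400)
actual = 1_000 + 400
correction = actual - estimate           # -600 here: refund the over-charge
```

Applying `correction` to the shared counter after each response keeps the limiter honest without ever blocking on an unknown output size.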

Implement a global token budget tracker across all users using Redis sorted sets: store (timestamp, tokens_used) entries per user ID and use a sliding window to compute total tokens used in the past 60 seconds. This scales to millions of users with sub-millisecond overhead per request. To prevent race conditions when multiple request handlers run in parallel, wrap the check-and-increment in a Lua script (EVAL) so Redis executes it atomically.