01 — Foundation
Why State Is Hard
LLMs are stateless by nature. Each API call starts fresh — the model has no built-in memory of previous interactions. Yet users expect continuity. Conversations should remember context, tone, decisions. Managing that gap is the job of application-level state.
The core challenge: every API call costs tokens, so resending the full conversation history gets expensive fast. Context windows aren't infinite either — GPT-4 Turbo offers 128K tokens, but most real-world requests budget only 2K–10K tokens for history. And multi-user apps need isolation: user A's history must never leak into user B's session.
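A back-of-the-envelope sketch makes the cost concrete. Assuming a flat ~50 tokens per turn (an illustrative number), tokens sent per request grow linearly with conversation length, so cumulative tokens across a session grow quadratically:

```python
def cumulative_tokens(turns: int, tokens_per_turn: int = 50) -> int:
    """Total tokens sent across a session if each request resends full history."""
    # Request t carries all t turns so far; sum over the whole session.
    return sum(t * tokens_per_turn for t in range(1, turns + 1))

print(cumulative_tokens(10))   # → 2750
print(cumulative_tokens(100))  # → 252500
```

A 10x longer conversation costs roughly 100x more in history tokens — which is why every strategy below exists.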
💡
Every API call starts fresh — state is your responsibility. The model won't remember, persist, or cost-optimize for you. You design history strategies, storage backends, and expiry policies.
02 — Tradeoffs
History Strategies
Different strategies optimize for cost, quality, or session length. Choose based on your use case.
| Strategy | Description | Cost | Quality | Best For |
| --- | --- | --- | --- | --- |
| Full history | Send all messages | High | Best | Short sessions |
| Sliding window | Last N turns | Medium | Good | General chat |
| Summarise + trim | LLM summary of old turns | Medium | Good | Long sessions |
| Embedding memory | Retrieve relevant past turns | Low | Selective | Long-term personal |
| Hybrid | Sliding window + summary | Medium | Excellent | Production default |
Strategy Details
Full history: Best quality, worst cost. Works for demos and short chat sessions (< 20 turns). Once you exceed your context window, quality drops sharply.
Sliding window: Keep last N messages (e.g., last 10 turns = 20 messages). Simple, predictable cost. Loses distant context but often sufficient for coherent conversations.
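A minimal sliding window is just a bounded deque — a framework-free sketch:

```python
from collections import deque

class SlidingWindowHistory:
    """Keep only the last N turns of a conversation (2 messages per turn)."""

    def __init__(self, max_turns: int = 10):
        # deque with maxlen silently drops the oldest entries as new ones arrive
        self.messages = deque(maxlen=max_turns * 2)

    def add(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})

    def context(self) -> list:
        """Messages to send with the next API request."""
        return list(self.messages)
```

Cost stays bounded no matter how long the session runs; anything older than N turns is simply gone.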
Summarise + trim: Periodically ask the LLM to summarize old turns into a condensed summary. Replace old messages with "Summary: ...". Preserves key context while cutting token cost. Adds latency (extra API call).
Embedding memory: Store turn embeddings in a vector DB. On each new message, retrieve the most relevant past turns. Best for long-term, multi-session memory but requires embedding infrastructure.
Hybrid (recommended): Keep last 5 turns in full. Summarize older turns into a rolling summary. Combine both in the context. Best cost/quality tradeoff for production.
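One way to assemble the hybrid context — the rolling-summary wording and function name here are illustrative, not a library API:

```python
def build_context(system_prompt: str, rolling_summary: str, recent_messages: list) -> list:
    """Hybrid context: system prompt, then a summary of old turns, then recent turns verbatim."""
    context = [{"role": "system", "content": system_prompt}]
    if rolling_summary:
        # Old turns are represented only by their condensed summary
        context.append({"role": "system",
                        "content": f"Conversation so far (summary): {rolling_summary}"})
    # The last few turns go in verbatim for full fidelity
    context.extend(recent_messages)
    return context
```

On each new message you would append to `recent_messages`, and once it exceeds ~5 turns, fold the oldest turn into `rolling_summary` via an LLM call.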
03 — Code
Implementing History in Python
LangChain offers memory utilities for sliding window and summarization. Here's how to use them:
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI
# Sliding window: last 5 turns
memory = ConversationBufferWindowMemory(k=5)
# Summarization: keep recent turns verbatim; once the buffer exceeds
# max_token_limit, older turns are folded into a running summary
llm = ChatOpenAI(model="gpt-3.5-turbo")  # gpt-3.5-turbo is a chat model, so use ChatOpenAI
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=1000
)
# Add messages
memory.save_context(
{"input": "What is AI?"},
{"output": "AI is..."}
)
# Get formatted history for next request
history_str = memory.buffer # or memory.load_memory_variables({})
print(history_str)
Manual Token Trimming with tiktoken
For fine-grained control, manually trim using token counts:
import tiktoken
from collections import deque
def trim_messages_to_limit(messages, max_tokens=4000, model="gpt-4"):
enc = tiktoken.encoding_for_model(model)
# Count tokens in messages
total = 0
kept = deque()
# Go backwards; keep recent first
for msg in reversed(messages):
msg_tokens = len(enc.encode(msg["content"]))
if total + msg_tokens > max_tokens:
break
kept.appendleft(msg)
total += msg_tokens
return list(kept)
# Usage
messages = [{"role": "user", "content": "..."}, ...]
trimmed = trim_messages_to_limit(messages, max_tokens=3000)
04 — Infrastructure
Session Storage Backends
Where you store conversation state depends on latency, durability, and scale requirements:
| Backend | Latency | Durability | Scale | Cost |
| --- | --- | --- | --- | --- |
| In-memory dict | <1ms | No | Single process | Free |
| Redis | 1–5ms | Yes | Horizontal | $ |
| DynamoDB | 5–20ms | Yes | Unlimited | $$ |
| PostgreSQL | 5–30ms | Yes | Vertical | $ |
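The simplest backend — an in-memory dict with manual TTL — is worth seeing once, if only to understand what Redis gives you for free:

```python
import time

class InMemorySessionStore:
    """Simplest possible backend: a dict with manual TTL checks.
    No durability — everything is lost on process restart."""

    def __init__(self):
        self._store = {}  # session_id -> (expires_at, data)

    def set(self, session_id: str, data: dict, ttl_seconds: int = 86400) -> None:
        self._store[session_id] = (time.time() + ttl_seconds, data)

    def get(self, session_id: str):
        entry = self._store.get(session_id)
        if entry is None:
            return None
        expires_at, data = entry
        if time.time() > expires_at:
            # Lazy expiry: delete on read rather than with a background sweeper
            del self._store[session_id]
            return None
        return data
```

Fine for demos and tests; the moment you run a second process or restart, you need Redis or a database.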
Redis Session Store Example
Redis is the most popular for session storage — fast, reliable, and supports expiry out of the box:
import redis
import json
from datetime import timedelta
# Connect
r = redis.Redis(host='localhost', port=6379, db=0)
# Save session (auto-expires in 24 hours)
session_id = "user_123_session_456"
session_data = {
"messages": [
{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."}
],
"user_id": "user_123",
"created_at": "2026-03-24T..."
}
r.setex(
session_id,
timedelta(hours=24),
json.dumps(session_data)
)
# Retrieve session
data = r.get(session_id)
if data:
session_data = json.loads(data)
# List all sessions for a user (prefix scan; prefer scan_iter over KEYS in production)
session_keys = list(r.scan_iter("user_123_*"))
PostgreSQL with pgvector
For embedding-based memory, PostgreSQL + pgvector allows semantic search over past turns:
-- Create session table
CREATE TABLE sessions (
session_id UUID PRIMARY KEY,
user_id UUID NOT NULL,
created_at TIMESTAMP,
expires_at TIMESTAMP
);
-- Store individual turns with embeddings
CREATE TABLE turns (
turn_id SERIAL PRIMARY KEY,
session_id UUID REFERENCES sessions(session_id),
role VARCHAR(20), -- 'user' or 'assistant'
content TEXT,
embedding vector(1536), -- OpenAI ada-002
created_at TIMESTAMP
);
-- Index for fast retrieval
CREATE INDEX ON turns USING ivfflat (embedding vector_cosine_ops);
-- Find similar past turns (<=> is cosine distance, matching the vector_cosine_ops index)
SELECT content, role
FROM turns
WHERE session_id = $1
ORDER BY embedding <=> $2::vector
LIMIT 3;
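For intuition, here is the cosine distance that `vector_cosine_ops` ranks by, in pure Python:

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity: 0 for identical directions, 1 for orthogonal vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)
```

Only the *direction* of the embedding matters, not its magnitude — which is why cosine is the usual choice for OpenAI embeddings.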
05 — Isolation
Multi-User & Multi-Session Architecture
Production apps support multiple users, each with multiple concurrent sessions. Session isolation is critical.
Session ID Generation
Use UUID4 for globally unique, unpredictable session identifiers:
import uuid
from datetime import datetime, timedelta
def create_session(user_id: str) -> dict:
session_id = str(uuid.uuid4())
return {
"session_id": session_id,
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(hours=24)).isoformat(),
"messages": []
}
# Store in Redis with key: user_id:session_id
session_data = create_session(user_id)
key = f"{user_id}:{session_data['session_id']}"
r.setex(key, timedelta(hours=24), json.dumps(session_data))
Key Patterns
- Session key:
user_id:session_id — namespaces sessions by user
- User key:
user_id:sessions — list/set of active session IDs (for UI)
- Message archive:
user_id:session_id:messages — full history in a list
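Small helpers keep these key patterns consistent across a codebase — a sketch following the patterns above:

```python
def session_key(user_id: str, session_id: str) -> str:
    """Namespaced session key: user_id:session_id."""
    return f"{user_id}:{session_id}"

def user_sessions_key(user_id: str) -> str:
    """Set of a user's active session IDs, for the session-list UI."""
    return f"{user_id}:sessions"

def messages_key(user_id: str, session_id: str) -> str:
    """Full message history for one session, stored as a Redis list."""
    return f"{user_id}:{session_id}:messages"
```

Centralizing key construction means a future change (say, adding an environment prefix) touches one file instead of every call site.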
Expiry & TTL
Set Redis TTL on session keys. When expired, sessions auto-delete. Options:
⏰ Fixed expiry
- All sessions expire in 24 hours
- Simple, predictable
- May lose active chats
🔄 Sliding expiry
- Reset TTL on each message
- Sessions live 24h of inactivity
- More user-friendly
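Sliding expiry is one extra call: reset the TTL every time a message lands. A sketch using redis-py's `rpush`/`expire` (the 24-hour TTL is an assumption):

```python
from datetime import timedelta

SESSION_TTL = timedelta(hours=24)  # assumed policy: die after 24h of inactivity

def append_message(r, key: str, message_json: str) -> None:
    """Append a message to the session's list and reset its TTL (sliding expiry)."""
    r.rpush(key, message_json)
    # Re-arming the TTL on every write means only *inactive* sessions expire
    r.expire(key, SESSION_TTL)
```

An active chat stays alive indefinitely; an abandoned one cleans itself up a day after the last message.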
Session Listing for UI
Let users see and switch between their sessions:
def list_user_sessions(user_id: str, r: redis.Redis):
    """Return all active sessions for a user."""
    pattern = f"{user_id}:*"
    sessions = []
    for key in r.scan_iter(pattern):  # scan_iter avoids blocking Redis the way KEYS can
        key = key.decode() if isinstance(key, bytes) else key
        if ":messages" in key:  # Skip message archives
            continue
        raw = r.get(key)
        if raw is None:  # Key expired between scan and get
            continue
        data = json.loads(raw)
        sessions.append({
            "id": data["session_id"],
            "created_at": data["created_at"],
            "message_count": len(data["messages"])
        })
    return sorted(sessions, key=lambda x: x["created_at"], reverse=True)
06 — Persistence
Stateful Agents
Agents are programs that loop: observe state, decide action, execute tool, update state. State persists across steps and sessions.
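That loop can be sketched in a few lines — `decide` stands in for the LLM call and is purely illustrative:

```python
def run_agent(state: dict, decide, tools: dict, max_steps: int = 5) -> dict:
    """Minimal agent loop: observe state, decide an action, execute a tool, update state.
    `decide(state)` returns {"name": tool_name, "input": ...} or {"name": "finish", ...}."""
    for _ in range(max_steps):
        action = decide(state)
        if action["name"] == "finish":
            state["result"] = action.get("output")
            break
        # Execute the chosen tool and record the step in the scratchpad
        observation = tools[action["name"]](action.get("input"))
        state.setdefault("scratchpad", []).append(
            {"action": action, "observation": observation}
        )
    return state
```

Everything an agent framework adds — prompting, parsing, retries — wraps this core loop, and `state` is exactly what must be persisted between steps.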
Agent Scratchpad & Tool History
An agent's scratchpad stores its internal reasoning and tool calls; it must be persisted between steps:
from langchain.agents import initialize_agent
from langchain.memory import ConversationBufferMemory
# Agent with persistent conversational memory
memory = ConversationBufferMemory(memory_key="chat_history")
agent = initialize_agent(
    tools=[...],
    llm=llm,
    agent="conversational-react-description",  # its prompt includes chat_history
    memory=memory,
    return_intermediate_steps=True  # Capture tool calls
)
# Each step adds to memory automatically. Call the executor directly:
# .run() only returns the final string and errors with multiple output keys.
response = agent({"input": "User query"})
# Reasoning, tool calls, and results are in response["intermediate_steps"]
Checkpointing with LangGraph
LangGraph provides explicit checkpointing for agent state. Save snapshots after each step:
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
# Use Postgres for durable checkpoints
conn = psycopg.connect("...")
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # Create the checkpoint tables on first run
# Build graph with checkpointing
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_edge("agent", "tools")
# ... define workflow ...
app = workflow.compile(checkpointer=checkpointer)
# Run with snapshots
result = app.invoke(
{"messages": [...], "task": "..."},
config={"configurable": {"thread_id": session_id}}
)
# Can resume from checkpoint if interrupted
snapshot = app.get_state({"configurable": {"thread_id": session_id}})
print(snapshot.values)  # Last saved state
MemGPT-Style External Memory
MemGPT stores agent memory in a database, partitioning by type:
-- MemGPT memory tables
CREATE TABLE agent_core_memory (
agent_id UUID,
section VARCHAR(50), -- 'persona', 'human'
content TEXT,
updated_at TIMESTAMP
);
CREATE TABLE agent_recall_memory (
agent_id UUID,
turn_id INT,
content TEXT,
created_at TIMESTAMP
);
# The agent then injects the retrieved memory into its system prompt (Python side):
system_prompt = f"""
{agent_persona}
[Recall: {recent_turns}]
"""
Long-Running Task State
For async tasks that span hours/days, pair asyncio tasks with database records:
import asyncio
import json
import uuid
from datetime import datetime
# Task record in DB
async def start_long_task(user_id: str, query: str):
task_id = str(uuid.uuid4())
# Create DB record
await db.execute(
"""INSERT INTO tasks (task_id, user_id, query, status, created_at)
VALUES (%s, %s, %s, %s, %s)""",
(task_id, user_id, query, "running", datetime.utcnow())
)
# Launch background task
asyncio.create_task(process_task(task_id, user_id, query))
return task_id
async def process_task(task_id: str, user_id: str, query: str):
try:
result = await long_llm_operation(query)
await db.execute(
"UPDATE tasks SET status=%s, result=%s, ended_at=%s WHERE task_id=%s",
("done", json.dumps(result), datetime.utcnow(), task_id)
)
except Exception as e:
await db.execute(
"UPDATE tasks SET status=%s, error=%s, ended_at=%s WHERE task_id=%s",
("failed", str(e), datetime.utcnow(), task_id)
)
07 — Safeguards
Security & Privacy
Session data may contain sensitive user information. Protect it.
Session Hijacking Prevention
- Use cryptographically strong session IDs (UUID4, not sequential)
- Bind sessions to user + IP (optional, for non-mobile apps)
- Use HttpOnly, Secure cookies (browser storage)
- Rotate session IDs on login
- Implement rate limiting on session endpoints
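Rotation on login can be a copy-then-delete — a sketch against redis-py, using the `user_id:session_id` key layout from earlier:

```python
import uuid

def rotate_session_id(r, user_id: str, old_session_id: str) -> str:
    """Copy session data to a fresh ID and delete the old key, so a
    pre-login session ID captured by an attacker becomes useless."""
    new_session_id = str(uuid.uuid4())
    old_key = f"{user_id}:{old_session_id}"
    data = r.get(old_key)
    if data is not None:
        # Preserve the remaining TTL on the rotated key
        ttl = r.ttl(old_key)
        r.setex(f"{user_id}:{new_session_id}", max(ttl, 1), data)
        r.delete(old_key)
    return new_session_id
```

The new ID is returned to the client (e.g. in a fresh `HttpOnly` cookie); any request still carrying the old ID simply finds no session.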
PII Redaction in Stored History
Before storing messages in long-term storage, redact sensitive data:
import re
def redact_pii(text: str) -> str:
"""Redact email, phone, SSN, credit card."""
    # Email
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
                  '[EMAIL]', text)
# Phone (simple US format)
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
# SSN
text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
# Credit card (simple check)
text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD]', text)
return text
# Store redacted version; hash the original with a stable digest for audit
# (Python's built-in hash() is salted per process, so it's useless across restarts)
import hashlib
redacted_content = redact_pii(user_message)
r.lpush(f"{session_id}:messages", json.dumps({
    "role": "user",
    "content": redacted_content,
    "original_hash": hashlib.sha256(user_message.encode()).hexdigest()  # For audit
}))
Encryption at Rest
Encrypt session data before storing in Redis or DB:
from cryptography.fernet import Fernet
import os
# Generate key (store in environment, rotate periodically)
cipher_key = os.getenv("SESSION_CIPHER_KEY")
cipher = Fernet(cipher_key)
def encrypt_session(data: dict) -> str:
plaintext = json.dumps(data).encode()
return cipher.encrypt(plaintext).decode()
def decrypt_session(ciphertext: str) -> dict:
plaintext = cipher.decrypt(ciphertext.encode())
return json.loads(plaintext)
# Store encrypted
encrypted = encrypt_session(session_data)
r.setex(session_id, timedelta(hours=24), encrypted)
GDPR Right to Erasure
Implement account deletion that removes all session data:
async def delete_user_data(user_id: str, r: redis.Redis, db):
"""Cascade delete all user sessions and messages."""
    # Redis: delete all sessions (SCAN instead of KEYS to avoid blocking Redis)
    keys = list(r.scan_iter(f"{user_id}:*"))
    if keys:
        r.delete(*keys)
# DB: delete sessions and tasks
await db.execute("DELETE FROM sessions WHERE user_id = %s", (user_id,))
await db.execute("DELETE FROM tasks WHERE user_id = %s", (user_id,))
# Log deletion (audit trail)
await db.execute(
"INSERT INTO audit_log (user_id, action, timestamp) VALUES (%s, %s, %s)",
(user_id, "account_deleted", datetime.utcnow())
)
Audit Logging
Log sensitive operations for compliance and debugging:
async def log_audit(user_id: str, action: str, details: dict, db):
"""Record audit event."""
await db.execute(
"""INSERT INTO audit_log
(user_id, action, details, timestamp, ip_address)
VALUES (%s, %s, %s, %s, %s)""",
(user_id, action, json.dumps(details),
datetime.utcnow(), request.client.host)
)
# Usage: on sensitive actions
await log_audit(user_id, "session_created", {"session_id": sid}, db)
await log_audit(user_id, "session_deleted", {"session_id": sid}, db)
await log_audit(user_id, "data_exported", {"count": 50}, db)