Saving intermediate agent and pipeline state at key execution points so that long-running tasks can resume after failures without restarting from scratch.
A 10-step agentic pipeline that fails at step 9 without checkpointing restarts from step 1, wasting compute and time. Checkpointing saves the output of each completed step so on retry, execution resumes from the last successful checkpoint. Critical for: long research tasks, multi-tool agents, batch processing jobs.
Too coarse (checkpoint only at the end): no benefit. Too fine (every token): overhead overwhelms any savings. The sweet spot is checkpointing at natural step boundaries: after each tool call, after each sub-task completion, and before expensive operations. For batch jobs, checkpoint every N items (N=100 is typical).
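For the batch case, the every-N-items rule can be sketched with a plain file checkpoint. This is a minimal sketch, not any particular framework's API; the function and file names are hypothetical, and a real job would persist per-item results too, not just the progress counter.

```python
import json
import os

def process_batch(items, checkpoint_path, n=100):
    """Process items, checkpointing progress every n items (hypothetical sketch)."""
    # Resume: read the index of the last completed item, if a checkpoint exists
    done = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = json.load(f)["done"]
    results = []
    for i, item in enumerate(items):
        if i < done:
            continue  # already processed in a previous run
        results.append(item * 2)  # placeholder for the real per-item work
        if (i + 1) % n == 0:      # checkpoint at every N-item boundary
            with open(checkpoint_path, "w") as f:
                json.dump({"done": i + 1}, f)
    # Final checkpoint so a rerun is a no-op
    with open(checkpoint_path, "w") as f:
        json.dump({"done": len(items)}, f)
    return results
```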
- Redis: fast and ephemeral; good for intra-session checkpoints.
- Postgres/SQLite: persistent and queryable; good for multi-session tasks.
- S3/blob storage: for large intermediate artifacts (generated documents, retrieved corpora).
```python
import json
import redis

r = redis.Redis()

def save_checkpoint(task_id: str, step: int, state: dict, ttl: int = 3600):
    key = f"checkpoint:{task_id}:{step}"
    r.set(key, json.dumps(state), ex=ttl)
    # Track latest checkpoint
    r.set(f"checkpoint:{task_id}:latest", step, ex=ttl)
    print(f"Checkpoint saved: step {step}")

def load_latest_checkpoint(task_id: str) -> tuple[int, dict]:
    latest = r.get(f"checkpoint:{task_id}:latest")
    if not latest:
        return 0, {}
    step = int(latest)
    state = json.loads(r.get(f"checkpoint:{task_id}:{step}"))
    return step, state
```
On task start, check for an existing checkpoint. If found, resume from that step. If not found, start from step 0. Pass the checkpoint state to downstream steps.
```python
async def run_pipeline(task_id: str, inputs: dict) -> dict:
    start_step, state = load_latest_checkpoint(task_id)
    if start_step > 0:
        print(f"Resuming from step {start_step}")
    else:
        state = {"inputs": inputs, "results": {}}

    steps = [step_retrieve, step_analyze, step_generate, step_validate, step_format]
    for i, step_fn in enumerate(steps):
        if i < start_step:
            continue  # skip already-completed steps
        state = await step_fn(state)
        save_checkpoint(task_id, i + 1, state)
    return state["results"]
```
Each step must be idempotent: running it twice with the same input produces the same output. For LLM calls, use temperature=0 and store the output hash to detect re-runs. For external API calls (web search, database writes), use idempotency keys. Non-idempotent steps (file writes, emails) must check 'already done' before executing.
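A minimal sketch of the "already done" guard: derive an idempotency key from the operation name and a stable hash of its input, and record the result before the caller checkpoints. The in-memory dict stands in for Redis or Postgres; the function names are illustrative.

```python
import hashlib
import json

# In production this would live in Redis/Postgres; a dict works for the sketch.
_completed: dict[str, str] = {}

def run_once(op_name: str, payload: dict, side_effect) -> str:
    """Run side_effect at most once per (op_name, payload) pair."""
    # Idempotency key: stable hash of the operation name and its input
    key = op_name + ":" + hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()
    if key in _completed:
        return _completed[key]  # already done: return the recorded result
    result = side_effect(payload)
    _completed[key] = result    # record the result before the caller checkpoints
    return result
```

On retry after a crash, a step wrapped this way returns the recorded result instead of sending a second email or re-issuing the write.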
LangGraph has built-in checkpointing via a Checkpointer interface. SqliteSaver, PostgresSaver, and RedisSaver are available as separate checkpointer packages rather than in the core library. Pass a checkpointer to your graph at compile time; LangGraph saves state after each node. On re-run with the same thread_id, execution resumes from the last saved state automatically.
Checkpoints store model weights, optimizer state, and training metadata. Common formats include PyTorch's native .pt (pickle-based), SafeTensors (no code execution on load, supports zero-copy loading), and ONNX (framework-agnostic). Each has trade-offs: .pt is convenient but pickle can execute arbitrary code on load; SafeTensors is safer but stores only tensors, so non-tensor metadata needs separate handling; ONNX is portable but export can lose custom ops and training-specific state.
```python
# PyTorch checkpointing with optimizer state
import torch

checkpoint = {
    "epoch": epoch,
    "model_state_dict": model.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
    "loss": loss,
    "config": config.to_dict(),
}
torch.save(checkpoint, f"checkpoint_epoch_{epoch}.pt")

# Load and resume training
loaded = torch.load("checkpoint_epoch_10.pt")
model.load_state_dict(loaded["model_state_dict"])
optimizer.load_state_dict(loaded["optimizer_state_dict"])
start_epoch = loaded["epoch"] + 1
```

In distributed training, each GPU process must save its shard. Coordinate saves to avoid file conflicts by writing from a single "rank 0" process, or use distributed-aware libraries (PyTorch DDP, Hugging Face Accelerate) that handle sharding automatically.
```python
# Distributed checkpoint with DDP
import torch
from torch.distributed import get_rank, barrier

def save_checkpoint_distributed(model, optimizer, epoch, path):
    # All processes wait before saving
    barrier()
    if get_rank() == 0:  # Only rank 0 saves
        checkpoint = {
            "epoch": epoch,
            "model": model.module.state_dict(),  # .module for DDP wrapper
            "optimizer": optimizer.state_dict(),
        }
        torch.save(checkpoint, path)
    barrier()  # All processes wait for rank 0 to finish
```

Keeping every checkpoint wastes storage. Typically keep the last 3–5 checkpoints plus the best (by validation metric). Delete stale checkpoints after 7 days or when disk usage exceeds a threshold. Checksums on checkpoints help detect corruption; if a checkpoint is corrupted, you have earlier ones to fall back to.
| Policy | Frequency | Retention | Storage (GB) | Use Case |
|---|---|---|---|---|
| Last N | Every 100 steps | Keep last 5 | 5 × 2GB = 10GB | Quick recovery from recent crash |
| Best Metric | Every epoch if improves | Keep 1 (best val loss) | 2GB | Model serving, final artifact |
| Hourly | Every 1 hour | Keep 24 | 24 × 2GB = 48GB | Long training runs, insurance |
| Daily | Once per day | Keep 30 | 30 × 2GB = 60GB | Very long runs (weeks/months) |
Checkpoint Versioning: Encode the PyTorch version, CUDA version, and model architecture version into each checkpoint metadata. Loading a checkpoint saved with PyTorch 2.0 into PyTorch 1.13 may silently produce incorrect results due to operator changes. Always validate that checkpoint metadata matches your current environment before loading.
For multi-GPU training on cloud instances, save checkpoints to cloud storage (S3, GCS) rather than local EBS/persistent disks. This protects against instance failure and enables easy sharing across teams. Use signed URLs with time-limited access for security.
Checkpoint Compression & Deduplication: Model checkpoints can run to many gigabytes. Reduce size via compression (ZStandard gives roughly 50% reduction on typical weights), or via deduplication (store only changed parameters, not the whole model). For distributed training, each GPU holds a shard; combine shards into a single checkpoint for easier management. Use sparse tensor formats (COO, CSR) if models have structured sparsity, further reducing checkpoint size. On-the-fly compression during save/load incurs CPU overhead; benchmark the trade-off (storage cost vs. I/O latency) for your cluster.
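To make the compression trade-off concrete, here is a standard-library sketch; zlib stands in for ZStandard (which typically compresses better and faster), and pickle stands in for whatever serializer your framework uses.

```python
import pickle
import zlib

def save_compressed(state: dict, path: str, level: int = 6) -> int:
    """Serialize and compress a checkpoint dict; returns compressed size in bytes."""
    raw = pickle.dumps(state)          # torch.save also uses pickle under the hood
    packed = zlib.compress(raw, level)  # higher level = smaller file, more CPU
    with open(path, "wb") as f:
        f.write(packed)
    return len(packed)

def load_compressed(path: str) -> dict:
    with open(path, "rb") as f:
        return pickle.loads(zlib.decompress(f.read()))
```

The `level` knob is exactly the storage-vs-CPU trade-off the paragraph above says to benchmark.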
Cloud storage costs accumulate; implement automated checkpoint cleanup: delete checkpoints older than 7 days (keeping only last 3 + best), and archive older checkpoints to cold storage (Glacier) after 30 days. Use cloud provider lifecycle policies to automate tiering. For critical training runs, maintain a "golden checkpoint" in cold storage indefinitely for legal/compliance reasons, even after the model is fully trained and retired.
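An illustrative S3 lifecycle configuration for the tiering and expiry part; the prefix and day counts are assumptions. Note that "keep last 3 + best" cannot be expressed in lifecycle rules alone and needs application-side cleanup, and a "golden checkpoint" should live under a prefix these rules do not touch.

```json
{
  "Rules": [
    {
      "ID": "archive-old-checkpoints",
      "Filter": { "Prefix": "checkpoints/" },
      "Status": "Enabled",
      "Transitions": [
        { "Days": 30, "StorageClass": "GLACIER" }
      ],
      "Expiration": { "Days": 365 }
    }
  ]
}
```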
Monitoring and observability are essential for production systems. Set up comprehensive logging at every layer: API requests, model predictions, database queries, cache hits/misses. Use structured logging (JSON) to enable filtering and aggregation across thousands of servers. For production deployments, track not just errors but also latency percentiles (p50, p95, p99); if p99 latency suddenly doubles, something is wrong even if error rates are normal. Set up alerting based on SLO violations: if a service is supposed to have 99.9% availability and it drops to 99.5%, alert immediately. Use distributed tracing (Jaeger, Lightstep) to track requests across multiple services; a slow end-to-end latency might be hidden in one deep service call, invisible in aggregate metrics.
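A minimal structured-logging sketch with the standard logging module, emitting one JSON object per line so logs can be filtered and aggregated downstream. The field names are illustrative; production formatters would also include timestamps, trace IDs, and exception info.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line for easy filtering/aggregation."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Fields passed via `extra=` land as attributes on the record
            "latency_ms": getattr(record, "latency_ms", None),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
log = logging.getLogger("api")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("request served", extra={"latency_ms": 42})
```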
For long-running ML jobs (training, batch inference), implement checkpoint recovery and graceful degradation. If a training job crashes after 2 weeks, you want to resume from the last checkpoint, not restart from scratch. Implement job orchestration with Kubernetes or Airflow to handle retries, resource allocation, and dependency management. Use feature flags for safe deployment: deploy new model versions behind a flag that's off by default, gradually roll out to 1% of users, 10%, then 100%, monitoring metrics at each step. If something goes wrong, flip the flag back instantly. This approach reduces risk and enables fast rollback.
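Percentage rollout behind a flag is typically implemented by hashing the user ID into a stable bucket, so the same users stay enabled as the rollout grows from 1% to 10% to 100%. A minimal sketch (function name hypothetical):

```python
import hashlib

def in_rollout(user_id: str, flag: str, percent: float) -> bool:
    """Deterministically bucket a user into [0, 100); stable across requests."""
    h = hashlib.sha256(f"{flag}:{user_id}".encode()).hexdigest()
    bucket = int(h[:8], 16) % 10000 / 100.0  # 0.00 to 99.99
    return bucket < percent
```

Because the bucket is a pure function of (flag, user_id), raising `percent` only ever adds users, and setting it to 0 is the instant rollback the paragraph above describes.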
Finally, build a culture of incident response and post-mortems. When something breaks (and it will), document the incident: timeline, root cause, mitigation steps, and preventive measures. Use incidents as learning opportunities; blameless post-mortems focus on systems, not people. Share findings across teams to prevent repeat incidents. A well-documented incident history is an organization's institutional knowledge about system failures and how to avoid them.
The rapid evolution of AI infrastructure requires continuous learning and adaptation. Teams should establish regular tech talks and knowledge-sharing sessions where engineers present lessons learned from production deployments, performance optimization work, and incident postmortems. Create internal wiki pages documenting best practices specific to your organization: how to debug common failure modes, performance tuning guides for your hardware, and checklists for safe deployments. This prevents repeating mistakes and accelerates onboarding of new team members.
Build relationships with vendors and open-source communities. If you encounter bugs in frameworks (PyTorch, JAX), file detailed reports. If you have questions, ask on forums; community members often have encountered similar issues. For mission-critical infrastructure, consider purchasing support contracts with vendors (PyTorch, HuggingFace, cloud providers). Support gives you direct access to engineers who understand your system and can prioritize fixes. This is insurance against production outages caused by third-party software bugs.
Finally, remember that optimization is a journey, not a destination. Today's cutting-edge technique becomes tomorrow's baseline. Allocate 10-15% of engineering time to exploration and experimentation. Some experiments will fail, but successful ones compound into significant efficiency gains. Foster a culture of continuous improvement: measure, analyze, iterate, and share results. The teams that stay ahead are those that invest in understanding their systems deeply and adapting proactively to new technologies and changing demands.
Key Takeaway: Success in GenAI infrastructure depends on mastering fundamentals: understand your hardware constraints, profile your workloads, measure everything, and iterate. The most sophisticated techniques (dynamic batching, mixed precision, distributed training) build on solid foundations of clear thinking and empirical validation. Avoid cargo-cult engineering: if you don't understand why a technique helps your specific use case, it probably won't. Invest time in understanding root causes, not just applying trendy solutions. Over time, this rigor will compound into significant competitive advantage.