Production Engineering

Checkpointing

Saving intermediate agent and pipeline state at key execution points so that long-running tasks can resume after failures without restarting from scratch.

Recovery time
seconds vs minutes
Storage overhead
low
Frameworks
LangGraph, custom Redis

Table of Contents

SECTION 01

Why Checkpointing?

A 10-step agentic pipeline that fails at step 9 without checkpointing restarts from step 1, wasting compute and time. Checkpointing saves the output of each completed step so that, on retry, execution resumes from the last successful checkpoint. Critical for: long research tasks, multi-tool agents, batch processing jobs.

SECTION 02

Checkpoint Granularity

Too coarse (checkpoint only at end): no benefit. Too fine (every token): overhead overwhelms. Optimal: checkpoint at natural step boundaries — after each tool call, after each sub-task completion, before expensive operations. For batch jobs, checkpoint every N items (N=100 is typical).
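
As a concrete illustration of the batch-job case, here is a minimal sketch. The in-memory `checkpoint_store` dict and the default `process_item` are stand-ins (assumptions) for a real backend and real per-item work:

```python
# Checkpoint a batch job every N items so a crash loses at most N-1 items of work.
CHECKPOINT_EVERY = 100

def run_batch(items, checkpoint_store: dict, process_item=lambda x: x * 2):
    start = checkpoint_store.get("next_index", 0)   # resume point, 0 on first run
    results = checkpoint_store.get("results", [])
    for i in range(start, len(items)):
        results.append(process_item(items[i]))
        if (i + 1) % CHECKPOINT_EVERY == 0:         # natural step boundary
            checkpoint_store["next_index"] = i + 1
            checkpoint_store["results"] = results
    # Final checkpoint so a completed run is never re-processed
    checkpoint_store["next_index"] = len(items)
    checkpoint_store["results"] = results
    return results
```

On restart with the same store, already-processed items are skipped because iteration begins at the saved `next_index`.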

SECTION 03

Storage Backends

Redis: fast, ephemeral — good for intra-session checkpoints.
Postgres/SQLite: persistent, queryable — good for multi-session tasks.
S3/blob storage: for large intermediate artifacts (generated documents, retrieved corpora).

import json, redis

r = redis.Redis()

def save_checkpoint(task_id: str, step: int, state: dict, ttl: int = 3600):
    key = f"checkpoint:{task_id}:{step}"
    r.set(key, json.dumps(state), ex=ttl)
    # Track latest checkpoint
    r.set(f"checkpoint:{task_id}:latest", step, ex=ttl)
    print(f"Checkpoint saved: step {step}")

def load_latest_checkpoint(task_id: str) -> tuple[int, dict]:
    latest = r.get(f"checkpoint:{task_id}:latest")
    if not latest:
        return 0, {}
    step = int(latest)
    raw = r.get(f"checkpoint:{task_id}:{step}")
    if raw is None:  # step key expired independently of the latest pointer
        return 0, {}
    return step, json.loads(raw)
SECTION 04

Resumption Logic

On task start, check for an existing checkpoint. If found, resume from that step. If not found, start from step 0. Pass the checkpoint state to downstream steps.

async def run_pipeline(task_id: str, inputs: dict) -> dict:
    start_step, state = load_latest_checkpoint(task_id)
    if start_step > 0:
        print(f"Resuming from step {start_step}")
    else:
        state = {"inputs": inputs, "results": {}}

    steps = [step_retrieve, step_analyze, step_generate, step_validate, step_format]
    for i, step_fn in enumerate(steps):
        if i < start_step:
            continue  # skip already-completed steps
        state = await step_fn(state)
        save_checkpoint(task_id, i + 1, state)

    return state["results"]
SECTION 05

Idempotency Requirements

Each step must be idempotent: running it twice with the same input produces the same output and the same side effects. For LLM calls, use temperature=0 and store the output hash to detect re-runs. For external API calls (web search, database writes), use idempotency keys. Non-idempotent steps (file writes, emails) must check 'already done' before executing.
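
One way to make a side-effecting step safe to replay is to record a key derived from its input and skip re-execution when that key is already present. A minimal sketch, assuming an in-memory set stands in for a durable store:

```python
import hashlib, json

completed: set[str] = set()  # in production, a durable store (Redis/Postgres)

def idempotency_key(step_name: str, payload: dict) -> str:
    # Stable hash of the step name plus its canonicalized input
    blob = json.dumps(payload, sort_keys=True).encode()
    return f"{step_name}:{hashlib.sha256(blob).hexdigest()}"

def run_once(step_name: str, payload: dict, fn):
    key = idempotency_key(step_name, payload)
    if key in completed:
        return None  # already done; skip the side effect on replay
    result = fn(payload)
    completed.add(key)  # mark done only after the effect succeeded
    return result
```

The second invocation with the same payload is a no-op, which is exactly the property a resumed pipeline needs from its email- or write-steps.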

SECTION 06

LangGraph Integration

LangGraph has built-in checkpointing via a Checkpointer interface. Savers for SQLite, Postgres, and Redis are available as companion checkpoint packages (e.g. SqliteSaver, PostgresSaver). Pass a checkpointer to your graph at compile time; LangGraph saves state after each node. On re-run with the same thread_id, execution resumes from the last saved state automatically.

SECTION 07

Checkpoint Formats & Serialization

Checkpoints store model weights, optimizer state, and training metadata. Common formats include PyTorch's native .pt (pickle-based), SafeTensors (safer, faster), and ONNX (framework-agnostic). Each has trade-offs: .pt is fastest to save/load but carries pickle's security risks; SafeTensors is safer but has slower I/O on some systems; ONNX is portable but is primarily an inference export format, so it carries no optimizer state and can drop some precision metadata.

# PyTorch checkpointing with optimizer state
import torch

# epoch, model, optimizer, loss, config come from the surrounding training loop
checkpoint = {
    "epoch": epoch,
    "model_state_dict": model.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
    "loss": loss,
    "config": config.to_dict()
}
torch.save(checkpoint, f"checkpoint_epoch_{epoch}.pt")

# Load and resume training.
# weights_only=False allows arbitrary pickled objects (the default is
# weights_only=True since PyTorch 2.6); only load checkpoints you trust.
loaded = torch.load("checkpoint_epoch_10.pt", map_location="cpu", weights_only=False)
model.load_state_dict(loaded["model_state_dict"])
optimizer.load_state_dict(loaded["optimizer_state_dict"])
start_epoch = loaded["epoch"] + 1

Distributed Training Checkpoints

In distributed training, each GPU process must save its shard. Coordinate saves to avoid file conflicts using a single "rank 0" process, or use distributed-aware libraries (PyTorch DDP, Hugging Face Accelerate) that handle sharding automatically.

# Distributed checkpoint with DDP
import torch
from torch.distributed import get_rank, barrier

def save_checkpoint_distributed(model, optimizer, epoch, path):
    # All processes wait before saving
    barrier()

    if get_rank() == 0:  # Only rank 0 saves
        checkpoint = {
            "epoch": epoch,
            "model": model.module.state_dict(),  # .module for DDP wrapper
            "optimizer": optimizer.state_dict()
        }
        torch.save(checkpoint, path)

    barrier()  # All processes wait for rank 0 to finish
SECTION 08

Checkpoint Management & Recovery

Keeping every checkpoint wastes storage. Typically keep the last 3–5 checkpoints and the best (by validation metric). Delete stale checkpoints after 7 days or when disk usage exceeds a threshold. Checksums on checkpoints help detect corruption; if a checkpoint is corrupted, you have backups to fall back to.

Policy      | Frequency               | Retention              | Storage (GB)    | Use Case
Last N      | Every 100 steps         | Keep last 5            | 5 × 2GB = 10GB  | Quick recovery from recent crash
Best Metric | Every epoch if improves | Keep 1 (best val loss) | 2GB             | Model serving, final artifact
Hourly      | Every 1 hour            | Keep 24                | 24 × 2GB = 48GB | Long training runs, insurance
Daily       | Once per day            | Keep 30                | 30 × 2GB = 60GB | Very long runs (weeks/months)
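
The corruption check mentioned above can be sketched with standard-library hashing; the helper names are illustrative, and a real setup would store the digest alongside the checkpoint file at save time:

```python
import hashlib

def file_sha256(path: str) -> str:
    # Hash in 1 MB chunks so multi-GB checkpoints don't need to fit in memory
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def verify_checkpoint(path: str, expected_digest: str) -> bool:
    # Compare against the digest recorded at save time; a mismatch means
    # corruption, so fall back to an earlier (backup) checkpoint
    return file_sha256(path) == expected_digest
```

Run the verification on load, not just on save, so a checkpoint corrupted at rest is caught before it silently poisons a resumed run.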

Checkpoint Versioning: Encode the PyTorch version, CUDA version, and model architecture version into each checkpoint metadata. Loading a checkpoint saved with PyTorch 2.0 into PyTorch 1.13 may silently produce incorrect results due to operator changes. Always validate that checkpoint metadata matches your current environment before loading.
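
A minimal sketch of that metadata check, assuming the versions were recorded at save time under the keys shown (the key names are illustrative):

```python
def validate_checkpoint_env(metadata: dict, current_env: dict) -> list[str]:
    """Return a list of mismatches between the saved and current environment."""
    mismatches = []
    for key in ("pytorch_version", "cuda_version", "model_arch_version"):
        saved, current = metadata.get(key), current_env.get(key)
        if saved != current:
            mismatches.append(f"{key}: checkpoint has {saved!r}, environment has {current!r}")
    return mismatches
```

Refuse to load (or at least warn loudly) when the returned list is non-empty, rather than trusting a silent cross-version load.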

For multi-GPU training on cloud instances, save checkpoints to cloud storage (S3, GCS) rather than local EBS/persistent disks. This protects against instance failure and enables easy sharing across teams. Use signed URLs with time-limited access for security.

Checkpoint Compression & Deduplication: Model checkpoints can run to many gigabytes. Reduce size via compression (ZStandard gives ~50% reduction) or via deduplication (store only changed parameters, not the whole model). For distributed training, each GPU holds a shard; combine shards into a single checkpoint for easier management. Use sparse tensor formats (COO, CSR) if models have structured sparsity, further reducing checkpoint size. On-the-fly compression during save/load incurs CPU overhead; benchmark the trade-off (storage cost vs. I/O latency) for your cluster.

Cloud storage costs accumulate; implement automated checkpoint cleanup: delete checkpoints older than 7 days (keeping only last 3 + best), and archive older checkpoints to cold storage (Glacier) after 30 days. Use cloud provider lifecycle policies to automate tiering. For critical training runs, maintain a "golden checkpoint" in cold storage indefinitely for legal/compliance reasons, even after the model is fully trained and retired.

Monitoring and observability are essential for production systems. Set up comprehensive logging at every layer: API requests, model predictions, database queries, cache hits/misses. Use structured logging (JSON) to enable filtering and aggregation across thousands of servers. For production deployments, track not just errors but also latency percentiles (p50, p95, p99); if p99 latency suddenly doubles, something is wrong even if error rates are normal. Set up alerting based on SLO violations: if a service is supposed to have 99.9% availability and it drops to 99.5%, alert immediately. Use distributed tracing (Jaeger, Lightstep) to track requests across multiple services; a slow end-to-end latency might be hidden in one deep service call, invisible in aggregate metrics.

For long-running ML jobs (training, batch inference), implement checkpoint recovery and graceful degradation. If a training job crashes after 2 weeks, you want to resume from the last checkpoint, not restart from scratch. Implement job orchestration with Kubernetes or Airflow to handle retries, resource allocation, and dependency management. Use feature flags for safe deployment: deploy new model versions behind a flag that's off by default, gradually roll out to 1% of users, 10%, then 100%, monitoring metrics at each step. If something goes wrong, flip the flag back instantly. This approach reduces risk and enables fast rollback.

Finally, build a culture of incident response and post-mortems. When something breaks (and it will), document the incident: timeline, root cause, mitigation steps, and preventive measures. Use incidents as learning opportunities; blameless post-mortems focus on systems, not people. Share findings across teams to prevent repeat incidents. A well-documented incident history is an organization's institutional knowledge about system failures and how to avoid them.

The rapid evolution of AI infrastructure requires continuous learning and adaptation. Teams should establish regular tech talks and knowledge-sharing sessions where engineers present lessons learned from production deployments, performance optimization work, and incident postmortems. Create internal wiki pages documenting best practices specific to your organization: how to debug common failure modes, performance tuning guides for your hardware, and checklists for safe deployments. This prevents repeating mistakes and accelerates onboarding of new team members.

Build relationships with vendors and open-source communities. If you encounter bugs in frameworks (PyTorch, JAX), file detailed reports. If you have questions, ask on forums; community members often have encountered similar issues. For mission-critical infrastructure, consider purchasing support contracts with vendors (PyTorch, HuggingFace, cloud providers). Support gives you direct access to engineers who understand your system and can prioritize fixes. This is insurance against production outages caused by third-party software bugs.

Above all, remember that optimization is a journey, not a destination. Today's cutting-edge technique becomes tomorrow's baseline. Allocate 10-15% of engineering time to exploration and experimentation. Some experiments will fail, but successful ones compound into significant efficiency gains. Foster a culture of continuous improvement: measure, analyze, iterate, and share results. The teams that stay ahead are those that invest in understanding their systems deeply and adapting proactively to new technologies and changing demands.

Key Takeaway: Success in GenAI infrastructure depends on mastering fundamentals: understand your hardware constraints, profile your workloads, measure everything, and iterate. The most sophisticated techniques (dynamic batching, mixed precision, distributed training) build on solid foundations of clear thinking and empirical validation. Avoid cargo-cult engineering: if you don't understand why a technique helps your specific use case, it probably won't. Invest time in understanding root causes, not just applying trendy solutions. Over time, this rigor will compound into significant competitive advantage.