The simplest agentic workflow: each step passes its output to the next as input. Composable, debuggable, and the right default before reaching for more complex patterns.
A sequential chain runs steps in a fixed order: Step 1 runs and produces output; Step 2 takes that output as input; Step 3 takes Step 2's output; and so on. It's a pipeline — the LLM equivalent of a Unix pipe: `extract | transform | summarise | format`.
Despite being simple, sequential chains cover a surprising range of real workflows: scrape a webpage → extract key facts → verify against a knowledge base → generate a structured report. Each step is a focused LLM call with a clear input and output contract.
The advantages are significant: each step is independently debuggable (run it in isolation with sample input), each step can use a different model (use cheap Haiku for extraction, use Sonnet only for the final synthesis), and the chain is easy to extend (add a step anywhere in the pipeline).
```python
import anthropic

client = anthropic.Anthropic()

def call(system: str, user: str, model: str = "claude-haiku-4-5-20251001") -> str:
    return client.messages.create(
        model=model,
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": user}],
    ).content[0].text
```
```python
# 4-step pipeline: raw text → structured → verified → formatted report
def research_pipeline(raw_text: str) -> str:
    # Step 1: Extract key facts
    facts = call(
        system="Extract key facts as a bulleted list. Be concise.",
        user=f"Extract facts from:\n{raw_text}",
    )
    print(f"Step 1 — Facts extracted ({len(facts)} chars)")

    # Step 2: Assess credibility
    assessment = call(
        system="For each fact, rate credibility: [HIGH/MEDIUM/LOW] with one-line reason.",
        user=f"Assess these facts:\n{facts}",
    )
    print("Step 2 — Credibility assessed")

    # Step 3: Filter to high-credibility facts
    filtered = call(
        system="Keep only HIGH credibility facts. Rewrite as clean sentences.",
        user=f"Filter these assessed facts:\n{assessment}",
    )
    print("Step 3 — Filtered to verified facts")

    # Step 4: Format as executive summary (use better model for final output)
    report = call(
        system="Write a concise executive summary. Use professional business language.",
        user=f"Summarise these verified facts:\n{filtered}",
        model="claude-sonnet-4-5",
    )
    print("Step 4 — Report generated")
    return report

result = research_pipeline("A large dump of raw text from a news article...")
print(result)
```
Untyped chains pass raw strings between steps, making them fragile — one malformed output breaks all downstream steps. Typed chains use Pydantic models as the contract between steps:
```python
import json
from typing import Literal

import anthropic
from pydantic import BaseModel

client = anthropic.Anthropic()

class ExtractedFact(BaseModel):
    claim: str
    source: str
    confidence: Literal["high", "medium", "low"]

class FactList(BaseModel):
    facts: list[ExtractedFact]
    total_count: int

def extract_facts(text: str) -> FactList:
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=1024,
        system='''Extract facts as JSON matching this schema:
{"facts": [{"claim": "...", "source": "...", "confidence": "high|medium|low"}],
 "total_count": N}''',
        messages=[{"role": "user", "content": text}],
    )
    raw = response.content[0].text
    # Parse and validate with Pydantic
    data = json.loads(raw)
    return FactList(**data)

def filter_facts(fact_list: FactList) -> FactList:
    high_confidence = [f for f in fact_list.facts if f.confidence == "high"]
    return FactList(facts=high_confidence, total_count=len(high_confidence))

def generate_report(fact_list: FactList) -> str:
    facts_text = "\n".join(f"- {f.claim} (source: {f.source})" for f in fact_list.facts)
    return client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": f"Write a report from:\n{facts_text}"}],
    ).content[0].text

# Typed pipeline — each step has a clear, validated contract
facts = extract_facts("Some text...")
verified = filter_facts(facts)
report = generate_report(verified)
```
```python
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
parser = StrOutputParser()

# Step 1: Extract
extract_prompt = ChatPromptTemplate.from_messages([
    ("system", "Extract the key facts as a bulleted list."),
    ("human", "{raw_text}"),
])
extract_chain = extract_prompt | llm | parser

# Step 2: Summarise the extracted facts
summarise_prompt = ChatPromptTemplate.from_messages([
    ("system", "Summarise these facts for a non-technical executive."),
    ("human", "{facts}"),
])
summarise_chain = summarise_prompt | llm | parser

# Compose into a sequential pipeline using the pipe operator.
# The dict maps extract_chain's output onto the "facts" input
# variable that summarise_prompt expects.
full_chain = (
    {"facts": extract_chain}
    | summarise_chain
)

result = full_chain.invoke({"raw_text": "Raw article text..."})
print(result)
```
LangChain's `|` operator chains runnables sequentially. The output of one step becomes the input to the next. Use `RunnablePassthrough` to pass multiple values through and `RunnableParallel` to fan out.
Sequential chains are the right default for most multi-step tasks. Start here before considering more complex patterns (orchestrator, parallel, graph-based).
Good fits: extract-transform-load (ETL) tasks, document processing pipelines (parse → chunk → enrich → index), content generation (research → outline → draft → edit → format), and any task with a clear sequence of transformations where each step depends on the previous.
Upgrade to parallel when: multiple steps in the sequence are independent and you care about latency (e.g., "gather from source A and source B and source C simultaneously, then synthesise").
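The "gather from independent sources, then synthesise" shape can be sketched with `asyncio.gather`. The `fetch_source_*` functions here are hypothetical stand-ins for real LLM or HTTP calls:

```python
import asyncio

# Hypothetical stand-ins for real LLM or HTTP calls to each source.
async def fetch_source_a() -> str:
    await asyncio.sleep(0.01)
    return "facts from A"

async def fetch_source_b() -> str:
    await asyncio.sleep(0.01)
    return "facts from B"

async def fetch_source_c() -> str:
    await asyncio.sleep(0.01)
    return "facts from C"

async def gather_then_synthesise() -> str:
    # The three fetches run concurrently: wall-clock time is roughly
    # the slowest single fetch, not the sum of all three.
    a, b, c = await asyncio.gather(fetch_source_a(), fetch_source_b(), fetch_source_c())
    # A real implementation would pass the combined context to a synthesis LLM call.
    return "\n".join([a, b, c])

combined = asyncio.run(gather_then_synthesise())
print(combined)
```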
Upgrade to an orchestrator when: the sequence itself is dynamic (you don't know upfront which steps are needed) or different subtasks need meaningfully different specialists.
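A minimal sketch of that orchestrator shape, with a stubbed planner where a real system would make an LLM routing call (`plan_steps` and the `STEPS` registry are illustrative, not a library API):

```python
from typing import Callable

# Registry of available steps. In a real system each would be an LLM call.
STEPS: dict[str, Callable[[str], str]] = {
    "extract": lambda text: f"extracted({text})",
    "translate": lambda text: f"translated({text})",
    "summarise": lambda text: f"summarised({text})",
}

def plan_steps(task: str) -> list[str]:
    # Stub planner: a real orchestrator would ask an LLM which steps
    # the task needs and in what order.
    if "foreign" in task:
        return ["extract", "translate", "summarise"]
    return ["extract", "summarise"]

def run_dynamic_chain(task: str, text: str) -> str:
    # The sequence is decided at runtime, not hard-coded.
    for step_name in plan_steps(task):
        text = STEPS[step_name](text)
    return text

print(run_dynamic_chain("summarise this foreign article", "raw"))
```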
```python
import json

from pydantic import BaseModel

class StepOutput(BaseModel):
    content: str
    quality_score: float  # 0.0 - 1.0
    issues: list[str]

def validate_step(output: str, criteria: list[str]) -> StepOutput:
    """Use an LLM to validate output quality before passing to the next step."""
    criteria_str = "\n".join(f"- {c}" for c in criteria)
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        system='''Evaluate content quality (0.0-1.0) against criteria.
Return JSON: {"quality_score": 0.0-1.0, "issues": ["..."]}''',
        messages=[{"role": "user", "content": f"Criteria:\n{criteria_str}\nContent:\n{output}"}],
    )
    data = json.loads(response.content[0].text)
    return StepOutput(content=output, **data)

# Pipeline with validation gates
facts = extract_facts(raw_text)
validation = validate_step(
    str(facts.model_dump()),
    criteria=["At least 3 facts extracted", "All facts have a source", "No duplicates"],
)
if validation.quality_score < 0.7:
    raise ValueError(f"Extraction quality too low: {validation.issues}")

# Continue to next step only if quality passes
report = generate_report(filter_facts(facts))
```
Context window debt accumulates. Each step that concatenates "previous output + new prompt" grows the context. By step 5, you might be sending 10,000 tokens of intermediate outputs. Be deliberate: pass only what the next step actually needs, not the full history of all prior steps.
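One way to keep context flat is to have each step return a structured result and forward only the field the next step consumes. A sketch (the `StepResult` shape is illustrative):

```python
from dataclasses import dataclass

@dataclass
class StepResult:
    # Everything the step produced, kept around for logging and debugging...
    full_output: str
    # ...and the distilled piece the next step actually needs.
    summary_for_next_step: str

def trim_for_next_step(result: StepResult) -> str:
    # Forward only the distilled summary, not the full history.
    return result.summary_for_next_step

step1 = StepResult(
    full_output="5,000 tokens of raw extraction with reasoning traces...",
    summary_for_next_step="- fact 1\n- fact 2\n- fact 3",
)
next_input = trim_for_next_step(step1)
print(len(next_input))  # far smaller than the full output
```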
Error propagation is silent. If Step 2 produces slightly wrong output, Step 3 may still run successfully with subtly wrong input, producing confidently wrong output at Step 4. Add validation steps at key points to catch problems early rather than at the end.
Latency is additive. A 5-step chain where each step takes 2 seconds takes 10 seconds minimum. For user-facing applications, stream intermediate results so users see progress, or use async chains so other work can proceed while waiting.
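One lightweight way to surface progress is to make the pipeline a generator that yields each step's result as it completes. A sketch with stubbed steps standing in for the LLM calls:

```python
from typing import Iterator

def run_pipeline_streaming(raw_text: str) -> Iterator[tuple[str, str]]:
    # Each stub stands in for a multi-second LLM call; yielding after
    # each step lets the UI show progress instead of one long wait.
    facts = f"facts({raw_text})"
    yield ("extract", facts)
    assessment = f"assessed({facts})"
    yield ("assess", assessment)
    report = f"report({assessment})"
    yield ("report", report)

for step_name, output in run_pipeline_streaming("article"):
    print(f"{step_name}: {output}")
```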
| Pattern | Data Flow | Error Handling | Best For |
|---|---|---|---|
| Linear pipe | A → B → C | Fail on first error | Simple transformation pipelines |
| Linear with validation | A → validate → B | Retry or branch on schema error | Production pipelines with quality gates |
| Reduce (map-reduce) | A1..N → merge → B | Partial results on failure | Long document processing |
| Accumulate | A feeds state into B, B into C | State rollback on error | Iterative refinement |
In production sequential chains, instrument each step with timing and token-count logging. Over time this data reveals which steps are the bottlenecks (slow steps to optimise) and which are the cost drivers (high token-count steps to simplify). A step that consistently adds tokens without improving final output quality is a candidate for removal or replacement with a deterministic function. Map this instrumentation to your cost tracking to calculate per-step ROI.
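A sketch of per-step instrumentation as a decorator. The chars/4 token estimate is a rough heuristic, not the tokenizer's actual count; in production you would log the token usage reported by the API response instead:

```python
import time
from functools import wraps

METRICS: list[dict] = []

def instrumented(step_name: str):
    def decorator(fn):
        @wraps(fn)
        def wrapper(text: str) -> str:
            start = time.perf_counter()
            result = fn(text)
            METRICS.append({
                "step": step_name,
                "seconds": time.perf_counter() - start,
                # Rough heuristic: ~4 characters per token.
                "input_tokens_est": len(text) // 4,
                "output_tokens_est": len(result) // 4,
            })
            return result
        return wrapper
    return decorator

@instrumented("extract")
def extract(text: str) -> str:
    return text.upper()  # stand-in for an LLM call

extract("some raw input text")
print(METRICS[0]["step"], METRICS[0]["input_tokens_est"])
```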
For long sequential chains (5+ steps), implement a checkpoint-and-resume pattern: persist intermediate outputs to a key-value store after each step, keyed by a chain run ID. If the chain fails at step 4, you can resume from step 3's output rather than restarting from scratch. This is especially valuable for chains with expensive upstream steps (large document processing, web search). Retention policy: keep intermediate checkpoints for 24 hours to enable human debugging, then expire them to control storage costs.
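The pattern can be sketched with an in-memory dict standing in for the key-value store (a real deployment would use something like Redis with a 24-hour TTL):

```python
import hashlib
from typing import Callable

# In production this would be a key-value store with a 24-hour TTL;
# an in-memory dict illustrates the pattern.
CHECKPOINTS: dict[str, str] = {}

def run_step_with_checkpoint(
    run_id: str, step_name: str, fn: Callable[[str], str], input_text: str
) -> str:
    key = f"{run_id}:{step_name}"
    if key in CHECKPOINTS:
        # Resume: reuse the persisted output instead of recomputing.
        return CHECKPOINTS[key]
    output = fn(input_text)
    CHECKPOINTS[key] = output
    return output

calls: list[str] = []

def expensive_extract(text: str) -> str:
    calls.append("extract")  # track how often we actually compute
    return f"facts({text})"

run_id = hashlib.sha256(b"article-url").hexdigest()[:12]
first = run_step_with_checkpoint(run_id, "extract", expensive_extract, "raw")
# Simulated re-run after a downstream failure: served from the checkpoint.
second = run_step_with_checkpoint(run_id, "extract", expensive_extract, "raw")
print(first == second, calls)
```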
When debugging a sequential chain with incorrect outputs, bisect the chain by logging intermediate outputs and identifying the first step where quality diverges from expected. The error is usually not in the final step but in an earlier step whose output looks plausible but subtly misframes the task for subsequent steps. Fixing the first bad step is more impactful than adding validation at the end of the chain.
Add a cost attribution step at the end of each sequential chain run: sum the input and output tokens for each step and log the breakdown. Over time this reveals which steps account for the most spend. Typically, the synthesis or summarisation steps at the end of a chain consume the most tokens. Refactoring these steps to use map-reduce (summarise in chunks, then combine) rather than passing a full concatenated input often reduces costs by 40-60% with minimal quality impact.
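A sketch of that map-reduce refactor with a stubbed `summarise` function. Chunking here is by character count for illustration; a real version would chunk by tokens and call the API:

```python
def chunk_text(text: str, chunk_size: int = 20) -> list[str]:
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

def summarise(text: str) -> str:
    # Stand-in for an LLM call; a real version would call the API.
    return f"sum({text})"

def map_reduce_summary(text: str) -> str:
    # Map: summarise each chunk independently (these calls could also run
    # in parallel), so no single call sees the full concatenated input.
    partials = [summarise(chunk) for chunk in chunk_text(text)]
    # Reduce: combine the short partial summaries in one final call.
    return summarise("\n".join(partials))

print(map_reduce_summary("a" * 50))
```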