Fan-out pattern: run independent agent subtasks concurrently and merge the results — dramatically reducing total latency when subtasks don't depend on each other.
A sequential agent that calls three tools one-by-one, each taking 2 seconds, takes 6 seconds total. If those tool calls are independent (the result of tool A doesn't affect what you pass to tool B), running them in parallel takes 2 seconds — a 3× latency improvement. As workflows grow, this difference compounds dramatically.
Modern LLM APIs are slow per call but parallelise well: you can fan out 10 simultaneous API calls and pay roughly the wall-clock time of a single call. This is one of the biggest practical optimisations in production agent systems.
The rule is simple: serialise tasks that depend on each other; parallelise everything else.
A subtask can run in parallel if it doesn't consume output from any other subtask in the same fan-out group. Common patterns:
Data gathering: search five different sources simultaneously instead of one-by-one. Each search is independent.
Multi-perspective analysis: ask three different specialist agents to evaluate the same document from different angles (legal, technical, financial). None needs the others' output to start.
Batch processing: summarise 20 documents in parallel rather than sequentially.
Validation: run multiple quality checks on a generated output simultaneously — grammar, factual accuracy, tone — and merge feedback.
Anything with a DAG dependency (A feeds B feeds C) cannot be parallelised within that chain, but independent chains within the DAG can run concurrently.
```python
import asyncio

import anthropic

client = anthropic.Anthropic()

async def analyse_from_perspective(text: str, perspective: str) -> dict:
    """Analyse text from a specific expert perspective."""
    loop = asyncio.get_running_loop()
    # Run the blocking Anthropic call in a thread pool.
    # (anthropic.AsyncAnthropic offers native async calls; the executor
    # pattern shown here works for any blocking SDK.)
    response = await loop.run_in_executor(None, lambda: client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        system=f"You are a {perspective} expert. Analyse the following text from your perspective.",
        messages=[{"role": "user", "content": text}],
    ))
    return {"perspective": perspective, "analysis": response.content[0].text}

async def parallel_analysis(text: str) -> dict:
    perspectives = ["legal", "technical", "financial", "security"]
    # Fan-out: launch all tasks simultaneously
    tasks = [analyse_from_perspective(text, p) for p in perspectives]
    results = await asyncio.gather(*tasks)  # wait for all to complete
    # Fan-in: merge results
    return {r["perspective"]: r["analysis"] for r in results}

# Run
async def main():
    text = "Our new API will process 1M requests/day, storing all user queries in plain text logs."
    analyses = await parallel_analysis(text)
    for perspective, analysis in analyses.items():
        print(f"\n=== {perspective.upper()} ===")
        print(analysis[:200])

asyncio.run(main())
```
asyncio.gather() is the core primitive: it schedules all coroutines concurrently and returns when all complete. By default, if any task raises, gather propagates that first exception immediately (the remaining tasks keep running in the background); pass return_exceptions=True to collect every result instead.
For simpler scripts without asyncio, concurrent.futures.ThreadPoolExecutor gives you parallel execution with a familiar interface:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

import anthropic

client = anthropic.Anthropic()

def summarise(doc: str, doc_id: int) -> dict:
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=256,
        messages=[{"role": "user", "content": f"Summarise in 2 sentences:\n{doc}"}],
    )
    return {"id": doc_id, "summary": response.content[0].text}

documents = [
    "Long document 1...",
    "Long document 2...",
    "Long document 3...",
    # ... up to 20 documents
]

# Process all documents in parallel (up to 10 concurrent)
summaries = {}
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(summarise, doc, i): i for i, doc in enumerate(documents)}
    for future in as_completed(futures):
        result = future.result()
        summaries[result["id"]] = result["summary"]
        print(f"Doc {result['id']} summarised.")

# Reassemble results in original document order
ordered = [summaries[i] for i in range(len(documents))]
```
Use threads for I/O-bound work (API calls, DB queries). Use ProcessPoolExecutor only for CPU-bound work (rare in agent systems).
LangGraph supports parallel node execution natively via fan-out edges — when multiple nodes connect from the same source, LangGraph runs them in parallel:
```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class AnalysisState(TypedDict):
    document: str
    legal: str
    technical: str
    financial: str
    final_report: str

def legal_agent(state: AnalysisState) -> dict:
    # Analyse from the legal perspective (LLM call elided)
    return {"legal": f"Legal analysis of: {state['document'][:50]}..."}

def technical_agent(state: AnalysisState) -> dict:
    return {"technical": f"Technical analysis of: {state['document'][:50]}..."}

def financial_agent(state: AnalysisState) -> dict:
    return {"financial": f"Financial analysis of: {state['document'][:50]}..."}

def synthesiser(state: AnalysisState) -> dict:
    report = (
        f"REPORT\n"
        f"Legal: {state['legal']}\n"
        f"Tech: {state['technical']}\n"
        f"Financial: {state['financial']}"
    )
    return {"final_report": report}

builder = StateGraph(AnalysisState)
builder.add_node("legal", legal_agent)
builder.add_node("technical", technical_agent)
builder.add_node("financial", financial_agent)
builder.add_node("synthesiser", synthesiser)

# Fan-out: multiple edges from START run the three agents in parallel
builder.add_edge(START, "legal")
builder.add_edge(START, "technical")
builder.add_edge(START, "financial")

# Fan-in: synthesiser runs once all three have completed
builder.add_edge("legal", "synthesiser")
builder.add_edge("technical", "synthesiser")
builder.add_edge("financial", "synthesiser")
builder.add_edge("synthesiser", END)

graph = builder.compile()
# graph.invoke({"document": "..."}) runs the three agents in one parallel step
```
When one of N parallel tasks fails, you have three strategies:
Fail-fast (default with asyncio.gather): if any task raises an exception, gather propagates that first exception immediately; note that the remaining tasks are not cancelled and keep running in the background. Use this when all results are required, since a missing perspective makes the final output invalid.

```python
# Fail-fast: raises on first exception
results = await asyncio.gather(*tasks)
```
Collect all results: use return_exceptions=True to get a mix of results and exceptions, then handle each:
```python
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"{len(failed)} tasks failed: {failed}")
```
Timeout individual tasks: wrap each task with asyncio.wait_for(task, timeout=30.0) so a slow API call doesn't block the entire fan-in.
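A minimal sketch of per-branch timeouts, using asyncio.sleep as a stand-in for a real API call (the helper names here are illustrative, not from any library):

```python
import asyncio

async def slow_call(name: str, delay: float) -> str:
    # Stand-in for a real API call: just sleeps, then returns
    await asyncio.sleep(delay)
    return f"{name} done"

async def bounded(name: str, delay: float, timeout: float = 0.5) -> str:
    # Wrap each branch so one slow call can't stall the whole fan-in
    try:
        return await asyncio.wait_for(slow_call(name, delay), timeout=timeout)
    except asyncio.TimeoutError:
        return f"{name} timed out"

async def main() -> list[str]:
    return await asyncio.gather(
        bounded("fast", 0.01),
        bounded("slow", 2.0),  # exceeds the 0.5s timeout and is cancelled
    )

print(asyncio.run(main()))  # ['fast done', 'slow timed out']
```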
Rate limits multiply with parallelism. If your API allows 60 requests/minute and you fan out 100 tasks simultaneously, you'll hit rate limits immediately. Add a semaphore to cap concurrent requests:
```python
sem = asyncio.Semaphore(10)  # max 10 concurrent

async def limited_call(text: str):
    async with sem:
        return await analyse_from_perspective(text, "legal")
```
Cost scales linearly with parallelism. Running 20 agents in parallel costs 20× as much as running one. The latency improves, but the total token cost is the same as running them sequentially. Monitor costs and only parallelise where latency genuinely matters to the user.
Order is not guaranteed. asyncio.gather() returns results in the same order as the input coroutines, but as_completed() returns in completion order. Be explicit about which you need — using as_completed when you need ordering will produce subtle bugs.
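The difference is easy to demonstrate with staggered delays (asyncio.sleep stands in for calls of varying duration):

```python
import asyncio

async def echo_after(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value

async def main() -> tuple[list[str], list[str]]:
    # gather: results come back in input order, regardless of timing
    in_order = await asyncio.gather(
        echo_after("a", 0.06), echo_after("b", 0.02), echo_after("c", 0.04)
    )
    # as_completed: results come back in completion order
    coros = [echo_after("a", 0.06), echo_after("b", 0.02), echo_after("c", 0.04)]
    by_completion = [await fut for fut in asyncio.as_completed(coros)]
    return in_order, by_completion

in_order, by_completion = asyncio.run(main())
print(in_order)       # ['a', 'b', 'c']
print(by_completion)  # ['b', 'c', 'a']
```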
| Pattern | Python Primitive | Best For | Failure Handling |
|---|---|---|---|
| asyncio.gather | asyncio | I/O-bound async tasks (LLM calls) | First exception propagates; other tasks keep running |
| asyncio.gather(return_exceptions=True) | asyncio | Partial results acceptable | Returns exception objects, not raises |
| ThreadPoolExecutor | concurrent.futures | Sync code, CPU-light tasks | Future.exception() per task |
| ProcessPoolExecutor | concurrent.futures | CPU-bound tasks (embeddings, tokenisation) | Future.exception() per task |
| LangGraph parallel nodes | LangGraph | Agent fan-out with state merging | Node-level error handlers |
For LLM API fan-outs, always cap concurrency with a semaphore. Without a cap, launching 50 simultaneous LLM calls will likely hit your provider's RPM limit and cause a cascade of 429 errors that retry and compound. A semaphore of 10-20 concurrent calls provides good throughput on most API tier limits without hitting rate limits. Monitor the semaphore queue depth in production to detect when your concurrency cap is throttling throughput and adjust accordingly.
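When the cap alone isn't enough, retries need jittered exponential backoff so parallel callers don't all retry in lockstep and re-trigger the limit. A sketch with a stand-in exception class (a real integration would catch the provider's error, e.g. anthropic.RateLimitError):

```python
import asyncio
import random

class RateLimited(Exception):
    """Stand-in for a provider's 429 error."""

async def call_with_backoff(task, retries: int = 5, base: float = 0.05):
    # Exponential backoff with jitter: 0.05s, 0.1s, 0.2s, ... each scaled
    # by a random factor so concurrent callers spread out their retries
    for attempt in range(retries):
        try:
            return await task()
        except RateLimited:
            if attempt == retries - 1:
                raise  # out of retries: surface the error
            await asyncio.sleep(base * (2 ** attempt) * (1 + random.random()))
```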
Collect and log the latency of each parallel branch separately. In production, parallel branches have high variance: one slow branch (due to a long output or a model timeout) will block the merge step even if all other branches complete quickly. Identify the slow-branch patterns from your logs and either set per-branch timeouts or move slow branches to asynchronous post-processing rather than blocking the main result.
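A lightweight way to get those per-branch numbers is a timing wrapper at the fan-out point (a sketch; asyncio.sleep stands in for real agent branches, and the names are illustrative):

```python
import asyncio
import time

async def timed(name: str, coro):
    # Measure each branch's wall-clock latency separately
    start = time.perf_counter()
    result = await coro
    elapsed = time.perf_counter() - start
    print(f"branch={name} latency={elapsed:.3f}s")
    return name, result, elapsed

async def branch(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a real agent branch
    return "done"

async def main() -> str:
    results = await asyncio.gather(
        timed("legal", branch(0.01)),
        timed("technical", branch(0.05)),
    )
    # The merge step is gated by the slowest branch; report which one
    slowest = max(results, key=lambda r: r[2])
    return slowest[0]

print(f"slowest branch: {asyncio.run(main())}")
```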
Parallel execution in LLM pipelines requires careful management of shared context. When multiple sub-agents run concurrently, they each consume tokens from the same rate limits, write results to shared memory, and may produce conflicting outputs that need reconciliation. A well-designed parallel orchestrator pre-allocates resources, detects conflicts in output schemas, and implements merge strategies before downstream nodes consume the aggregated results.
Fan-out / fan-in patterns are the most common parallel execution topology. The orchestrator fans out a task to N parallel workers, each solving a sub-problem independently. The fan-in step aggregates results — either by selecting the best output, merging all outputs into a unified answer, or feeding results into a synthesis prompt. The key design decision is whether synthesis happens at the model layer (another LLM call) or at the code layer (deterministic merge logic).
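As a sketch of the code-layer option, a deterministic merge can be as simple as concatenating branch outputs under stable headings (the function name and format are illustrative); the model-layer alternative would instead feed this same dict into one more synthesis prompt:

```python
def merge_analyses(results: dict[str, str]) -> str:
    # Code-layer fan-in: deterministic, free, and reproducible.
    # Sorting the keys keeps the merged report stable across runs,
    # since parallel branches can finish in any order.
    sections = [f"=== {name.upper()} ===\n{results[name]}" for name in sorted(results)]
    return "\n\n".join(sections)

report = merge_analyses({
    "technical": "Logging queries in plain text is a PII risk.",
    "legal": "Plain-text logs may violate data-protection law.",
})
print(report)
```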