Systems where multiple AI agents collaborate, each with specialised roles, tools, and context, coordinated by an orchestrator to tackle complex tasks.
A single LLM call has two constraints: context window (how much it can see at once) and attention (how well it can juggle many concerns simultaneously). A task like "research 10 competitors, analyse their pricing, and write a 50-page report" exceeds both.
Multi-agent systems solve this by dividing the task: one agent searches, another analyses, a third writes, a fourth edits. Each agent has a focused context and a single responsibility. An orchestrator coordinates them, passing results between agents and synthesising the final output.
Think of it like a consulting firm: the partner (orchestrator) defines the scope and assigns tasks; analysts, writers, and subject-matter experts (agents) do focused work; the partner assembles the final deliverable.
```python
import anthropic
import json
import re
from concurrent.futures import ThreadPoolExecutor

client = anthropic.Anthropic()

def run_agent(role: str, task: str, context: str = "") -> str:
    """Run a specialist agent with a given role and task."""
    system = f"You are a {role}. Be concise and return only what was asked."
    user_content = f"{context}\n\nTask: {task}" if context else f"Task: {task}"
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": user_content}]
    )
    return response.content[0].text

def orchestrate(goal: str) -> str:
    """Orchestrator: plan tasks, run agents, synthesise results."""
    # 1. Planner agent creates subtask assignments
    plan_response = run_agent(
        "task planner",
        f"Break this goal into 3 specialist subtasks. Return as a JSON list of {{'role': ..., 'task': ...}}\n\nGoal: {goal}"
    )
    match = re.search(r'\[.*\]', plan_response, re.DOTALL)
    subtasks = json.loads(match.group()) if match else []
    if not subtasks:
        return plan_response  # planning failed; fall back to the raw plan
    # 2. Run specialist agents (parallel where independent)
    results = {}
    with ThreadPoolExecutor(max_workers=len(subtasks)) as pool:
        futures = {pool.submit(run_agent, s["role"], s["task"]): s for s in subtasks}
        for future, subtask in futures.items():
            results[subtask["role"]] = future.result()
    # 3. Synthesiser agent combines everything
    context = "\n\n".join(f"## {role}\n{result}" for role, result in results.items())
    return run_agent("senior analyst", f"Synthesise these specialist reports into a final answer for: {goal}", context)

result = orchestrate("Summarise the key differences between FastAPI, Flask, and Django for a senior developer.")
print(result)
```
```python
import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def run_agent_async(role: str, task: str) -> tuple[str, str]:
    response = await async_client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=512,
        system=f"You are a {role}.",
        messages=[{"role": "user", "content": task}]
    )
    return role, response.content[0].text

async def parallel_research(topic: str) -> dict[str, str]:
    """Run multiple research agents simultaneously."""
    tasks = [
        run_agent_async("technical analyst", f"Explain the technical architecture of {topic}"),
        run_agent_async("market analyst", f"Describe the market position and competitors for {topic}"),
        run_agent_async("risk analyst", f"Identify the main risks and limitations of {topic}"),
        run_agent_async("trend forecaster", f"Predict where {topic} is heading in 2025"),
    ]
    results = await asyncio.gather(*tasks)
    return dict(results)

# Run all 4 agents concurrently — takes as long as the slowest, not the sum of all
reports = asyncio.run(parallel_research("large language models"))
for role, report in reports.items():
    print(f"\n=== {role.upper()} ===\n{report[:200]}...")
```
For sequential pipelines where Agent B needs Agent A's output, pass results as messages:
```python
class AgentPipeline:
    def __init__(self):
        self.shared_state = {}

    def run_stage(self, agent_name: str, role: str, task: str,
                  inputs: list[str] | None = None) -> str:
        # Build context from the required upstream outputs
        context_parts = []
        if inputs:
            for key in inputs:
                if key in self.shared_state:
                    context_parts.append(f"{key}: {self.shared_state[key]}")
        context = "\n".join(context_parts)
        result = run_agent(role, task, context)
        self.shared_state[agent_name] = result
        return result

pipeline = AgentPipeline()

# Stage 1: Research
pipeline.run_stage("research", "researcher",
    "Find the top 3 Python web frameworks by GitHub stars.")

# Stage 2: Analysis (uses research output)
pipeline.run_stage("analysis", "technical analyst",
    "Compare the frameworks from a scalability perspective.",
    inputs=["research"])

# Stage 3: Recommendation (uses both)
final = pipeline.run_stage("recommendation", "senior architect",
    "Write a one-paragraph recommendation for a startup.",
    inputs=["research", "analysis"])
print(final)
```
Use multi-agent when:
- the task decomposes into independent subtasks that can run in parallel;
- different subtasks benefit from different specialisations, tools, or prompts;
- a single call's context window or attention cannot cover the whole task.

Don't use multi-agent when:
- a single well-prompted call can do the job (coordination overhead will dominate);
- subtasks are tightly coupled and every agent needs the full shared context;
- you need strict auditability and can't afford errors compounding across agent boundaries.
Context loss between agents. Agent B gets only what you explicitly pass from Agent A. If A's reasoning matters, pass the reasoning — not just the conclusion.
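One lightweight way to preserve reasoning is to ask each agent for a structured reply and forward both parts. A minimal sketch, assuming a `REASONING:`/`CONCLUSION:` format convention that the agent's prompt would have to request (the section markers and helper name are illustrative):

```python
def split_reasoning(raw: str) -> dict:
    """Split an agent reply of the form 'REASONING: ... CONCLUSION: ...'
    so downstream agents see *why*, not just *what*."""
    reasoning, _, conclusion = raw.partition("CONCLUSION:")
    return {
        "reasoning": reasoning.replace("REASONING:", "", 1).strip(),
        "conclusion": conclusion.strip(),
    }

# Downstream agents get the full record, not just the conclusion:
record = split_reasoning(
    "REASONING: A is faster in benchmarks X and Y.\nCONCLUSION: choose A."
)
# record["reasoning"] retains the evidence Agent B may need to weigh
```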
Contradictions between agents. Two specialist agents may give conflicting information. Your orchestrator needs explicit conflict-resolution logic, or the synthesiser will produce an inconsistent final answer.
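One simple form of conflict-resolution logic is a dedicated adjudication step: collect the reports, tell a judge agent to surface and resolve disagreements, and only then synthesise. A sketch where the judge is any `callable(prompt) -> str`, so it works with a real agent or a stub (the prompt wording is an assumption):

```python
def resolve_conflicts(reports: dict[str, str], judge) -> str:
    """If specialists disagree, route their reports to a judge agent
    instead of letting the synthesiser paper over the contradiction.
    `judge` is any callable(prompt) -> str."""
    listing = "\n\n".join(f"## {role}\n{text}" for role, text in reports.items())
    prompt = (
        "These specialist reports may contradict each other. "
        "Identify any conflicting claims, state which you trust and why, "
        "then give one consistent answer.\n\n" + listing
    )
    return judge(prompt)

# Stub judge for illustration; in production pass e.g.
# lambda p: run_agent("adjudicator", p) instead.
verdict = resolve_conflicts(
    {"analyst A": "Latency is 20ms.", "analyst B": "Latency is 200ms."},
    judge=lambda prompt: f"Conflict found in {prompt.count('##')} reports; trusting analyst B.",
)
```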
Cascading failures. If Agent A produces a bad output and Agent B trusts it, Agent C gets contaminated input. Add validation after each agent: check that outputs meet minimum quality criteria before passing them downstream.
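A minimal validation gate can catch the cheapest failure modes before an output is trusted downstream. The checks and thresholds here are illustrative, not recommendations:

```python
def validate_output(text: str, min_chars: int = 50) -> tuple[bool, str]:
    """Cheap structural checks before passing an agent's output downstream.
    Returns (ok, reason) so the orchestrator can retry or escalate."""
    if not text or not text.strip():
        return False, "empty output"
    refusals = ("i cannot", "i'm sorry", "as an ai")
    if any(text.lower().lstrip().startswith(r) for r in refusals):
        return False, "looks like a refusal, not a result"
    if len(text.strip()) < min_chars:
        return False, f"too short ({len(text.strip())} chars)"
    return True, "ok"

ok, reason = validate_output("I cannot help with that request.")
# → (False, "looks like a refusal, not a result")
```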
Cost scales linearly with agents. 5 agents each making 3 API calls = 15 API calls per orchestration. Add up the token budgets before deploying. Use cheaper models (Haiku) for specialist agents and reserve expensive models (Sonnet) for the orchestrator and synthesiser.
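Adding up the token budgets can be as simple as multiplying calls by per-call estimates. The rates below are placeholders, not real prices; check your provider's current pricing:

```python
def estimate_cost(agents: list[dict], price_in: float, price_out: float) -> float:
    """Rough pre-deployment cost estimate per orchestration run.
    Each agent dict: calls, input tokens per call, output tokens per call.
    price_in / price_out are per-million-token rates (placeholders)."""
    total = 0.0
    for a in agents:
        total += a["calls"] * (
            a["in_tokens"] * price_in + a["out_tokens"] * price_out
        ) / 1_000_000
    return total

# 5 specialist agents x 3 calls each = 15 calls per orchestration
plan = [{"calls": 3, "in_tokens": 2000, "out_tokens": 800}] * 5
cost = estimate_cost(plan, price_in=1.0, price_out=5.0)  # illustrative $/Mtok
# 15 * (2000*1 + 800*5) / 1e6 = $0.09 per run
```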
Parallel agents can hit rate limits. Launching 20 agents simultaneously with large prompts may exceed your RPM or TPM quota. Use a semaphore to cap concurrency:
```python
sem = asyncio.Semaphore(5)  # max 5 concurrent agents

async def run_with_limit(role, task):
    async with sem:
        return await run_agent_async(role, task)
```
Agent roles must be orthogonal. If two agents have overlapping responsibilities ("summariser" and "report writer"), they'll duplicate work or contradict each other. Define clear, non-overlapping scopes for each agent role.
Multi-agent systems succeed when tasks are genuinely parallelisable or benefit from specialisation — a researcher agent, a writer agent, and a critic agent each doing what they're best at. They fail when coordination overhead dominates, when agents have inconsistent world-states, or when debugging becomes impossible because errors compound across agent boundaries.
The most reliable production pattern is the supervisor-worker hierarchy: one orchestrator agent decomposes the task, assigns subtasks to specialist workers, and synthesises results. This gives a single control point for monitoring, error handling, and human oversight. Avoid fully autonomous peer-to-peer agent communication in production — it's nearly impossible to audit or debug.
| Pattern | Structure | Best For | Pitfall |
|---|---|---|---|
| Orchestrator-Worker | 1 orchestrator → N workers | Parallelisable subtasks | Orchestrator becomes bottleneck |
| Pipeline | Agent A → Agent B → Agent C | Sequential transformation stages | Error propagation, no backtracking |
| Debate/Critic | Generator + Critic agents | Quality improvement, fact-checking | Agents may agree rather than debate |
| Specialist Pool | Router → domain specialist agents | Multi-domain queries | Routing errors send to wrong specialist |
| Peer-to-Peer | Agents communicate freely | Complex emergent behaviour | Very hard to debug, audit, or control |
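Of these, the debate/critic loop is straightforward to sketch: a generator drafts, a critic reviews, and the loop stops when the critic approves or a round limit is hit. This version takes both agents as callables so it runs with real agents or stubs; the `APPROVED` convention and prompt wording are assumptions:

```python
def critique_loop(generate, critique, task: str, max_rounds: int = 3) -> str:
    """Generator + critic pattern: revise until the critic approves.
    The round limit guards against loops where the critic never approves.
    `generate(prompt) -> str` and `critique(draft) -> str` are any callables."""
    draft = generate(task)
    for _ in range(max_rounds):
        feedback = critique(draft)
        if feedback.strip().upper().startswith("APPROVED"):
            return draft
        draft = generate(f"{task}\n\nRevise this draft per the feedback.\n"
                         f"Draft: {draft}\nFeedback: {feedback}")
    return draft  # best effort after max_rounds

# Stubbed example: the critic approves on the second round
rounds = iter(["Too vague; add numbers.", "APPROVED"])
result = critique_loop(
    generate=lambda p: "draft v2" if "Feedback" in p else "draft v1",
    critique=lambda d: next(rounds),
    task="Summarise Q3 results",
)
# → "draft v2"
```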
Multi-agent systems are harder to test than single-agent systems because failures emerge from agent interactions, not just individual agent behaviour. A solid test strategy layers unit tests (each agent in isolation), integration tests (pairs of agents), and end-to-end tests (full orchestration with mocked external APIs).
Key things to assert in integration tests: the orchestrator correctly decomposes tasks; each worker receives complete, well-formed inputs; the synthesiser produces coherent output even when workers return conflicting results; and the entire pipeline completes within your latency budget. Use deterministic model stubs in unit tests to keep CI fast:
```python
class StubAgent:
    """Deterministic stand-in for a real LLM agent in unit tests."""
    def __init__(self, fixed_response: str):
        self.fixed_response = fixed_response
        self.call_count = 0

    def run(self, task: str) -> str:
        self.call_count += 1
        return self.fixed_response

def test_orchestrator_calls_all_workers():
    research = StubAgent("Research result: LLMs use transformers.")
    analysis = StubAgent("Analysis result: cost is O(n^2).")
    synthesis = StubAgent("Final: transformers are expensive.")
    orchestrator = Orchestrator(
        research_agent=research,
        analysis_agent=analysis,
        synthesis_agent=synthesis
    )
    result = orchestrator.run("Compare LLM architectures")
    assert research.call_count == 1
    assert analysis.call_count == 1
    assert synthesis.call_count == 1
    assert "transformers" in result.lower()
```
For latency regression testing, record a baseline wall-clock time for a representative orchestration run, then alert if any PR increases it by more than 20%. Multi-agent latency is dominated by the longest serial chain, so optimising the critical path yields the best returns.
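The regression check itself is just arithmetic on recorded timings. A sketch, with the 20% threshold following the policy above (where the baseline is stored is up to your CI setup):

```python
def latency_regressed(baseline_s: float, current_s: float,
                      tolerance: float = 0.20) -> bool:
    """True if the current run exceeds the recorded baseline wall-clock
    time by more than `tolerance` (20% per the policy above)."""
    return current_s > baseline_s * (1 + tolerance)

assert not latency_regressed(baseline_s=12.0, current_s=14.0)  # +16.7%: within budget
assert latency_regressed(baseline_s=12.0, current_s=15.0)      # +25%: alert
```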