A central planner agent that decomposes a complex task into subtasks, routes each to a specialised sub-agent, and assembles the results, separating coordination from execution.
In a flat multi-agent system, agents communicate peer-to-peer or via a shared message bus. Every agent needs to understand the full context of the system. As the number of agents grows, coordination complexity grows quadratically: n agents means up to n(n-1)/2 communication channels.
The orchestrator pattern introduces a hierarchy: one agent (the orchestrator) owns the task decomposition and coordination; specialist agents (workers) each do one thing well and don't need to know about each other.
This mirrors real organisations: a project manager doesn't need every team member to communicate with every other team member. The PM breaks the project into work items, assigns them to the right people, and assembles the deliverables. Workers focus on their assigned tasks without global context.
Benefits: simpler worker agents (narrower context = better performance), reusable workers across different orchestrators, easier debugging (track the orchestrator's decision log), and controlled quality gates before assembly.
User request
      │
      ▼
┌───────────────────────────┐
│       ORCHESTRATOR        │
│  1. Parse the task        │
│  2. Plan subtasks         │
│  3. Route to workers      │
│  4. Collect results       │
│  5. Validate quality      │
│  6. Assemble response     │
└─────────┬─────────────────┘
          │ assigns subtasks
    ┌─────┴────────┬──────────────┐
    │              │              │
┌───▼──────┐  ┌────▼─────┐  ┌─────▼───────┐
│Researcher│  │  Writer  │  │Fact-checker │
│  Agent   │  │  Agent   │  │   Agent     │
└──────────┘  └──────────┘  └─────────────┘
The orchestrator can be synchronous (route to Worker A → get result → route to Worker B using A's output) or parallel (route to A, B, C simultaneously → wait for all → assemble). The right approach depends on whether subtasks have dependencies.
import anthropic
import json

client = anthropic.Anthropic()

# Worker agents: each is a simple, focused LLM call
def research_agent(topic: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=512,
        system="You are a research assistant. Provide factual, concise information.",
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    ).content[0].text

def writer_agent(topic: str, research: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=1024,
        system="You are a technical writer. Write clear, engaging prose.",
        messages=[{"role": "user",
                   "content": f"Write a section about {topic} using this research:\n{research}"}]
    ).content[0].text

def fact_check_agent(content: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=256,
        system="You are a fact-checker. Identify any claims that may be incorrect.",
        messages=[{"role": "user", "content": f"Fact-check this content:\n{content}"}]
    ).content[0].text
import re

# Orchestrator
def orchestrator(user_request: str) -> str:
    # Step 1: Plan
    plan_prompt = f'''Break this request into subtasks for: researcher, writer, fact-checker.
Output JSON: {{"subtasks": [{{"agent": "researcher/writer/fact_checker", "input": "..."}}]}}
Request: {user_request}'''
    plan_resp = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=512,
        messages=[{"role": "user", "content": plan_prompt}]
    )
    plan_text = plan_resp.content[0].text

    # Extract JSON from the response (the model may wrap it in prose)
    json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
    plan = json.loads(json_match.group()) if json_match else {"subtasks": []}

    # Step 2: Execute
    results = {}
    for task in plan.get("subtasks", []):
        agent, inp = task["agent"], task["input"]
        if agent == "researcher":
            results["research"] = research_agent(inp)
        elif agent == "writer":
            research = results.get("research", "")
            results["draft"] = writer_agent(inp, research)
        elif agent == "fact_checker":
            draft = results.get("draft", inp)
            results["fact_check"] = fact_check_agent(draft)

    # Step 3: Assemble
    assembly_prompt = (
        f"Assemble a final response from:\n"
        f"Research: {results.get('research', '')}\n"
        f"Draft: {results.get('draft', '')}\n"
        f"Fact-check notes: {results.get('fact_check', '')}"
    )
    return client.messages.create(
        model="claude-sonnet-4-5", max_tokens=1024,
        messages=[{"role": "user", "content": assembly_prompt}]
    ).content[0].text

result = orchestrator("Write a short technical explainer on how HTTPS works.")
print(result)
Instead of hardcoding which agents exist, the orchestrator can dynamically select from a registry of available agents based on the task:
AGENT_REGISTRY = {
    "researcher": {
        "description": "Research factual information about a topic",
        "fn": research_agent
    },
    "coder": {
        "description": "Write Python code to solve a programming problem",
        "fn": lambda task: client.messages.create(
            model="claude-sonnet-4-5", max_tokens=1024,
            system="Write clean, well-commented Python code.",
            messages=[{"role": "user", "content": task}]
        ).content[0].text
    },
    "summariser": {
        "description": "Summarise long text into key points",
        "fn": lambda task: client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            system="Extract the 3-5 key points.",
            messages=[{"role": "user", "content": task}]
        ).content[0].text
    }
}

def route(task: str) -> str:
    '''Ask the orchestrator which agent to use for a task.'''
    agent_list = "\n".join(
        f"- {k}: {v['description']}" for k, v in AGENT_REGISTRY.items()
    )
    resp = client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=64,
        messages=[{"role": "user",
                   "content": f"Which agent should handle: '{task}'?\n"
                              f"Agents:\n{agent_list}\n"
                              "Respond with just the agent name."}]
    )
    agent_name = resp.content[0].text.strip().lower()
    # Fall back to the researcher if the model names an unknown agent
    return AGENT_REGISTRY.get(agent_name, AGENT_REGISTRY["researcher"])["fn"](task)
For long-running orchestration, stream intermediate results so users see progress rather than a blank screen:
from typing import Generator

# plan_subtasks, run_agent, and assemble_results are placeholders for the
# planning, execution, and assembly steps shown in the orchestrator above.
def streaming_orchestrator(request: str) -> Generator[str, None, None]:
    yield "Planning subtasks...\n"
    subtasks = plan_subtasks(request)
    for i, task in enumerate(subtasks):
        yield f"\n[{i+1}/{len(subtasks)}] Running {task['agent']}...\n"
        result = run_agent(task)
        yield f"Done: {result[:100]}...\n"
    yield "\nAssembling final response...\n"
    final = assemble_results(subtasks)
    yield f"\n{final}"

# Usage
for chunk in streaming_orchestrator("Write a technical blog post about LLMs"):
    print(chunk, end="", flush=True)
Good fits: tasks that naturally decompose into parallel independent subtasks (research + write + illustrate), tasks with sequential dependencies (plan β validate β execute), workflows with quality gates (draft β critique β revise β publish), and cases where different subtasks benefit from different model sizes or specialisations.
Poor fits: simple tasks that a single LLM call handles adequately (orchestration overhead isn't worth it), tightly coupled tasks where the split is artificial, and latency-critical applications, where the orchestrator's planning and assembly calls add 2-3 extra LLM round-trips.
A useful heuristic: if you can't clearly articulate what each worker agent does and why a human would assign those tasks to different specialists, you probably don't need an orchestrator.
The orchestrator becomes the bottleneck. Every subtask waits for the orchestrator to process its result before the next step. Use async execution and parallel fan-out wherever subtasks are independent; don't serialise unnecessarily.
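As a minimal sketch of that fan-out, independent subtasks can be dispatched through a thread pool and collected together. The stub functions below stand in for the real LLM-backed workers:

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative stubs standing in for real LLM calls
def research_stub(topic: str) -> str:
    return f"research notes on {topic}"

def outline_stub(topic: str) -> str:
    return f"outline for {topic}"

def fan_out(topic: str) -> dict:
    """Run independent subtasks in parallel and collect their results."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {
            "research": pool.submit(research_stub, topic),
            "outline": pool.submit(outline_stub, topic),
        }
        # Block until every subtask finishes, then assemble a results dict
        return {name: f.result() for name, f in futures.items()}

results = fan_out("HTTPS")
print(results["research"])  # research notes on HTTPS
```

Because LLM calls are I/O-bound, threads (or asyncio) give near-linear speedup on independent subtasks without any extra infrastructure.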
Information loss at assembly. The orchestrator sees summaries or excerpts of each worker's output, not the full raw output. Important details can fall through the cracks. Pass structured data (JSON/dict) between agents rather than plain text whenever possible, so the assembler has access to all fields.
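One lightweight way to sketch that structured hand-off (the field names here are illustrative, not a fixed schema) is to have every worker return a typed envelope rather than bare text:

```python
from dataclasses import dataclass, field

@dataclass
class WorkerResult:
    """Structured envelope a worker returns to the orchestrator."""
    agent: str                                   # which worker produced this
    output: str                                  # the main payload
    sources: list = field(default_factory=list)  # citations, URLs, etc.
    confidence: float = 1.0                      # worker's self-assessed confidence

def assemble(results: list[WorkerResult]) -> dict:
    """The assembler sees every field, not a lossy text summary."""
    return {
        "body": "\n\n".join(r.output for r in results),
        "sources": [s for r in results for s in r.sources],
        "min_confidence": min(r.confidence for r in results),
    }

draft = WorkerResult("writer", "HTTPS wraps HTTP in TLS.", confidence=0.9)
notes = WorkerResult("fact_checker", "No incorrect claims found.",
                     sources=["RFC 8446"], confidence=0.95)
print(assemble([draft, notes])["min_confidence"])  # 0.9
```

The assembler can then surface low-confidence sections or missing sources explicitly instead of silently dropping them.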
Error handling must be explicit. If Worker A fails, the orchestrator needs a clear recovery path: retry, use a fallback agent, skip the subtask, or abort the whole workflow. Don't assume workers succeed β always handle errors at the orchestration layer.
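A minimal sketch of that recovery logic, with illustrative stubs for a failing worker and its fallback:

```python
import time

def run_with_recovery(fn, task, fallback=None, retries=2, delay=0.0):
    """Call a worker; retry on failure, then try a fallback agent, else re-raise."""
    last_err = None
    for attempt in range(retries + 1):
        try:
            return fn(task)
        except Exception as err:
            last_err = err
            time.sleep(delay)          # back off between attempts
    if fallback is not None:
        return fallback(task)          # e.g. a cheaper or more robust agent
    raise last_err                     # abort: no recovery path left

# Illustrative stubs: a worker that always fails and a fallback that succeeds
flaky = lambda t: (_ for _ in ()).throw(RuntimeError("worker down"))
backup = lambda t: f"fallback handled: {t}"
print(run_with_recovery(flaky, "summarise report", fallback=backup))
```

Whether to retry, fall back, skip, or abort is a policy decision the orchestrator should make per subtask, not something buried inside each worker.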
| Design Decision | Options | Recommendation |
|---|---|---|
| Task decomposition | Static (hardcoded) vs Dynamic (LLM-decided) | Static for known workflows; dynamic for open-ended tasks |
| Worker communication | Direct call vs message queue | Direct for latency-sensitive; queue for reliability |
| Error handling | Fail fast vs partial results | Partial results with confidence scores for user-facing APIs |
| Result aggregation | Concat vs synthesise with LLM | Synthesise for coherent output; concat for audit trails |
| Human oversight | None vs checkpoint vs full review | Checkpoint before irreversible actions |
The orchestrator should be the only component with global state; individual workers should be stateless and rerunnable. This makes the system debuggable: if a worker fails, you can replay its exact inputs from the orchestrator's log without re-running earlier stages. Log the full task decomposition and all worker inputs/outputs at DEBUG level; this data is invaluable when diagnosing why the final output diverged from expectations.
Keep orchestrator state minimal: store only task inputs, outputs, and status. Delegate all business logic to workers. Checkpoint state after every worker completion, not just at the end of the full orchestration, so a failed orchestrator instance can be replaced and any task resumed from the last persisted checkpoint without replaying the entire workflow.
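A minimal checkpointing sketch along those lines. The file-based store is an assumption for illustration; a production system would use a database or durable queue:

```python
import json
from pathlib import Path

def save_checkpoint(path: Path, state: dict) -> None:
    """Persist orchestration state after each worker completes."""
    path.write_text(json.dumps(state))

def load_checkpoint(path: Path) -> dict:
    """Resume from the last persisted state, or start fresh."""
    if path.exists():
        return json.loads(path.read_text())
    return {"completed": {}, "status": "new"}

def run_workflow(tasks: dict, path: Path) -> dict:
    state = load_checkpoint(path)
    for name, fn in tasks.items():
        if name in state["completed"]:
            continue                    # already done in a previous run: skip
        state["completed"][name] = fn()
        save_checkpoint(path, state)    # checkpoint after EVERY worker, not at the end
    state["status"] = "done"
    save_checkpoint(path, state)
    return state

ckpt = Path("orchestration_checkpoint.json")
ckpt.unlink(missing_ok=True)            # start fresh for this demo
state = run_workflow({"research": lambda: "notes", "draft": lambda: "text"}, ckpt)
print(state["status"])  # done
```

If the orchestrator process dies mid-run, a replacement instance calls `run_workflow` with the same checkpoint path and skips every subtask that already completed.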
Implement structured logging in your orchestrator to enable post-hoc workflow analysis. Each log entry should include: orchestration ID, step number, worker ID, input token count, output token count, latency, and success/failure status. This lets you run cost attribution queries across orchestration runs, identify which worker types are the most expensive or slowest, and build dashboards that show quality trends correlated with orchestration complexity.
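One way to sketch that log schema (the field names mirror the list above but are otherwise illustrative):

```python
import json
import logging
import uuid

logger = logging.getLogger("orchestrator")
logging.basicConfig(level=logging.DEBUG)

def log_step(orchestration_id: str, step: int, worker: str,
             input_tokens: int, output_tokens: int,
             latency_s: float, ok: bool) -> dict:
    """Emit one structured (JSON-lines) log entry per worker invocation."""
    entry = {
        "orchestration_id": orchestration_id,
        "step": step,
        "worker": worker,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "latency_s": round(latency_s, 3),
        "status": "success" if ok else "failure",
    }
    logger.debug(json.dumps(entry))  # JSON lines are trivial to query later
    return entry

run_id = str(uuid.uuid4())
e = log_step(run_id, 1, "researcher", 120, 480, 1.27, ok=True)
print(e["status"])  # success
```

Grouping entries by `orchestration_id` gives per-run cost attribution; grouping by `worker` shows which agent types dominate spend or latency.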