Multi-Agent

Orchestrator Pattern

A central planner agent that decomposes a complex task into subtasks, routes each to a specialised sub-agent, and assembles the results, separating coordination from execution.



SECTION 01

Orchestrator vs flat multi-agent

In a flat multi-agent system, agents communicate peer-to-peer or via a shared message bus. Every agent needs to understand the full context of the system. As the number of agents grows, coordination complexity grows quadratically: n agents have n(n-1)/2 potential communication channels.

The orchestrator pattern introduces a hierarchy: one agent (the orchestrator) owns the task decomposition and coordination; specialist agents (workers) each do one thing well and don't need to know about each other.

This mirrors real organisations: a project manager doesn't need every team member to communicate with every other team member. The PM breaks the project into work items, assigns them to the right people, and assembles the deliverables. Workers focus on their assigned tasks without global context.

Benefits: simpler worker agents (narrower context = better performance), reusable workers across different orchestrators, easier debugging (track the orchestrator's decision log), and controlled quality gates before assembly.

SECTION 02

The orchestrator loop

User request
      │
      ▼
┌─────────────────────────┐
│      ORCHESTRATOR       │
│  1. Parse the task      │
│  2. Plan subtasks       │
│  3. Route to workers    │
│  4. Collect results     │
│  5. Validate quality    │
│  6. Assemble response   │
└─────────┬───────────────┘
          │ assigns subtasks
    ┌─────┴──────┬──────────────┐
    │            │              │
┌───▼──────┐  ┌──▼───────┐  ┌───▼────────┐
│Researcher│  │  Writer  │  │Fact-checker│
│  Agent   │  │  Agent   │  │   Agent    │
└──────────┘  └──────────┘  └────────────┘

The orchestrator can be synchronous (route to Worker A → get result → route to Worker B using A's output) or parallel (route to A, B, C simultaneously → wait for all → assemble). The right approach depends on whether subtasks have dependencies.
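The parallel variant can be sketched with asyncio. The workers below are illustrative stand-ins; in a real system each would wrap an async LLM call rather than return canned text:

```python
import asyncio

# Illustrative async workers: in a real system each would wrap an LLM call
# (e.g. via an async client) rather than return canned text.
async def research(topic: str) -> str:
    return f"research notes on {topic}"

async def outline(topic: str) -> str:
    return f"outline for {topic}"

async def find_examples(topic: str) -> str:
    return f"examples for {topic}"

async def parallel_fan_out(topic: str) -> dict:
    # Independent subtasks: launch all three at once, wait for all results.
    notes, plan, examples = await asyncio.gather(
        research(topic), outline(topic), find_examples(topic)
    )
    return {"research": notes, "outline": plan, "examples": examples}

results = asyncio.run(parallel_fan_out("HTTPS"))
```

If Worker B needs A's output, it simply can't join the fan-out; group only genuinely independent subtasks into each `asyncio.gather` call.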

SECTION 03

Building a basic orchestrator

import anthropic
import json

client = anthropic.Anthropic()

# Worker agents: each is a simple, focused LLM call
def research_agent(topic: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=512,
        system="You are a research assistant. Provide factual, concise information.",
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    ).content[0].text

def writer_agent(topic: str, research: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=1024,
        system="You are a technical writer. Write clear, engaging prose.",
        messages=[{"role": "user",
                   "content": f"Write a section about {topic} using this research:\n{research}"}]
    ).content[0].text

def fact_check_agent(content: str) -> str:
    return client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=256,
        system="You are a fact-checker. Identify any claims that may be incorrect.",
        messages=[{"role": "user", "content": f"Fact-check this content:\n{content}"}]
    ).content[0].text

# Orchestrator
def orchestrator(user_request: str) -> str:
    # Step 1: Plan
    plan_prompt = f'''Break this request into subtasks for: researcher, writer, fact-checker.
Output JSON: {{"subtasks": [{{"agent": "researcher/writer/fact_checker", "input": "..."}}]}}

Request: {user_request}'''

    plan_resp = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=512,
        messages=[{"role": "user", "content": plan_prompt}]
    )
    plan_text = plan_resp.content[0].text
    # Extract JSON from response
    import re
    json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
    plan = json.loads(json_match.group()) if json_match else {"subtasks": []}

    # Step 2: Execute
    results = {}
    for task in plan.get("subtasks", []):
        agent, inp = task["agent"], task["input"]
        if agent == "researcher":
            results["research"] = research_agent(inp)
        elif agent == "writer":
            research = results.get("research", "")
            results["draft"] = writer_agent(inp, research)
        elif agent == "fact_checker":
            draft = results.get("draft", inp)
            results["fact_check"] = fact_check_agent(draft)

    # Step 3: Assemble
    assembly_prompt = (
        "Assemble a final response from:\n"
        f"Research: {results.get('research', '')}\n"
        f"Draft: {results.get('draft', '')}\n"
        f"Fact-check notes: {results.get('fact_check', '')}"
    )
    return client.messages.create(
        model="claude-sonnet-4-5", max_tokens=1024,
        messages=[{"role": "user", "content": assembly_prompt}]
    ).content[0].text

result = orchestrator("Write a short technical explainer on how HTTPS works.")
print(result)

SECTION 04

Dynamic routing

Instead of hardcoding which agents exist, the orchestrator can dynamically select from a registry of available agents based on the task:

AGENT_REGISTRY = {
    "researcher": {
        "description": "Research factual information about a topic",
        "fn": research_agent
    },
    "coder": {
        "description": "Write Python code to solve a programming problem",
        "fn": lambda task: client.messages.create(
            model="claude-sonnet-4-5", max_tokens=1024,
            system="Write clean, well-commented Python code.",
            messages=[{"role": "user", "content": task}]
        ).content[0].text
    },
    "summariser": {
        "description": "Summarise long text into key points",
        "fn": lambda task: client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            system="Extract the 3-5 key points.",
            messages=[{"role": "user", "content": task}]
        ).content[0].text
    }
}

def route(task: str) -> str:
    '''Ask the orchestrator which agent to use for a task.'''
    agent_list = "\n".join(f"- {k}: {v['description']}" for k, v in AGENT_REGISTRY.items())
    resp = client.messages.create(
        model="claude-haiku-4-5-20251001", max_tokens=64,
        messages=[{"role": "user", "content":
                   f"Which agent should handle: '{task}'?\nAgents:\n{agent_list}\n"
                   "Respond with just the agent name."}]
    )
    )
    agent_name = resp.content[0].text.strip().lower()
    return AGENT_REGISTRY.get(agent_name, AGENT_REGISTRY["researcher"])["fn"](task)

SECTION 05

Streaming results to the user

For long-running orchestration, stream intermediate results so users see progress rather than a blank screen:

from typing import Generator

def streaming_orchestrator(request: str) -> Generator[str, None, None]:
    yield "Planning subtasks...\n"
    subtasks = plan_subtasks(request)  # planning step, as in Section 03

    results = []
    for i, task in enumerate(subtasks):
        yield f"\n[{i+1}/{len(subtasks)}] Running {task['agent']}...\n"
        result = run_agent(task)       # dispatch to the matching worker
        results.append(result)
        yield f"Done: {result[:100]}...\n"

    yield "\nAssembling final response...\n"
    final = assemble_results(results)  # final synthesis call over the collected outputs
    yield f"\n{final}"

# Usage
for chunk in streaming_orchestrator("Write a technical blog post about LLMs"):
    print(chunk, end="", flush=True)

SECTION 06

When to use the orchestrator pattern

Good fits: tasks that naturally decompose into parallel independent subtasks (research + write + illustrate), tasks with sequential dependencies (plan → validate → execute), workflows with quality gates (draft → critique → revise → publish), and cases where different subtasks benefit from different model sizes or specialisations.

Poor fits: simple tasks that a single LLM call handles adequately (orchestration overhead isn't worth it), tightly coupled tasks where the split is artificial, and latency-critical applications where adding an orchestrator adds 2-3 extra LLM round-trips.

A useful heuristic: if you can't clearly articulate what each worker agent does and why a human would assign those tasks to different specialists, you probably don't need an orchestrator.

SECTION 07

Gotchas

The orchestrator becomes the bottleneck. Every subtask waits for the orchestrator to process its result before the next step. Use async execution and parallel fan-out wherever subtasks are independent; don't serialize unnecessarily.

Information loss at assembly. The orchestrator sees summaries or excerpts of each worker's output, not the full raw output. Important details can fall through the cracks. Pass structured data (JSON/dict) between agents rather than plain text whenever possible, so the assembler has access to all fields.
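One way to limit this loss (a sketch, not the only design) is to have every worker return the same structured record, so the assembler sees full outputs plus metadata rather than truncated prose. The field names here are illustrative:

```python
from dataclasses import dataclass, field

@dataclass
class WorkerResult:
    agent: str                       # which worker produced this
    output: str                      # full raw output, never truncated
    sources: list = field(default_factory=list)  # citations, file paths, etc.
    confidence: float = 1.0          # worker's self-reported confidence

def assemble(results: list) -> str:
    # The assembler can inspect every field, not just a text summary.
    flagged = [r for r in results if r.confidence < 0.7]
    body = "\n\n".join(f"## {r.agent}\n{r.output}" for r in results)
    if flagged:
        body += "\n\nLow-confidence sections: " + ", ".join(r.agent for r in flagged)
    return body

report = assemble([
    WorkerResult("researcher", "TLS uses asymmetric key exchange.", confidence=0.9),
    WorkerResult("writer", "HTTPS is HTTP over TLS...", confidence=0.6),
])
```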

Error handling must be explicit. If Worker A fails, the orchestrator needs a clear recovery path: retry, use a fallback agent, skip the subtask, or abort the whole workflow. Don't assume workers succeed; always handle errors at the orchestration layer.
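A minimal recovery wrapper might look like this; the retry count and fallback policy are illustrative choices, not a fixed API:

```python
from typing import Callable, Optional

def run_worker_safely(
    worker: Callable[[str], str],
    task: str,
    retries: int = 2,
    fallback: Optional[Callable[[str], str]] = None,
) -> str:
    """Retry a worker, then fall back, then fail loudly at the orchestration layer."""
    last_error: Optional[Exception] = None
    for _attempt in range(retries + 1):
        try:
            return worker(task)
        except Exception as e:   # in production, catch the SDK's specific error types
            last_error = e
    if fallback is not None:
        return fallback(task)
    raise RuntimeError(f"Worker failed after {retries + 1} attempts") from last_error

# Usage with a deliberately flaky stub worker:
calls = {"n": 0}
def flaky(task: str) -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated timeout")
    return f"done: {task}"

out = run_worker_safely(flaky, "summarise", retries=2)
```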

SECTION 08

Orchestrator Design Checklist

Design Decision      | Options                                     | Recommendation
---------------------|---------------------------------------------|------------------------------------------------------------
Task decomposition   | Static (hardcoded) vs Dynamic (LLM-decided) | Static for known workflows; dynamic for open-ended tasks
Worker communication | Direct call vs message queue                | Direct for latency-sensitive; queue for reliability
Error handling       | Fail fast vs partial results                | Partial results with confidence scores for user-facing APIs
Result aggregation   | Concat vs synthesise with LLM               | Synthesise for coherent output; concat for audit trails
Human oversight      | None vs checkpoint vs full review           | Checkpoint before irreversible actions

The orchestrator should be the only component with global state; individual workers should be stateless and rerunnable. This makes the system debuggable: if a worker fails, you can replay exactly its inputs from the orchestrator's log without re-running earlier stages. Log the full task decomposition and all worker inputs/outputs at DEBUG level; this data is invaluable when diagnosing why the final output diverged from expectations.

Keep orchestrator state minimal: store only task inputs, outputs, and status. Delegate all business logic to workers. Checkpoint state after every worker completion, not just at the end of the full orchestration, so a failed orchestrator instance can be replaced and any task resumed from the last persisted checkpoint without replaying the entire workflow.
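A file-based checkpoint along these lines (the path and state schema are illustrative assumptions) lets a replacement orchestrator resume mid-workflow:

```python
import json
from pathlib import Path

def run_with_checkpoints(tasks, run_worker, path="orchestration_state.json"):
    """Execute tasks in order, persisting state after every worker completion."""
    state_file = Path(path)
    # Resume from an existing checkpoint if a previous run left one behind.
    state = json.loads(state_file.read_text()) if state_file.exists() else {"done": {}}

    for i, task in enumerate(tasks):
        key = str(i)
        if key in state["done"]:
            continue                               # already completed by a previous run
        state["done"][key] = run_worker(task)
        state_file.write_text(json.dumps(state))   # checkpoint after EVERY worker
    return [state["done"][str(i)] for i in range(len(tasks))]
```

If the process dies between steps, rerunning `run_with_checkpoints` with the same path skips completed subtasks and resumes at the first unfinished one.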

Implement structured logging in your orchestrator to enable post-hoc workflow analysis. Each log entry should include: orchestration ID, step number, worker ID, input token count, output token count, latency, and success/failure status. This lets you run cost attribution queries across orchestration runs, identify which worker types are the most expensive or slowest, and build dashboards that show quality trends correlated with orchestration complexity.