Multi-Agent

Event-Driven Agent

An agent triggered by an external event — webhook, queue message, cron schedule — rather than a direct user call. The foundation of background automation: reactive pipelines that run without a human in the loop.

Webhook-triggered · Background automation · No user interaction needed


SECTION 01

From interactive to event-driven agents

Most agent examples are interactive: a user sends a message, the agent responds. Event-driven agents flip this model: the agent sits idle until triggered by something happening in the world — a new email arrives, a GitHub PR is opened, a monitoring alert fires, a daily schedule ticks over.

This is the foundation of automation: agents that do useful work in the background without a human initiating each run. Examples include: automatically summarising incoming support tickets and routing them, monitoring a news feed and alerting on specific topics, processing overnight batch jobs, or responding to webhooks from payment processors.

Event-driven agents are often long-running processes (not serverless functions) that maintain persistent connections to queues or webhook endpoints. Their reliability requirements are higher than those of interactive agents — a crash means missed events.

SECTION 02

Trigger patterns

Three primary trigger patterns for event-driven agents:

Webhooks: an external service calls your HTTP endpoint when something happens. GitHub sends a webhook on PR events; Stripe sends one on payment completion; Twilio sends one on incoming SMS. Fast, push-based, no polling needed.

Message queues: events are written to a queue (Redis, RabbitMQ, SQS, Kafka) and the agent consumes them. Decouples event producer from consumer, handles backpressure, supports retry and dead-letter queues. Best for high-volume or unreliable producers.

Scheduled triggers: a cron job or scheduler (APScheduler, Celery Beat, GitHub Actions) wakes the agent on a fixed schedule. Simplest for regular batch processing: "every night at 2am, process yesterday's logs".

SECTION 03

Webhook-triggered agent

from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
import anthropic, hmac, hashlib, os

app = FastAPI()
client = anthropic.Anthropic()
WEBHOOK_SECRET = os.environ.get("GITHUB_WEBHOOK_SECRET", "")

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    '''Verify webhook authenticity (GitHub-style HMAC).'''
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

async def process_github_pr(payload: dict):
    '''Agent that analyses a new GitHub PR.'''
    pr = payload.get("pull_request", {})
    if not pr:
        return

    title = pr.get("title", "")
    body = pr.get("body", "")
    diff_url = pr.get("diff_url", "")  # could be fetched for a deeper diff review

    response = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=1024,
        system="You are a code review assistant. Provide a brief risk assessment.",
        messages=[{"role": "user", "content": f"PR: {title}\n\nDescription: {body}\n\nProvide a 3-point review checklist."}]
    )
    review = response.content[0].text

    # Post review back to GitHub (simplified)
    print(f"PR Review:\n{review}")
    # In production: call GitHub API to post a comment

@app.post("/webhook/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.body()
    signature = request.headers.get("X-Hub-Signature-256", "")
    if not verify_signature(payload, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Process in the background; return 200 immediately so GitHub doesn't retry
    background_tasks.add_task(process_github_pr, await request.json())
    return {"status": "received"}
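The "call GitHub API to post a comment" step could be sketched as below. This is a minimal, hedged sketch using GitHub's documented issue-comments endpoint (a pull request is also an issue, so PR comments go to /issues/{number}/comments); it assumes a `GITHUB_TOKEN` environment variable with write access to the repo.

```python
import json
import os
import urllib.request

GITHUB_API = "https://api.github.com"

def build_comment_request(repo_full_name: str, pr_number: int, review: str):
    """Build the URL, headers, and JSON body for posting a PR comment."""
    url = f"{GITHUB_API}/repos/{repo_full_name}/issues/{pr_number}/comments"
    headers = {
        "Authorization": f"Bearer {os.environ.get('GITHUB_TOKEN', '')}",
        "Accept": "application/vnd.github+json",
    }
    return url, headers, {"body": review}

def post_pr_review(repo_full_name: str, pr_number: int, review: str) -> int:
    """POST the review comment to GitHub; returns the HTTP status code."""
    url, headers, body = build_comment_request(repo_full_name, pr_number, review)
    req = urllib.request.Request(
        url, data=json.dumps(body).encode(), headers=headers, method="POST"
    )
    with urllib.request.urlopen(req, timeout=10) as resp:  # network call
        return resp.status
```

Keeping request construction separate from the network call makes the endpoint and payload format unit-testable without touching GitHub.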
SECTION 04

Queue-based agent with Redis

import redis, json, time, anthropic
from dataclasses import dataclass

@dataclass
class AgentTask:
    task_id: str
    task_type: str
    payload: dict

r = redis.Redis(host="localhost", port=6379, db=0)
client = anthropic.Anthropic()
QUEUE_NAME = "agent_tasks"
PROCESSING_QUEUE = "agent_tasks:processing"

def process_task(task: AgentTask) -> dict:
    if task.task_type == "summarise":
        text = task.payload.get("text", "")
        resp = client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            messages=[{"role": "user", "content": f"Summarise in 2 sentences:\n{text}"}]
        )
        return {"task_id": task.task_id, "summary": resp.content[0].text}
    return {"task_id": task.task_id, "error": "Unknown task type"}

def worker_loop():
    '''Reliable queue consumer with at-least-once processing.'''
    print("Agent worker started. Waiting for tasks...")
    while True:
        # BRPOPLPUSH: atomically move task to processing queue
        raw = r.brpoplpush(QUEUE_NAME, PROCESSING_QUEUE, timeout=5)
        if raw is None:
            continue  # timeout — check again
        task_data = json.loads(raw)
        task = AgentTask(**task_data)
        try:
            result = process_task(task)
            r.set(f"result:{task.task_id}", json.dumps(result), ex=3600)
            r.lrem(PROCESSING_QUEUE, 1, raw)  # remove from processing queue
            print(f"Task {task.task_id} complete")
        except Exception as e:
            print(f"Task {task.task_id} failed: {e}")
            r.lpush(QUEUE_NAME, raw)  # requeue for retry
            r.lrem(PROCESSING_QUEUE, 1, raw)

# Enqueue a task
def enqueue(task_type: str, payload: dict) -> str:
    import uuid
    task_id = str(uuid.uuid4())
    r.lpush(QUEUE_NAME, json.dumps({"task_id": task_id, "task_type": task_type, "payload": payload}))
    return task_id

if __name__ == "__main__":
    worker_loop()
SECTION 05

Scheduled agents with cron

from apscheduler.schedulers.blocking import BlockingScheduler
import anthropic, datetime

client = anthropic.Anthropic()
scheduler = BlockingScheduler()

@scheduler.scheduled_job("cron", hour=7, minute=0)  # 7:00 AM daily
def morning_briefing():
    '''Generate a daily briefing from overnight data.'''
    today = datetime.date.today().isoformat()
    # In production: fetch overnight data from your systems
    data_summary = "30 new support tickets. 5 high-priority. 2 related to billing."

    response = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=512,
        messages=[{"role": "user", "content": f"Date: {today}\nOvernight summary: {data_summary}\n\nWrite a 3-point morning briefing for the team."}]
    )
    briefing = response.content[0].text
    print(f"Morning Briefing — {today}:\n{briefing}")
    # In production: send via Slack, email, etc.

@scheduler.scheduled_job("interval", hours=1)  # Every hour
def monitor_and_alert():
    '''Check system health and alert on anomalies.'''
    # In production: fetch real metrics
    metrics = {"error_rate": 0.02, "p99_latency_ms": 450, "active_users": 1200}
    if metrics["error_rate"] > 0.05 or metrics["p99_latency_ms"] > 1000:
        response = client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            messages=[{"role": "user", "content": f"Alert: unusual metrics: {metrics}. Write a 1-paragraph incident summary."}]
        )
        print(f"ALERT: {response.content[0].text}")

print("Starting scheduled agent...")
scheduler.start()
SECTION 06

Idempotency and reliability

Event-driven agents must handle failures gracefully. Key patterns:

Idempotent processing: if an event is processed twice (e.g., due to a retry after a crash), the result should be the same as processing it once. Add a deduplication key (event ID) and check it before processing:

def process_event_idempotently(event_id: str, event: dict):
    # Claim the event atomically (SET NX). A plain GET-then-SET leaves a race
    # window where two workers both pass the check. Trade-off: claiming before
    # processing suppresses duplicates but can lose an event if the worker
    # crashes mid-run, so pair this with requeue-on-crash or a DLQ.
    if not r.set(f"processed:{event_id}", "1", nx=True, ex=86400):  # 24h TTL
        print(f"Skipping duplicate event {event_id}")
        return
    return process_event(event)

Dead-letter queues: events that fail repeatedly should go to a DLQ for manual inspection, not be silently dropped. Set a max retry count (3-5) before routing to the DLQ.
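A minimal sketch of that retry accounting for the Redis worker in Section 04, assuming the queue names used there and a hypothetical `attempts` field carried inside the task payload:

```python
import json

MAX_RETRIES = 3
QUEUE_NAME = "agent_tasks"
DLQ_NAME = "agent_tasks:dead"

def route_failed_task(raw: bytes) -> tuple[str, bytes]:
    """Decide where a failed task goes: back to the main queue with an
    incremented attempt counter, or to the DLQ once retries are exhausted.
    Returns (queue_name, serialised_task) so the caller can LPUSH it."""
    task = json.loads(raw)
    task["attempts"] = task.get("attempts", 0) + 1
    queue = QUEUE_NAME if task["attempts"] <= MAX_RETRIES else DLQ_NAME
    return queue, json.dumps(task).encode()
```

In the worker's `except` branch this would replace the unconditional `r.lpush(QUEUE_NAME, raw)` requeue: `queue, updated = route_failed_task(raw)` followed by `r.lpush(queue, updated)`.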

Observability: log event ID, processing start/end time, success/failure, and token usage for every agent run. Without this, debugging production failures is nearly impossible.
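One possible shape for that per-run log line, as a sketch: a single JSON record per agent run, easy to grep or ship to a log aggregator. With the Anthropic SDK the token counts would come from `response.usage.input_tokens` and `response.usage.output_tokens`.

```python
import json

def log_agent_run(event_id: str, start: float, end: float,
                  success: bool, input_tokens: int, output_tokens: int) -> str:
    """Emit one structured log line per agent run."""
    record = {
        "event_id": event_id,
        "duration_s": round(end - start, 3),
        "success": success,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
    }
    line = json.dumps(record)
    print(line)  # in production: write to your logging pipeline
    return line
```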

SECTION 07

Gotchas

Webhook endpoints must return 200 quickly. Most webhook providers (GitHub, Stripe, Twilio) time out within 5-30 seconds. If your agent takes longer, acknowledge the webhook immediately and process in a background task or queue. A timeout causes the provider to retry — leading to duplicate processing if you're not idempotent.

Queue consumers need heartbeats. A worker that takes 10 minutes on a task might look "dead" to the queue, which reassigns the task to another worker — causing double processing. Set visibility timeouts longer than your longest expected task, or send periodic heartbeat extensions.
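The extension decision itself is simple arithmetic. A sketch, assuming an SQS-style visibility timeout (on SQS the actual extension would be a `ChangeMessageVisibility` call; the `safety_margin_s` default is an illustrative choice):

```python
def needs_heartbeat(started_at: float, now: float,
                    visibility_timeout_s: float,
                    safety_margin_s: float = 30.0) -> bool:
    """True once the task has been held long enough that its visibility
    timeout is within safety_margin_s of expiring, i.e. time to extend it."""
    return (now - started_at) >= (visibility_timeout_s - safety_margin_s)
```

A worker would call this periodically during a long task and, when it returns True, push the visibility deadline out before the queue reassigns the message.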

LLM cost surprises in batch jobs. A scheduled agent that processes 1,000 items/night might cost pennies in testing but dollars in production when the items are large. Always estimate token cost before deploying batch agents — multiply (average tokens per item) × (items per run) × (cost per 1M tokens) × (runs per month).
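That multiplication can live in a small helper so it gets checked before every deployment. The figures in the comment are illustrative, not current pricing:

```python
def estimate_monthly_cost(avg_tokens_per_item: int, items_per_run: int,
                          runs_per_month: int, cost_per_mtok: float) -> float:
    """Monthly LLM spend estimate:
    tokens/item x items/run x runs/month x $/1M tokens."""
    total_tokens = avg_tokens_per_item * items_per_run * runs_per_month
    return total_tokens / 1_000_000 * cost_per_mtok

# e.g. 2,000 tokens/item, 1,000 items nightly, 30 runs/month,
# at an assumed $3 per 1M tokens:
# estimate_monthly_cost(2000, 1000, 30, 3.0) -> 180.0
```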

SECTION 08

Event-Driven Agent Reliability Patterns

Failure Mode          | Cause                                     | Solution
Duplicate processing  | Webhook retry on timeout                  | Idempotency key; check-and-set in Redis before processing
Message loss          | Agent crashes mid-processing              | Ack only after successful completion; dead-letter queue for failures
Stale triggers        | Old events queued during downtime         | Timestamp check; discard events older than TTL
Fan-out explosion     | One event triggers many agents            | Rate-limit downstream fan-out; use topic-based filtering
Runaway cron          | Scheduled job takes longer than interval  | Distributed lock; skip if previous run still active

The dead-letter queue (DLQ) is the most important reliability primitive in event-driven agent systems. When a message fails processing after all retries, move it to the DLQ rather than dropping it. Set up a monitoring alert on DLQ depth — a growing DLQ means systematic failures, not just transient ones. Schedule a weekly review of DLQ contents to identify patterns: if the same event type fails repeatedly, the agent's handling logic needs to be updated.

Testing event-driven agents requires simulating the full event lifecycle, not just the agent logic. Use a local message broker in tests rather than mocking it entirely — this catches serialisation bugs and timing issues. Write integration tests that publish a test event, verify the agent processes it, and confirm expected side effects. Use a dedicated test topic to avoid polluting production event streams.
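A sketch of that lifecycle test shape. Here `queue.Queue` stands in for the broker purely to keep the example self-contained; a real integration test would point the worker at a local Redis or RabbitMQ instance, as the paragraph above recommends.

```python
import queue

def run_worker_once(broker: queue.Queue, results: dict) -> None:
    """Consume one event and record its side effect (stand-in agent logic)."""
    event = broker.get(timeout=1)
    results[event["id"]] = f"processed:{event['type']}"

def test_event_lifecycle():
    broker: queue.Queue = queue.Queue()
    results: dict = {}
    # 1. Publish a test event
    broker.put({"id": "evt-1", "type": "summarise"})
    # 2. Let the worker consume it
    run_worker_once(broker, results)
    # 3. Assert the expected side effect, and that the queue drained
    assert results["evt-1"] == "processed:summarise"
    assert broker.empty()

test_event_lifecycle()
```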

For GDPR and data residency compliance in event-driven agent pipelines, never store PII in event payloads themselves. Instead, store a reference ID in the event and look up PII from an encrypted, region-restricted data store at processing time. This allows event logs and dead-letter queues to be retained for debugging without creating a compliance liability. Implement event payload schema validation at the producer side to catch accidental PII inclusion before events enter the queue.