An agent triggered by an external event — webhook, queue message, cron schedule — rather than a direct user call. The foundation of background automation: reactive pipelines that run without a human in the loop.
Most agent examples are interactive: a user sends a message, the agent responds. Event-driven agents flip this model: the agent sits idle until triggered by something happening in the world — a new email arrives, a GitHub PR is opened, a monitoring alert fires, a daily schedule ticks over.
This is the foundation of automation: agents that do useful work in the background without a human initiating each run. Examples include: automatically summarising incoming support tickets and routing them, monitoring a news feed and alerting on specific topics, processing overnight batch jobs, or responding to webhooks from payment processors.
Event-driven agents are often long-running processes (not serverless functions) that maintain persistent connections to queues or webhook endpoints. Their reliability requirements are higher than interactive agents — a crash means missed events.
Three primary trigger patterns for event-driven agents:
Webhooks: an external service calls your HTTP endpoint when something happens. GitHub sends a webhook on PR events; Stripe sends one on payment completion; Twilio sends one on incoming SMS. Fast, push-based, no polling needed.
Message queues: events are written to a queue (Redis, RabbitMQ, SQS, Kafka) and the agent consumes them. Decouples event producer from consumer, handles backpressure, supports retry and dead-letter queues. Best for high-volume or unreliable producers.
Scheduled triggers: a cron job or scheduler (APScheduler, Celery Beat, GitHub Actions) wakes the agent on a fixed schedule. Simplest for regular batch processing: "every night at 2am, process yesterday's logs".
Example: a FastAPI webhook endpoint that verifies the request signature, acknowledges immediately, and hands the event to a background task:

from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
import anthropic, hmac, hashlib, os

app = FastAPI()
client = anthropic.Anthropic()
WEBHOOK_SECRET = os.environ.get("GITHUB_WEBHOOK_SECRET", "")

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    '''Verify webhook authenticity (GitHub-style HMAC).'''
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

async def process_github_pr(payload: dict):
    '''Agent that analyses a new GitHub PR.'''
    pr = payload.get("pull_request", {})
    if not pr:
        return
    title = pr.get("title", "")
    body = pr.get("body", "")
    response = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=1024,
        system="You are a code review assistant. Provide a brief risk assessment.",
        messages=[{"role": "user",
                   "content": f"PR: {title}\nDescription: {body}\n"
                              "Provide a 3-point review checklist."}]
    )
    review = response.content[0].text
    print(f"PR Review:\n{review}")
    # In production: call the GitHub API to post the review as a PR comment

@app.post("/webhook/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.body()
    signature = request.headers.get("X-Hub-Signature-256", "")
    if not verify_signature(payload, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="invalid signature")
    # Process in the background; return 200 immediately so GitHub doesn't retry
    background_tasks.add_task(process_github_pr, await request.json())
    return {"status": "received"}
Example: a Redis-backed queue consumer using BRPOPLPUSH for at-least-once delivery:

import redis, json, uuid, anthropic
from dataclasses import dataclass

@dataclass
class AgentTask:
    task_id: str
    task_type: str
    payload: dict

r = redis.Redis(host="localhost", port=6379, db=0)
client = anthropic.Anthropic()
QUEUE_NAME = "agent_tasks"
PROCESSING_QUEUE = "agent_tasks:processing"

def process_task(task: AgentTask) -> dict:
    if task.task_type == "summarise":
        text = task.payload.get("text", "")
        resp = client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            messages=[{"role": "user",
                       "content": f"Summarise in 2 sentences:\n{text}"}]
        )
        return {"task_id": task.task_id, "summary": resp.content[0].text}
    return {"task_id": task.task_id, "error": "Unknown task type"}

def worker_loop():
    '''Reliable queue consumer with at-least-once processing.'''
    print("Agent worker started. Waiting for tasks...")
    while True:
        # BRPOPLPUSH: atomically move the task to the processing queue
        raw = r.brpoplpush(QUEUE_NAME, PROCESSING_QUEUE, timeout=5)
        if raw is None:
            continue  # timed out; poll again
        task = AgentTask(**json.loads(raw))
        try:
            result = process_task(task)
            r.set(f"result:{task.task_id}", json.dumps(result), ex=3600)
            r.lrem(PROCESSING_QUEUE, 1, raw)  # remove from processing queue
            print(f"Task {task.task_id} complete")
        except Exception as e:
            print(f"Task {task.task_id} failed: {e}")
            r.lpush(QUEUE_NAME, raw)          # requeue for retry
            r.lrem(PROCESSING_QUEUE, 1, raw)

def enqueue(task_type: str, payload: dict) -> str:
    task_id = str(uuid.uuid4())
    r.lpush(QUEUE_NAME, json.dumps(
        {"task_id": task_id, "task_type": task_type, "payload": payload}))
    return task_id

if __name__ == "__main__":
    worker_loop()
Example: a scheduled agent using APScheduler:

from apscheduler.schedulers.blocking import BlockingScheduler
import anthropic, datetime

client = anthropic.Anthropic()
scheduler = BlockingScheduler()

@scheduler.scheduled_job("cron", hour=7, minute=0)  # 7:00 AM daily
def morning_briefing():
    '''Generate a daily briefing from overnight data.'''
    today = datetime.date.today().isoformat()
    # In production: fetch overnight data from your systems
    data_summary = "30 new support tickets. 5 high-priority. 2 related to billing."
    response = client.messages.create(
        model="claude-sonnet-4-5", max_tokens=512,
        messages=[{"role": "user",
                   "content": f"Date: {today}\nOvernight summary: {data_summary}\n"
                              "Write a 3-point morning briefing for the team."}]
    )
    briefing = response.content[0].text
    print(f"Morning Briefing for {today}:\n{briefing}")
    # In production: send via Slack, email, etc.

@scheduler.scheduled_job("interval", hours=1)  # Every hour
def monitor_and_alert():
    '''Check system health and alert on anomalies.'''
    # In production: fetch real metrics
    metrics = {"error_rate": 0.02, "p99_latency_ms": 450, "active_users": 1200}
    if metrics["error_rate"] > 0.05 or metrics["p99_latency_ms"] > 1000:
        response = client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=256,
            messages=[{"role": "user", "content":
                       f"Alert: unusual metrics: {metrics}. "
                       "Write a 1-paragraph incident summary."}]
        )
        print(f"ALERT: {response.content[0].text}")

if __name__ == "__main__":
    print("Starting scheduled agent...")
    scheduler.start()
Event-driven agents must handle failures gracefully. Key patterns:
Idempotent processing: if an event is processed twice (e.g., due to a retry after a crash), the result should be the same as processing it once. Add a deduplication key (event ID) and check it before processing:
def process_event_idempotently(event_id: str, event: dict):
    # Atomic check-and-set: claim the event before processing.
    # nx=True means the SET only succeeds if the key is absent, so two
    # workers racing on the same event can't both pass the check.
    # The key expires after 24h.
    if not r.set(f"processed:{event_id}", "1", nx=True, ex=86400):
        print(f"Skipping duplicate event {event_id}")
        return
    try:
        return process_event(event)
    except Exception:
        r.delete(f"processed:{event_id}")  # release the claim so a retry can run
        raise
Dead-letter queues: events that fail repeatedly should go to a DLQ for manual inspection, not be silently dropped. Set a max retry count (3-5) before routing to the DLQ.
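A broker-agnostic sketch of that routing decision (plain lists stand in for the retry queue and DLQ; in production these would be Redis lists or your broker's native DLQ):

```python
def route_failed_task(task: dict, retry_queue: list, dlq: list,
                      max_retries: int = 3) -> str:
    """Requeue a failed task until max_retries is exceeded, then dead-letter it."""
    task = {**task, "retries": task.get("retries", 0) + 1}
    if task["retries"] > max_retries:
        dlq.append(task)          # keep for manual inspection, never drop
        return "dead-lettered"
    retry_queue.append(task)
    return "requeued"
```

Carrying the retry count inside the task payload keeps the worker stateless: any worker that picks the task up can make the routing decision.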
Observability: log event ID, processing start/end time, success/failure, and token usage for every agent run. Without this, debugging production failures is nearly impossible.
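One way to capture those fields is a logging decorator around each handler (a sketch: the JSON field names are illustrative, and it assumes handlers return a dict that may carry token usage):

```python
import functools, json, logging, time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("agent")

def observed(fn):
    """Log event ID, duration, and outcome for every agent run."""
    @functools.wraps(fn)
    def wrapper(event_id: str, *args, **kwargs):
        start = time.time()
        try:
            result = fn(event_id, *args, **kwargs)
            log.info(json.dumps({
                "event_id": event_id, "status": "ok",
                "duration_s": round(time.time() - start, 3),
                "tokens": result.get("tokens", 0),  # assumes handler reports usage
            }))
            return result
        except Exception as e:
            log.info(json.dumps({
                "event_id": event_id, "status": "error",
                "duration_s": round(time.time() - start, 3),
                "error": str(e),
            }))
            raise
    return wrapper
```

Emitting one structured JSON line per run makes the logs queryable: you can aggregate failure rates and token spend per event type without any extra instrumentation.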
Webhook endpoints must return 200 quickly. Most webhook providers (GitHub, Stripe, Twilio) time out within 5-30 seconds. If your agent takes longer, acknowledge the webhook immediately and process in a background task or queue. A timeout causes the provider to retry, leading to duplicate processing if you're not idempotent.
Queue consumers need heartbeats. A worker that takes 10 minutes on a task might look "dead" to the queue, which reassigns the task to another worker — causing double processing. Set visibility timeouts longer than your longest expected task, or send periodic heartbeat extensions.
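A minimal heartbeat wrapper (a sketch: `extend_fn` is a stand-in for whatever renews the lease on your broker, e.g. an SQS ChangeMessageVisibility call):

```python
import threading

def run_with_heartbeat(task_fn, extend_fn, interval_s=30):
    """Run task_fn while periodically calling extend_fn to renew the
    message's visibility timeout, so the broker doesn't redeliver."""
    stop = threading.Event()

    def heartbeat():
        # wait() returns False on timeout, so this fires every interval_s
        # until stop is set
        while not stop.wait(interval_s):
            extend_fn()

    t = threading.Thread(target=heartbeat, daemon=True)
    t.start()
    try:
        return task_fn()
    finally:
        stop.set()
        t.join()
```

Choose `interval_s` well below the broker's visibility timeout (e.g. extend every 30s for a 2-minute timeout) so a single missed heartbeat doesn't trigger redelivery.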
LLM cost surprises in batch jobs. A scheduled agent that processes 1,000 items/night might cost pennies in testing but dollars in production when the items are large. Always estimate token cost before deploying batch agents — multiply (average tokens per item) × (items per run) × (cost per 1M tokens) × (runs per month).
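That multiplication as a helper (the figures in the example are purely illustrative placeholders, not real rates):

```python
def estimate_monthly_cost(avg_tokens_per_item: int, items_per_run: int,
                          runs_per_month: int, usd_per_mtok: float) -> float:
    """Rough monthly token cost for a batch agent (input + output combined)."""
    tokens_per_month = avg_tokens_per_item * items_per_run * runs_per_month
    return tokens_per_month / 1_000_000 * usd_per_mtok

# e.g. 2,000 tokens/item x 1,000 items/night x 30 nights at $3/MTok
print(estimate_monthly_cost(2000, 1000, 30, 3.0))  # 180.0
```

Run this against your provider's current pricing before each deployment; a model swap or a prompt that doubles average input size changes the answer materially.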
| Failure Mode | Cause | Solution |
|---|---|---|
| Duplicate processing | Webhook retry on timeout | Idempotency key; check-and-set in Redis before processing |
| Message loss | Agent crashes mid-processing | Ack only after successful completion; dead-letter queue for failures |
| Stale triggers | Old events queued during downtime | Timestamp check; discard events older than TTL |
| Fan-out explosion | One event triggers many agents | Rate-limit downstream fan-out; use topic-based filtering |
| Runaway cron | Scheduled job takes longer than interval | Distributed lock; skip if previous run still active |
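The "runaway cron" guard from the table can be sketched with a non-blocking lock (shown in-process with `threading.Lock`; a multi-worker deployment would use a distributed lock such as Redis SET NX instead):

```python
import threading

_job_lock = threading.Lock()

def run_exclusive(job_fn):
    """Skip this run if the previous run is still active."""
    if not _job_lock.acquire(blocking=False):
        return "skipped: previous run still active"
    try:
        return job_fn()
    finally:
        _job_lock.release()
```

Wrapping each scheduled job body in `run_exclusive` means an overlong run degrades to a skipped tick rather than a pile-up of concurrent executions.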
The dead-letter queue (DLQ) is the most important reliability primitive in event-driven agent systems. When a message fails processing after all retries, move it to the DLQ rather than dropping it. Set up a monitoring alert on DLQ depth — a growing DLQ means systematic failures, not just transient ones. Schedule a weekly review of DLQ contents to identify patterns: if the same event type fails repeatedly, the agent's handling logic needs to be updated.
Testing event-driven agents requires simulating the full event lifecycle, not just the agent logic. Use a local message broker in tests rather than mocking it entirely — this catches serialisation bugs and timing issues. Write integration tests that publish a test event, verify the agent processes it, and confirm expected side effects. Use a dedicated test topic to avoid polluting production event streams.
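A stdlib sketch of that lifecycle test, with `queue.Queue` standing in for the local broker (a real suite would run e.g. Redis or Kafka in Docker, but the shape is the same: publish, consume, assert the side effect):

```python
import json, queue, threading

def test_event_roundtrip():
    """Publish a test event, run the consumer, confirm the side effect."""
    broker = queue.Queue()   # stand-in for a local message broker
    results = {}

    def consumer():
        raw = broker.get(timeout=2)
        event = json.loads(raw)  # real (de)serialisation, not a mock
        results[event["id"]] = event["payload"].upper()
        broker.task_done()

    t = threading.Thread(target=consumer)
    t.start()
    broker.put(json.dumps({"id": "evt-1", "payload": "hello"}))
    t.join(timeout=3)
    assert results["evt-1"] == "HELLO"
```

Note the event goes through `json.dumps`/`json.loads` rather than being passed as a Python object; this is what catches the serialisation bugs a fully mocked broker would hide.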
For GDPR and data residency compliance in event-driven agent pipelines, never store PII in event payloads themselves. Instead, store a reference ID in the event and look up PII from an encrypted, region-restricted data store at processing time. This allows event logs and dead-letter queues to be retained for debugging without creating a compliance liability. Implement event payload schema validation at the producer side to catch accidental PII inclusion before events enter the queue.
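A minimal sketch of the reference-ID pattern (the in-memory dict stands in for the encrypted, region-restricted store; the function names are illustrative):

```python
import uuid

PII_STORE: dict = {}  # stand-in for an encrypted, region-restricted data store

def publish_event(event_type: str, pii: dict) -> dict:
    """Store PII out of band; the event itself carries only a reference ID."""
    ref = str(uuid.uuid4())
    PII_STORE[ref] = pii
    return {"type": event_type, "pii_ref": ref}  # safe to log or dead-letter

def consume_event(event: dict) -> dict:
    """Resolve the reference at processing time only."""
    pii = PII_STORE[event["pii_ref"]]
    return {"type": event["type"], "email": pii["email"]}
```

Because the event body never contains the PII itself, deleting the store entry also satisfies erasure requests without rewriting queue history or DLQ contents.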