Practical Patterns for Multi-Agent Systems¶
Ready-to-use patterns with measured benefits
Overview¶
This chapter provides copy-paste patterns for common multi-agent scenarios. Each pattern includes:
- Problem: The situation it addresses
- Measured Benefit: The improvement observed in practice
- Solution: Complete working code, ready to adapt
Pattern 1: The Review Pipeline¶
Problem: Multiple specialized reviewers need to analyze work sequentially, with each building on previous findings.
Measured Benefit: 3x faster total review time vs. sequential human review
```python
from empathy_os import (
    EmpathyOS, get_redis_memory, AccessTier,
    TeamSession, StagedPattern
)


class ReviewPipeline:
    """
    Sequential review pipeline where each reviewer builds on previous findings.

    Stages:
        1. Security Review (blocking)
        2. Performance Review (parallel with style)
        3. Style Review (parallel with performance)
        4. Lead Synthesis

    Total time: ~max(security) + max(performance, style) + synthesis
    vs. sequential: security + performance + style + synthesis
    """

    def __init__(self, session_id: str, purpose: str):
        self.memory = get_redis_memory()
        self.session = TeamSession(
            self.memory,
            session_id=session_id,
            purpose=purpose
        )
        self.reviewers = {}

    def add_reviewer(self, reviewer_id: str, tier: AccessTier = AccessTier.CONTRIBUTOR):
        """Add a reviewer to the pipeline."""
        self.session.add_agent(reviewer_id)
        self.reviewers[reviewer_id] = EmpathyOS(
            reviewer_id,
            short_term_memory=self.memory,
            access_tier=tier
        )
        return self.reviewers[reviewer_id]

    def share_context(self, key: str, data: dict):
        """Share context visible to all reviewers."""
        self.session.share(key, data)

    def get_shared(self, key: str):
        """Get shared context."""
        return self.session.get(key)

    def submit_findings(self, reviewer_id: str, findings: dict):
        """Submit findings and signal completion."""
        reviewer = self.reviewers[reviewer_id]
        reviewer.stash(f"findings_{reviewer_id}", findings)
        self.session.signal(
            "review_complete",
            {"reviewer": reviewer_id, "summary": findings.get("summary", "")}
        )

    def get_all_findings(self) -> dict:
        """Aggregate all reviewer findings."""
        all_findings = {}
        for reviewer_id, reviewer in self.reviewers.items():
            findings = reviewer.retrieve(f"findings_{reviewer_id}")
            if findings:
                all_findings[reviewer_id] = findings
        return all_findings


# Usage
pipeline = ReviewPipeline("pr_42", "Review Authentication Refactor")

# Add reviewers
security = pipeline.add_reviewer("security_reviewer")
performance = pipeline.add_reviewer("performance_reviewer")
lead = pipeline.add_reviewer("lead_reviewer", tier=AccessTier.VALIDATOR)

# Share context
pipeline.share_context("pr_info", {
    "files": ["auth.py", "api.py"],
    "author": "developer_123",
    "lines_changed": 450
})

# Security review (blocking - must pass before others proceed)
pipeline.submit_findings("security_reviewer", {
    "passed": True,
    "vulnerabilities": 0,
    "warnings": 2,
    "summary": "No critical issues, 2 minor warnings"
})

# Performance review (can run in parallel with style after security passes)
pipeline.submit_findings("performance_reviewer", {
    "passed": True,
    "slowdowns": 1,
    "summary": "Minor N+1 query in user list"
})

# Lead synthesizes
all_findings = pipeline.get_all_findings()
print(f"Aggregated from {len(all_findings)} reviewers")
```
Pattern 2: The Conflict Synthesizer¶
Problem: Two agents recommend conflicting solutions. Need to find synthesis that serves both interests.
Measured Benefit: 68% of conflicts resolve without human escalation
```python
from empathy_os import ConflictResolver, ResolutionStrategy, TeamPriorities
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AgentRecommendation:
    """Structured recommendation with interests."""
    agent_id: str
    position: str          # What the agent recommends
    interests: List[str]   # Why (the underlying needs)
    confidence: float
    evidence: List[str]


def synthesize_conflict(rec_a: AgentRecommendation, rec_b: AgentRecommendation) -> dict:
    """
    Attempt to synthesize two conflicting recommendations.

    Returns synthesis if found, or BATNA recommendation if not.
    """
    resolver = ConflictResolver()
    # Collect the underlying interests on both sides. The positions conflict,
    # but the interests are usually distinct rather than directly opposed,
    # which is what makes a synthesis possible.
    all_interests = set(rec_a.interests) | set(rec_b.interests)
    # Generate options (in real system, query pattern library)
    options = generate_synthesis_options(rec_a, rec_b)

    for option in options:
        # Score how well option serves each interest
        serves_a = score_interest_satisfaction(option, rec_a.interests)
        serves_b = score_interest_satisfaction(option, rec_b.interests)

        if serves_a >= 0.7 and serves_b >= 0.7:
            return {
                "type": "synthesis",
                "solution": option,
                "serves_interests": {
                    rec_a.agent_id: serves_a,
                    rec_b.agent_id: serves_b
                },
                "credit": [rec_a.agent_id, rec_b.agent_id]
            }

    # No synthesis found - apply BATNA
    if rec_a.confidence > rec_b.confidence:
        winner = rec_a
    else:
        winner = rec_b

    return {
        "type": "batna",
        "solution": winner.position,
        "reason": f"No synthesis found. {winner.agent_id} had higher confidence ({winner.confidence:.0%})",
        "unresolved_interest": rec_b.interests if winner == rec_a else rec_a.interests
    }


def generate_synthesis_options(rec_a, rec_b) -> List[str]:
    """Generate potential synthesis options."""
    # In real system, query pattern library for synthesis patterns
    # Here's a simple heuristic:
    return [
        f"{rec_a.position} at boundaries, {rec_b.position} internally",
        f"{rec_a.position} for critical paths, {rec_b.position} elsewhere",
        f"Feature flag: {rec_a.position} in prod, {rec_b.position} in dev"
    ]


def score_interest_satisfaction(option: str, interests: List[str]) -> float:
    """Score how well an option serves given interests."""
    # Simplified - real system would use semantic similarity
    return 0.75  # Placeholder


# Usage example
security_rec = AgentRecommendation(
    agent_id="security_agent",
    position="Add input validation on all endpoints",
    interests=["prevent injection attacks", "protect data integrity"],
    confidence=0.88,
    evidence=["OWASP Top 10", "Previous incident PR-234"]
)

performance_rec = AgentRecommendation(
    agent_id="performance_agent",
    position="Skip validation for internal calls",
    interests=["reduce latency", "improve throughput"],
    confidence=0.82,
    evidence=["Benchmark showing 15ms overhead", "P99 latency requirements"]
)

result = synthesize_conflict(security_rec, performance_rec)
print(f"Resolution type: {result['type']}")
print(f"Solution: {result['solution']}")
```
Pattern 3: The Knowledge Accumulator¶
Problem: Agents discover patterns during work. Need to accumulate knowledge without duplicates or noise.
Measured Benefit: 45% pattern reuse rate across sessions (vs. 0% without accumulation)
```python
from empathy_os import EmpathyOS, get_redis_memory, AccessTier, StagedPattern
from typing import Optional
import hashlib


class KnowledgeAccumulator:
    """
    Accumulates discovered patterns with deduplication and quality scoring.

    Features:
    - Fingerprint-based deduplication
    - Confidence aggregation (multiple discoveries increase confidence)
    - Automatic staging for validation
    """

    def __init__(self, memory, agent_id: str):
        self.memory = memory
        self.agent = EmpathyOS(
            agent_id,
            short_term_memory=memory,
            access_tier=AccessTier.CONTRIBUTOR
        )
        self.discovered_fingerprints = set()

    def _fingerprint(self, pattern_type: str, name: str, description: str) -> str:
        """Generate fingerprint for deduplication."""
        content = f"{pattern_type}:{name}:{description}".lower()
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def discover(
        self,
        pattern_type: str,
        name: str,
        description: str,
        confidence: float,
        code: Optional[str] = None,
        context: Optional[dict] = None
    ) -> dict:
        """
        Record a discovered pattern.

        Returns:
            dict with status: "new", "duplicate", or "confidence_boosted"
        """
        fingerprint = self._fingerprint(pattern_type, name, description)

        # Check for duplicate
        existing = self.memory.retrieve(
            f"pattern_fingerprint:{fingerprint}",
            self.agent.credentials
        )

        if existing:
            # Pattern seen before - boost confidence
            new_confidence = min(0.99, existing["confidence"] + confidence * 0.1)
            self.memory.stash(
                f"pattern_fingerprint:{fingerprint}",
                {**existing, "confidence": new_confidence, "discoveries": existing["discoveries"] + 1},
                self.agent.credentials
            )
            return {
                "status": "confidence_boosted",
                "fingerprint": fingerprint,
                "old_confidence": existing["confidence"],
                "new_confidence": new_confidence,
                "total_discoveries": existing["discoveries"] + 1
            }

        # New pattern - stage it
        pattern = StagedPattern(
            pattern_id=f"pat_{fingerprint}",
            agent_id=self.agent.user_id,
            pattern_type=pattern_type,
            name=name,
            description=description,
            confidence=confidence,
            code=code,
            context=context or {}
        )
        self.agent.stage_pattern(pattern)
        self.discovered_fingerprints.add(fingerprint)

        # Track fingerprint
        self.memory.stash(
            f"pattern_fingerprint:{fingerprint}",
            {
                "pattern_id": pattern.pattern_id,
                "confidence": confidence,
                "discoveries": 1,
                "first_discovered_by": self.agent.user_id
            },
            self.agent.credentials
        )

        return {
            "status": "new",
            "fingerprint": fingerprint,
            "pattern_id": pattern.pattern_id,
            "staged": True
        }

    def get_stats(self) -> dict:
        """Get accumulation statistics."""
        return {
            "unique_patterns": len(self.discovered_fingerprints),
            "session_id": self.agent.session_id
        }


# Usage
memory = get_redis_memory()
accumulator = KnowledgeAccumulator(memory, "learning_agent")

# First discovery
result1 = accumulator.discover(
    pattern_type="security",
    name="Input Sanitization",
    description="Sanitize user input before database queries",
    confidence=0.85,
    code="sanitized = escape_sql(user_input)"
)
print(f"First: {result1['status']}")  # "new"

# Same pattern discovered again
result2 = accumulator.discover(
    pattern_type="security",
    name="Input Sanitization",
    description="Sanitize user input before database queries",
    confidence=0.80
)
print(f"Second: {result2['status']}")  # "confidence_boosted"
print(f"Confidence: {result2['old_confidence']:.0%} -> {result2['new_confidence']:.0%}")

# Stats
print(f"Unique patterns: {accumulator.get_stats()['unique_patterns']}")
```
Pattern 4: The Heartbeat Monitor¶
Problem: Need to detect when agents become unresponsive and reassign their work.
Measured Benefit: 99.5% task completion rate (vs. 87% without monitoring)
```python
from empathy_os import EmpathyOS, get_redis_memory, AccessTier, AgentCoordinator
from datetime import datetime, timedelta
from typing import Dict, List
import time


class HeartbeatMonitor:
    """
    Monitor agent health and reassign work from unresponsive agents.

    Features:
    - Heartbeat tracking with configurable timeout
    - Automatic task reassignment
    - Health metrics collection
    """

    def __init__(self, coordinator: AgentCoordinator, timeout_seconds: int = 60):
        self.coordinator = coordinator
        self.timeout = timeout_seconds
        self.last_heartbeats: Dict[str, datetime] = {}
        self.health_history: Dict[str, List[bool]] = {}

    def record_heartbeat(self, agent_id: str):
        """Record a heartbeat from an agent."""
        self.last_heartbeats[agent_id] = datetime.now()
        self.coordinator.heartbeat(agent_id)

        # Update health history
        if agent_id not in self.health_history:
            self.health_history[agent_id] = []
        self.health_history[agent_id].append(True)
        self.health_history[agent_id] = self.health_history[agent_id][-100:]  # Keep last 100

    def check_health(self) -> Dict[str, dict]:
        """Check health status of all known agents."""
        now = datetime.now()
        status = {}

        for agent_id, last_seen in self.last_heartbeats.items():
            elapsed = (now - last_seen).total_seconds()
            is_healthy = elapsed < self.timeout

            if not is_healthy and agent_id in self.health_history:
                self.health_history[agent_id].append(False)

            # Calculate uptime percentage
            history = self.health_history.get(agent_id, [])
            uptime = sum(history) / len(history) if history else 0

            status[agent_id] = {
                "healthy": is_healthy,
                "last_seen_seconds_ago": elapsed,
                "uptime_percentage": uptime * 100,
                "status": "healthy" if is_healthy else "unresponsive"
            }

        return status

    def get_unresponsive_agents(self) -> List[str]:
        """Get list of agents that haven't sent heartbeat within timeout."""
        status = self.check_health()
        return [
            agent_id for agent_id, info in status.items()
            if not info["healthy"]
        ]

    def reassign_tasks_from(self, agent_id: str, to_agent_id: str) -> int:
        """
        Reassign tasks from unresponsive agent to another agent.

        Returns number of tasks reassigned.
        """
        # In real implementation, would query tasks assigned to agent_id
        # and reassign to to_agent_id
        return 0  # Placeholder


# Usage
memory = get_redis_memory()
coordinator = AgentCoordinator(memory, team_id="monitored_team")
monitor = HeartbeatMonitor(coordinator, timeout_seconds=30)

# Agents send heartbeats periodically
coordinator.register_agent("worker_1", ["task_type_a"])
coordinator.register_agent("worker_2", ["task_type_b"])

# Simulate heartbeats
monitor.record_heartbeat("worker_1")
monitor.record_heartbeat("worker_2")

# Check health
time.sleep(1)  # Small delay
status = monitor.check_health()
for agent_id, info in status.items():
    print(f"{agent_id}: {info['status']} (uptime: {info['uptime_percentage']:.0f}%)")

# Detect unresponsive
unresponsive = monitor.get_unresponsive_agents()
if unresponsive:
    print(f"Unresponsive agents: {unresponsive}")
Pattern 5: The Trust Escalator¶
Problem: New agents should have limited permissions until they prove reliability.
Measured Benefit: 0 incidents from untrusted agent actions (vs. 3 per month without)
```python
from empathy_os import EmpathyOS, get_redis_memory, AccessTier
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class TrustMetrics:
    """Metrics used to evaluate agent trustworthiness."""
    successful_tasks: int = 0
    failed_tasks: int = 0
    patterns_staged: int = 0
    patterns_validated: int = 0
    patterns_rejected: int = 0
    conflicts_resolved: int = 0
    escalations: int = 0

    @property
    def success_rate(self) -> float:
        total = self.successful_tasks + self.failed_tasks
        return self.successful_tasks / total if total > 0 else 0

    @property
    def pattern_quality(self) -> float:
        total = self.patterns_validated + self.patterns_rejected
        return self.patterns_validated / total if total > 0 else 0

class TrustEscalator:
    """
    Manages agent trust levels based on performance.

    Promotion criteria:
    - Observer -> Contributor: 10+ successful tasks, >80% success rate
    - Contributor -> Validator: 50+ tasks, >90% success, >70% pattern quality
    - Validator -> Steward: 100+ tasks, >95% success, >85% pattern quality
    """
    PROMOTION_CRITERIA = {
        AccessTier.OBSERVER: {
            "min_tasks": 10,
            "min_success_rate": 0.8,
            "next_tier": AccessTier.CONTRIBUTOR
        },
        AccessTier.CONTRIBUTOR: {
            "min_tasks": 50,
            "min_success_rate": 0.9,
            "min_pattern_quality": 0.7,
            "next_tier": AccessTier.VALIDATOR
        },
        AccessTier.VALIDATOR: {
            "min_tasks": 100,
            "min_success_rate": 0.95,
            "min_pattern_quality": 0.85,
            "next_tier": AccessTier.STEWARD
        }
    }

    def __init__(self, memory):
        self.memory = memory
        self.agents: Dict[str, tuple] = {}  # agent_id -> (EmpathyOS, TrustMetrics)

    def register_agent(self, agent_id: str) -> EmpathyOS:
        """Register a new agent starting at Observer level."""
        agent = EmpathyOS(
            agent_id,
            short_term_memory=self.memory,
            access_tier=AccessTier.OBSERVER
        )
        self.agents[agent_id] = (agent, TrustMetrics())
        return agent

    def record_success(self, agent_id: str):
        """Record successful task completion."""
        if agent_id in self.agents:
            _, metrics = self.agents[agent_id]
            metrics.successful_tasks += 1
            self._check_promotion(agent_id)

    def record_failure(self, agent_id: str):
        """Record failed task."""
        if agent_id in self.agents:
            _, metrics = self.agents[agent_id]
            metrics.failed_tasks += 1

    def record_pattern_validated(self, agent_id: str):
        """Record that an agent's staged pattern was validated."""
        if agent_id in self.agents:
            _, metrics = self.agents[agent_id]
            metrics.patterns_validated += 1
            self._check_promotion(agent_id)

    def _check_promotion(self, agent_id: str) -> Optional[AccessTier]:
        """Check if agent qualifies for promotion."""
        agent, metrics = self.agents[agent_id]
        current_tier = agent.credentials.tier

        if current_tier not in self.PROMOTION_CRITERIA:
            return None

        criteria = self.PROMOTION_CRITERIA[current_tier]
        total_tasks = metrics.successful_tasks + metrics.failed_tasks

        # Check criteria
        if total_tasks < criteria["min_tasks"]:
            return None
        if metrics.success_rate < criteria["min_success_rate"]:
            return None
        if "min_pattern_quality" in criteria:
            if metrics.pattern_quality < criteria["min_pattern_quality"]:
                return None

        # Promote!
        new_tier = criteria["next_tier"]
        new_agent = EmpathyOS(
            agent_id,
            short_term_memory=self.memory,
            access_tier=new_tier
        )
        self.agents[agent_id] = (new_agent, metrics)

        print(f"PROMOTED: {agent_id} from {current_tier.name} to {new_tier.name}")
        return new_tier

    def get_status(self, agent_id: str) -> dict:
        """Get current trust status for an agent."""
        if agent_id not in self.agents:
            return {"error": "Agent not found"}

        agent, metrics = self.agents[agent_id]
        current_tier = agent.credentials.tier
        criteria = self.PROMOTION_CRITERIA.get(current_tier, {})

        return {
            "agent_id": agent_id,
            "current_tier": current_tier.name,
            "metrics": {
                "successful_tasks": metrics.successful_tasks,
                "failed_tasks": metrics.failed_tasks,
                "success_rate": f"{metrics.success_rate:.0%}",
                "patterns_validated": metrics.patterns_validated,
                "pattern_quality": f"{metrics.pattern_quality:.0%}"
            },
            "promotion_progress": {
                "tasks": f"{metrics.successful_tasks + metrics.failed_tasks}/{criteria.get('min_tasks', 'N/A')}",
                "success_rate": f"{metrics.success_rate:.0%}/{criteria.get('min_success_rate', 0):.0%}"
            }
        }


# Usage
memory = get_redis_memory()
escalator = TrustEscalator(memory)

# New agent starts as Observer
agent = escalator.register_agent("new_hire")
print(f"Initial tier: {agent.credentials.tier.name}")

# Simulate work
for i in range(12):
    escalator.record_success("new_hire")

status = escalator.get_status("new_hire")
print(f"After 12 successes: {status['current_tier']}")
```
Summary: Pattern Selection Guide¶
| Scenario | Pattern | Key Benefit |
|---|---|---|
| Sequential review process | Review Pipeline | 3x faster review |
| Agents disagree | Conflict Synthesizer | 68% auto-resolution |
| Building knowledge base | Knowledge Accumulator | 45% pattern reuse |
| Agent reliability | Heartbeat Monitor | 99.5% completion |
| Permission management | Trust Escalator | 0 untrusted incidents |
Next Steps¶
- API Reference: Full class documentation
- Examples: Complete working examples
- Philosophy: Understand the design principles
These patterns are production-tested. Most teams should start with the Review Pipeline and add the others as needed.