Short-Term Memory: Redis-Backed Multi-Agent Coordination¶
The Empathy Framework provides a Redis-backed short-term memory system for real-time multi-agent coordination, pattern staging, and collaboration state management.
Overview¶
Short-term memory enables: - Working Memory: Fast TTL-based storage for intermediate results - Pattern Staging: Validate patterns before promotion to the library - Coordination Signals: Real-time communication between agents - Session Management: Collaborative multi-agent sessions - State Persistence: Save/restore collaboration state
Quick Start¶
from empathy_os import EmpathyOS, get_redis_memory, AccessTier
# Get Redis memory (auto-detects Railway, fallback to localhost/mock)
memory = get_redis_memory()
# Create an agent with short-term memory
empathy = EmpathyOS(
user_id="code_reviewer",
short_term_memory=memory,
access_tier=AccessTier.CONTRIBUTOR,
)
# Store working data (expires in 1 hour)
empathy.stash("analysis_results", {"files": 10, "issues": 3})
# Retrieve data
results = empathy.retrieve("analysis_results")
print(results) # {'files': 10, 'issues': 3}
Configuration¶
Environment Variables¶
# Railway Redis (recommended for production)
export REDIS_URL="redis://default:password@host:port"
# Local development
export REDIS_URL="redis://localhost:6379"
# Force mock mode (testing)
export REDIS_URL=""
Programmatic Configuration¶
from empathy_os import get_redis_memory, get_railway_redis, check_redis_connection
# Auto-detect (checks REDIS_URL, falls back to localhost, then mock)
memory = get_redis_memory()
# Explicit Railway Redis
memory = get_railway_redis(
host="centerbeam.proxy.rlwy.net",
port=14516,
password="your_password"
)
# Check connection
if check_redis_connection():
print("Redis available!")
Access Tiers¶
Role-based access control for data integrity:
| Tier | Level | Can Read | Can Write | Can Validate | Can Admin |
|---|---|---|---|---|---|
| OBSERVER | 1 | ✅ | ❌ | ❌ | ❌ |
| CONTRIBUTOR | 2 | ✅ | ✅ | ❌ | ❌ |
| VALIDATOR | 3 | ✅ | ✅ | ✅ | ❌ |
| STEWARD | 4 | ✅ | ✅ | ✅ | ✅ |
from empathy_os import AccessTier
# Observer: Can only read (monitoring dashboards)
empathy = EmpathyOS(user_id="monitor", access_tier=AccessTier.OBSERVER)
# Contributor: Can read/write (most agents)
empathy = EmpathyOS(user_id="analyzer", access_tier=AccessTier.CONTRIBUTOR)
# Validator: Can promote patterns (senior agents)
empathy = EmpathyOS(user_id="senior_reviewer", access_tier=AccessTier.VALIDATOR)
# Steward: Full admin (system operators)
empathy = EmpathyOS(user_id="admin", access_tier=AccessTier.STEWARD)
Working Memory¶
Store and retrieve intermediate results:
# Store data (default 1 hour TTL)
empathy.stash("key", {"any": "data"})
# Retrieve your own data
data = empathy.retrieve("key")
# Retrieve another agent's data
other_data = empathy.retrieve("analysis", agent_id="other_agent")
Pattern Staging¶
Stage discovered patterns for validation before promotion:
from empathy_os import StagedPattern
# Discover and stage a pattern
pattern = StagedPattern(
pattern_id="pat_auth_001",
agent_id=empathy.user_id,
pattern_type="security",
name="JWT Token Refresh",
description="Refresh tokens 5 minutes before expiry",
confidence=0.85,
code="# Example code here"
)
empathy.stage_pattern(pattern)
# Validators can review and promote
staged = empathy.get_staged_patterns()
for p in staged:
print(f"Review: {p.name} (confidence: {p.confidence})")
Coordination Signals¶
Real-time communication between agents:
# Send targeted signal
empathy.send_signal(
signal_type="analysis_complete",
data={"files_analyzed": 10, "issues_found": 3},
target_agent="lead_reviewer"
)
# Broadcast to all
empathy.send_signal(
signal_type="status_update",
data={"phase": "testing"}
)
# Receive signals
signals = empathy.receive_signals("analysis_complete")
for sig in signals:
print(f"From {sig['sender']}: {sig['data']}")
State Persistence¶
Save and restore collaboration state:
# Update state through interactions
empathy.collaboration_state.trust_level = 0.8
empathy.collaboration_state.successful_interventions = 10
# Persist to Redis (survives restarts)
empathy.persist_collaboration_state()
# Later, restore state
empathy.restore_collaboration_state()
print(empathy.collaboration_state.trust_level) # 0.8
Multi-Agent Coordination¶
AgentCoordinator¶
Coordinate tasks across a team of agents:
from empathy_os import AgentCoordinator, AgentTask, get_redis_memory
memory = get_redis_memory()
coordinator = AgentCoordinator(memory, team_id="code_review_team")
# Register agents
coordinator.register_agent("security_agent", ["security_review"])
coordinator.register_agent("performance_agent", ["performance_review"])
# Add tasks
task = AgentTask(
task_id="review_001",
task_type="security_review",
description="Review authentication module",
priority=8
)
coordinator.add_task(task)
# Agents claim and complete tasks
claimed = coordinator.claim_task("security_agent", "security_review")
if claimed:
# Do work...
coordinator.complete_task(claimed.task_id, {"vulnerabilities": 0})
# Aggregate results
results = coordinator.aggregate_results()
print(f"Completed: {results['total_completed']}")
TeamSession¶
Collaborative sessions for multi-agent work:
from empathy_os import TeamSession, get_redis_memory
memory = get_redis_memory()
# Create session
session = TeamSession(
memory,
session_id="pr_review_42",
purpose="Review PR #42"
)
# Add agents
session.add_agent("security_agent")
session.add_agent("performance_agent")
session.add_agent("style_agent")
# Share context
session.share("scope", {"files_changed": 15, "lines_changed": 500})
# Agents retrieve shared context
scope = session.get("scope")
# Signal completion
session.signal("review_complete", {"agent": "security_agent", "passed": True})
Wizards with Redis Memory¶
Wizards automatically support short-term memory:
from empathy_software_plugin.wizards.security_analysis_wizard import SecurityAnalysisWizard
from empathy_os import get_redis_memory
memory = get_redis_memory()
# Create wizard with memory
wizard = SecurityAnalysisWizard(short_term_memory=memory)
# Analysis with automatic caching
result = await wizard.analyze_with_cache({"code": "...", "language": "python"})
# Share context with other wizards
wizard.share_context("security_findings", result["vulnerabilities"])
# Stage discovered patterns
wizard.stage_discovered_pattern(
pattern_id="sec_001",
pattern_type="security",
name="SQL Injection Prevention",
description="Always use parameterized queries",
confidence=0.9
)
TTL Strategies¶
Data expires based on type:
| Type | TTL | Use Case |
|---|---|---|
| WORKING_RESULTS | 1 hour | Analysis results, intermediate data |
| STAGED_PATTERNS | 24 hours | Patterns awaiting validation |
| COORDINATION | 5 minutes | Signals, heartbeats |
| CONFLICT_CONTEXT | 7 days | Unresolved conflicts |
| SESSION | 30 minutes | Active collaboration sessions |
Mock Mode¶
For testing without Redis:
from empathy_os.redis_memory import RedisShortTermMemory
# Explicit mock mode
memory = RedisShortTermMemory(use_mock=True)
# Auto-mock when Redis unavailable
memory = get_redis_memory() # Falls back to mock
print(memory.get_stats()["mode"]) # "mock" or "redis"
Railway Deployment¶
Short-term memory works automatically with Railway:
- Add Redis plugin to your Railway project
- Set
REDIS_URLenvironment variable (auto-set by Railway) - Deploy - memory will auto-connect
# This just works on Railway
memory = get_redis_memory()
# Connects to centerbeam.proxy.rlwy.net:PORT
Best Practices¶
- Use appropriate access tiers - Don't give all agents STEWARD access
- Let TTLs expire - Don't manually clean up; Redis handles it
- Stage before promoting - All patterns should be validated
- Use signals for coordination - Not polling working memory
- Persist state periodically - Every few minutes for critical agents
- Use mock mode for tests - Avoid Redis dependency in CI
Example: Multi-Agent Code Review¶
from empathy_os import (
EmpathyOS, get_redis_memory, AccessTier,
AgentCoordinator, AgentTask, TeamSession
)
memory = get_redis_memory()
# 1. Create coordinator
coordinator = AgentCoordinator(memory, team_id="pr_review")
# 2. Create specialized agents
security = EmpathyOS("security", short_term_memory=memory, access_tier=AccessTier.CONTRIBUTOR)
perf = EmpathyOS("performance", short_term_memory=memory, access_tier=AccessTier.CONTRIBUTOR)
lead = EmpathyOS("lead_reviewer", short_term_memory=memory, access_tier=AccessTier.VALIDATOR)
# 3. Register with coordinator
coordinator.register_agent("security", ["security_review"])
coordinator.register_agent("performance", ["performance_review"])
# 4. Add tasks
coordinator.add_task(AgentTask(
task_id="sec_001", task_type="security_review",
description="Check for vulnerabilities", priority=9
))
coordinator.add_task(AgentTask(
task_id="perf_001", task_type="performance_review",
description="Profile database queries", priority=7
))
# 5. Agents work and signal completion
security.send_signal("review_complete", {"passed": True}, target_agent="lead_reviewer")
perf.send_signal("review_complete", {"issues": 2}, target_agent="lead_reviewer")
# 6. Lead aggregates
signals = lead.receive_signals("review_complete")
print(f"Received {len(signals)} reviews")
API Reference¶
See the full API documentation: - EmpathyOS - Multi-Agent Coordination - Persistence
Copyright 2025 Smart AI Memory, LLC. Licensed under Fair Source 0.9.