Example: Webhook & Event Integration¶
Difficulty: Intermediate Time: 25 minutes Features: Event bus, webhooks, external integrations Integrations: Slack, GitHub, JIRA, custom webhooks
Overview¶
This example shows how to integrate the Empathy Framework with external systems using: - Event bus: Internal pub/sub system for framework events - Webhooks: HTTP callbacks to external services - Bidirectional integration: Trigger empathy from external events (GitHub PRs, Slack messages) - Real-time notifications: Alert teams instantly about Level 4 predictions
Use Cases: - Notify Slack when high-confidence predictions occur - Create GitHub issues from anticipatory warnings - Trigger JIRA tickets for predicted problems - Send metrics to Datadog/NewRelic - Custom integrations with your tools
Installation¶
Part 1: Event Bus Basics¶
Subscribe to Framework Events¶
from empathy_os import EmpathyOS
from empathy_os.events import EventBus, Event
# Create event bus
bus = EventBus()
# Subscribe to events
@bus.on("pattern_learned")
def handle_pattern_learned(event: Event):
print(f"📚 New pattern learned: {event.data['pattern_name']}")
print(f" Confidence: {event.data['confidence']:.0%}")
print(f" User: {event.data['user_id']}")
@bus.on("level_4_prediction")
def handle_prediction(event: Event):
print(f"🔮 Level 4 Prediction!")
print(f" {event.data['prediction']}")
print(f" Confidence: {event.data['confidence']:.0%}")
@bus.on("trust_milestone")
def handle_trust_milestone(event: Event):
print(f"🎉 Trust milestone reached!")
print(f" User: {event.data['user_id']}")
print(f" Trust level: {event.data['trust_level']:.0%}")
print(f" Milestone: {event.data['milestone']}")
# Create empathy with event bus
empathy = EmpathyOS(
user_id="user_123",
target_level=4,
event_bus=bus # Connect to event bus
)
# Interact (events will fire automatically)
response = empathy.interact(
user_id="user_123",
user_input="Analyze this code for security issues",
context={"code": "SELECT * FROM users WHERE id = " + user_id}
)
# Events emitted:
# 🔮 Level 4 Prediction!
# SQL injection vulnerability detected
# Confidence: 95%
#
# 📚 New pattern learned: sql_injection_detection
# Confidence: 95%
# User: user_123
Part 2: Webhook Notifications¶
Send Events to External Services¶
from empathy_os.webhooks import WebhookManager
from empathy_os.events import EventBus
bus = EventBus()
webhooks = WebhookManager(bus)
# Register Slack webhook
webhooks.register(
event_type="level_4_prediction",
url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
headers={"Content-Type": "application/json"},
payload_template={
"text": "🔮 *Level 4 Prediction*",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Prediction:* {prediction}\n*Confidence:* {confidence:.0%}\n*User:* {user_id}"
}
}
]
}
)
# When Level 4 prediction occurs, Slack gets notified automatically
empathy = EmpathyOS(
user_id="user_123",
target_level=4,
event_bus=bus
)
response = empathy.interact(
user_id="user_123",
user_input="About to deploy API changes to production",
context={
"deployment": "production",
"service": "user-api",
"changes": ["schema_modification", "new_endpoints"]
}
)
# If Level 4 prediction is made, Slack receives:
# 🔮 **Level 4 Prediction**
# Prediction: Schema modification may break mobile app (uses old API contract)
# Confidence: 87%
# User: user_123
Part 3: Multiple Webhook Integrations¶
Notify Multiple Services¶
from empathy_os.webhooks import WebhookManager
from empathy_os.events import EventBus
import os
bus = EventBus()
webhooks = WebhookManager(bus)
# 1. Slack notification
webhooks.register(
event_type="level_4_prediction",
url=os.getenv("SLACK_WEBHOOK_URL"),
payload_template={
"text": "🔮 Prediction: {prediction}",
"username": "Empathy Bot",
"icon_emoji": ":crystal_ball:"
}
)
# 2. Datadog metrics
webhooks.register(
event_type="level_4_prediction",
url="https://api.datadoghq.com/api/v1/events",
headers={
"Content-Type": "application/json",
"DD-API-KEY": os.getenv("DATADOG_API_KEY")
},
payload_template={
"title": "Empathy Level 4 Prediction",
"text": "{prediction}",
"priority": "normal",
"tags": ["empathy:level4", "confidence:{confidence}", "user:{user_id}"],
"alert_type": "info"
}
)
# 3. Custom internal webhook
webhooks.register(
event_type="level_4_prediction",
url="https://internal-api.company.com/webhooks/empathy",
headers={
"Authorization": f"Bearer {os.getenv('INTERNAL_API_TOKEN')}",
"Content-Type": "application/json"
},
payload_template={
"event_type": "prediction",
"data": {
"prediction": "{prediction}",
"confidence": "{confidence}",
"user_id": "{user_id}",
"timestamp": "{timestamp}"
}
}
)
# Single event triggers all 3 webhooks
empathy = EmpathyOS(user_id="user_123", target_level=4, event_bus=bus)
response = empathy.interact(
user_id="user_123",
user_input="Merge this PR",
context={"pr_number": 456, "changes": ["auth_module"]}
)
# All 3 services notified simultaneously:
# ✅ Slack: Team alerted
# ✅ Datadog: Metric recorded
# ✅ Internal API: Custom processing triggered
Part 4: Conditional Webhooks¶
Fire Webhooks Based on Conditions¶
from empathy_os.webhooks import ConditionalWebhook
# Only notify for HIGH confidence predictions (>85%)
webhooks.register_conditional(
event_type="level_4_prediction",
condition=lambda event: event.data['confidence'] > 0.85,
url=os.getenv("SLACK_HIGH_CONFIDENCE_WEBHOOK"),
payload_template={
"text": "⚠️ *HIGH CONFIDENCE PREDICTION* ({confidence:.0%})",
"attachments": [{
"color": "warning",
"text": "{prediction}"
}]
}
)
# Only notify for security-related predictions
webhooks.register_conditional(
event_type="level_4_prediction",
condition=lambda event: "security" in event.data.get('tags', []),
url=os.getenv("SECURITY_TEAM_WEBHOOK"),
payload_template={
"text": "🔒 Security prediction: {prediction}",
"channel": "#security-alerts"
}
)
# Only notify during business hours (9am-5pm)
import datetime
webhooks.register_conditional(
event_type="level_4_prediction",
condition=lambda event: 9 <= datetime.datetime.now().hour < 17,
url=os.getenv("SLACK_BUSINESS_HOURS_WEBHOOK"),
payload_template={"text": "Prediction (business hours): {prediction}"}
)
Part 5: GitHub Integration¶
Create Issues from Predictions¶
from empathy_os.integrations import GitHubIntegration
from empathy_os.events import EventBus
# Setup GitHub integration
github = GitHubIntegration(
token=os.getenv("GITHUB_TOKEN"),
repo="username/repo"
)
bus = EventBus()
# Auto-create GitHub issue for high-severity predictions
@bus.on("level_4_prediction")
async def create_github_issue(event: Event):
if event.data['confidence'] > 0.85:
issue = await github.create_issue(
title=f"🔮 Prediction: {event.data['prediction'][:50]}...",
body=f"""
## Empathy Level 4 Prediction
**Prediction:** {event.data['prediction']}
**Confidence:** {event.data['confidence']:.0%}
**Context:**
- User: {event.data['user_id']}
- Timestamp: {event.data['timestamp']}
**Recommended Action:**
{event.data.get('recommendation', 'Review and address this prediction')}
---
*This issue was automatically created by Empathy Framework*
""",
labels=["empathy-prediction", "needs-review"],
assignees=["tech-lead"]
)
print(f"✅ Created GitHub issue #{issue.number}")
# Connect empathy to GitHub
empathy = EmpathyOS(
user_id="user_123",
target_level=4,
event_bus=bus,
integrations=[github]
)
# Prediction triggers GitHub issue creation
response = empathy.interact(
user_id="user_123",
user_input="Deploying authentication refactor",
context={"deployment": "production"}
)
# If prediction made:
# ✅ Created GitHub issue #789
Part 6: Bidirectional Integration¶
Trigger Empathy from External Events¶
from empathy_os import EmpathyOS
from empathy_os.integrations import GitHubIntegration
github = GitHubIntegration(
token=os.getenv("GITHUB_TOKEN"),
repo="username/repo"
)
empathy = EmpathyOS(
user_id="ci_agent",
target_level=4,
integrations=[github]
)
# Listen for GitHub webhook events
@github.on("pull_request.opened")
async def analyze_pr(pr_data):
"""
When PR is opened, analyze it with Empathy
"""
# Get PR details
pr_number = pr_data['number']
pr_author = pr_data['user']['login']
pr_title = pr_data['title']
files_changed = await github.get_pr_files(pr_number)
# Analyze with Empathy
response = empathy.interact(
user_id=f"github_user_{pr_author}",
user_input=f"Review PR #{pr_number}: {pr_title}",
context={
"pr_number": pr_number,
"files_changed": files_changed,
"diff": await github.get_pr_diff(pr_number)
}
)
# Post analysis as PR comment
await github.comment_on_pr(
pr_number=pr_number,
comment=f"""
## 🤖 Empathy Code Review
{response.response}
---
**Empathy Level:** {response.level}
**Confidence:** {response.confidence:.0%}
"""
)
# If Level 4 prediction, add labels
if response.level == 4 and response.predictions:
await github.add_labels(
pr_number=pr_number,
labels=["⚠️ empathy-prediction", "needs-review"]
)
print(f"✅ Analyzed PR #{pr_number}")
# GitHub sends webhook → Empathy analyzes → Posts comment
# Fully automated code review with Level 4 anticipatory intelligence!
Part 7: Slack Integration¶
Slash Commands¶
from empathy_os import EmpathyOS
from empathy_os.integrations import SlackIntegration
from flask import Flask, request
app = Flask(__name__)
slack = SlackIntegration(
bot_token=os.getenv("SLACK_BOT_TOKEN"),
signing_secret=os.getenv("SLACK_SIGNING_SECRET")
)
empathy = EmpathyOS(
user_id="slack_bot",
target_level=4,
integrations=[slack]
)
@app.route("/slack/commands/empathy", methods=["POST"])
def handle_slack_command():
"""
Handle /empathy slash command in Slack
"""
# Verify Slack signature
if not slack.verify_request(request):
return "Invalid request", 403
# Parse command
data = request.form
user_id = data['user_id']
channel_id = data['channel_id']
text = data['text'] # User's query after /empathy
# Query Empathy
response = empathy.interact(
user_id=f"slack_user_{user_id}",
user_input=text,
context={
"channel_id": channel_id,
"platform": "slack"
}
)
# Send response to Slack
slack.send_message(
channel=channel_id,
text=response.response,
blocks=[
{
"type": "section",
"text": {"type": "mrkdwn", "text": response.response}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"Empathy Level {response.level} | Confidence: {response.confidence:.0%}"
}
]
}
]
)
return "", 200
# Usage in Slack:
# /empathy How do I fix this SQL injection?
# → Bot responds with Level 4 anticipatory analysis
Proactive Slack Notifications¶
from empathy_os.integrations import SlackIntegration
import asyncio
slack = SlackIntegration(bot_token=os.getenv("SLACK_BOT_TOKEN"))
# Monitor for patterns and notify team
@empathy.on("pattern_learned")
async def notify_team_of_new_pattern(event: Event):
"""
When AI learns a new pattern, share it with the team
"""
await slack.send_message(
channel="#engineering",
text=f"📚 *New Pattern Learned*",
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"""
*Pattern:* {event.data['pattern_name']}
*Confidence:* {event.data['confidence']:.0%}
*Learn
ed from:* <@{event.data['user_id']}>
This pattern is now available for the whole team! 🎉
"""
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "View Pattern"},
"url": f"https://empathy-dashboard.company.com/patterns/{event.data['pattern_id']}"
}
]
}
]
)
Part 8: JIRA Integration¶
Auto-Create Tickets from Predictions¶
from empathy_os.integrations import JIRAIntegration
jira = JIRAIntegration(
url="https://company.atlassian.net",
username=os.getenv("JIRA_USERNAME"),
api_token=os.getenv("JIRA_API_TOKEN"),
project_key="ENG"
)
@bus.on("level_4_prediction")
async def create_jira_ticket(event: Event):
"""
Create JIRA ticket for high-confidence predictions
"""
if event.data['confidence'] > 0.85 and event.data.get('severity') == 'high':
ticket = await jira.create_issue(
project="ENG",
issue_type="Bug" if "bug" in event.data.get('tags', []) else "Task",
summary=f"🔮 Predicted Issue: {event.data['prediction'][:100]}",
description=f"""
h2. Empathy Level 4 Prediction
*Prediction:*
{event.data['prediction']}
*Confidence:* {event.data['confidence']:.0%}
*Context:*
* User: {event.data['user_id']}
* Timestamp: {event.data['timestamp']}
* Tags: {', '.join(event.data.get('tags', []))}
*Recommended Action:*
{event.data.get('recommendation', 'Review and address this prediction')}
---
_This ticket was automatically created by Empathy Framework_
""",
priority="High" if event.data['confidence'] > 0.90 else "Medium",
labels=["empathy-prediction", "ai-generated"],
assignee="tech-lead"
)
print(f"✅ Created JIRA ticket {ticket.key}")
# Prediction → JIRA ticket created automatically
Part 9: Custom Webhook Server¶
Receive Webhooks from Empathy¶
from flask import Flask, request
import json
app = Flask(__name__)
@app.route("/webhooks/empathy", methods=["POST"])
def handle_empathy_webhook():
"""
Receive webhooks from Empathy Framework
"""
# Parse webhook payload
data = request.json
event_type = data.get('event_type')
timestamp = data.get('timestamp')
payload = data.get('data', {})
# Handle different event types
if event_type == "level_4_prediction":
handle_prediction(payload)
elif event_type == "pattern_learned":
handle_pattern_learned(payload)
elif event_type == "trust_milestone":
handle_trust_milestone(payload)
elif event_type == "coordination_request":
handle_coordination_request(payload)
return {"status": "received"}, 200
def handle_prediction(payload):
"""Custom business logic for predictions"""
prediction = payload['prediction']
confidence = payload['confidence']
user_id = payload['user_id']
# Store in database
db.predictions.insert({
"prediction": prediction,
"confidence": confidence,
"user_id": user_id,
"timestamp": datetime.utcnow()
})
# Alert ops team if critical
if confidence > 0.90:
ops_alert_system.send(
severity="high",
message=f"Critical prediction: {prediction}"
)
# Update analytics dashboard
analytics.track_event("empathy_prediction", {
"confidence": confidence,
"user_id": user_id
})
# Start webhook server
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Part 10: Event Types Reference¶
All Available Events¶
# Complete list of Empathy Framework events
EVENT_TYPES = {
# Core interaction events
"interaction_started": {
"data": ["user_id", "user_input", "timestamp"],
"description": "User started interaction"
},
"interaction_completed": {
"data": ["user_id", "response", "level", "confidence", "duration_ms"],
"description": "Interaction completed"
},
# Level transition events
"level_transition": {
"data": ["user_id", "from_level", "to_level", "reason"],
"description": "Empathy level changed"
},
# Level-specific events
"level_1_response": {"description": "Reactive response (Level 1)"},
"level_2_clarification": {"description": "Guided clarification (Level 2)"},
"level_3_proactive_suggestion": {"description": "Proactive suggestion (Level 3)"},
"level_4_prediction": {"description": "Anticipatory prediction (Level 4)"},
"level_5_transformation": {"description": "Transformative framework (Level 5)"},
# Pattern events
"pattern_learned": {
"data": ["pattern_id", "pattern_name", "confidence", "user_id"],
"description": "New pattern learned"
},
"pattern_applied": {
"data": ["pattern_id", "pattern_name", "confidence", "success"],
"description": "Pattern applied to interaction"
},
"pattern_updated": {
"data": ["pattern_id", "old_confidence", "new_confidence"],
"description": "Pattern confidence updated"
},
# Trust events
"trust_increased": {
"data": ["user_id", "old_trust", "new_trust", "delta"],
"description": "Trust level increased"
},
"trust_decreased": {
"data": ["user_id", "old_trust", "new_trust", "delta"],
"description": "Trust level decreased"
},
"trust_milestone": {
"data": ["user_id", "trust_level", "milestone"],
"description": "Trust milestone reached (e.g., 0.5, 0.75, 0.9)"
},
# Coordination events (multi-agent)
"coordination_request": {
"data": ["requesting_agent", "target_agents", "topic", "priority"],
"description": "Agent requested coordination"
},
"conflict_detected": {
"data": ["agent1", "agent2", "resource", "severity"],
"description": "Conflict detected between agents"
},
"handoff_initiated": {
"data": ["from_agent", "to_agent", "task", "context"],
"description": "Task handoff between agents"
},
# Failure/error events
"prediction_failure": {
"data": ["prediction_id", "reason", "confidence"],
"description": "Prediction was incorrect or rejected"
},
"error": {
"data": ["error_type", "error_message", "context"],
"description": "Error occurred during interaction"
}
}
Performance & Best Practices¶
Webhook Performance: - Average latency: ~50-100ms (HTTP POST) - Retry logic: 3 attempts with exponential backoff - Timeout: 5 seconds default - Async delivery: Webhooks don't block interactions
Best Practices: 1. Use conditional webhooks: Don't spam low-value events 2. Batch when possible: Group multiple events into single webhook 3. Monitor failures: Set up alerts for webhook delivery failures 4. Secure endpoints: Use HTTPS + API tokens 5. Idempotency: Make webhook handlers idempotent (handle duplicates)
Security Considerations¶
Webhook Security:
from empathy_os.webhooks import SecureWebhook
# Add HMAC signature verification
webhooks.register(
event_type="level_4_prediction",
url="https://external-service.com/webhook",
secret=os.getenv("WEBHOOK_SECRET"), # HMAC signing key
verify_ssl=True, # Verify SSL certificates
timeout=10, # Request timeout (seconds)
retry_count=3 # Number of retries on failure
)
# Receiving end verifies signature:
import hmac
import hashlib
def verify_webhook_signature(request, secret):
signature = request.headers.get('X-Empathy-Signature')
body = request.get_data()
expected_sig = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_sig)
Next Steps¶
Enhance integrations: 1. Add more services: Microsoft Teams, Discord, PagerDuty 2. Custom event types: Define domain-specific events 3. Event filtering: Advanced filtering rules for webhooks 4. Webhook dashboard: Monitor delivery rates, failures 5. Real-time dashboards: Stream events to live dashboard
Related examples: - Multi-Agent Coordination - Coordination events - Adaptive Learning - Adaptation events - SBAR Clinical Handoff - Healthcare events
Troubleshooting¶
"Webhook delivery failed"
- Check URL is reachable: curl https://webhook-url
- Verify SSL certificate if HTTPS
- Check request timeout (increase if needed)
- Review webhook logs: webhooks.get_delivery_logs()
"Events not firing"
- Verify event bus connected: empathy.event_bus is not None
- Check event handler registered: bus.handlers
- Test event manually: bus.emit(Event(type="test", data={}))
"Too many webhook requests"
- Add conditional webhooks (filter low-value events)
- Batch events: batch_size=10, batch_timeout_seconds=5
- Use async webhooks: async_delivery=True
Questions? See Webhook Integration Guide