Overview
Production voice agents typically handle multiple concurrent conversations. Conversimple’s agent architecture is designed for this use case, with each agent instance managing a single conversation session.
One Agent Per Conversation
The recommended pattern is one agent instance per conversation:
from conversimple import ConversimpleAgent
# DON'T: Share one agent across conversations
agent = MyAgent()  # This serves only ONE conversation


# DO: Create an agent per conversation
async def handle_new_conversation(conversation_id):
    """Create and start a dedicated agent for one conversation.

    Must be a coroutine function because it awaits ``agent.start``.

    Args:
        conversation_id: Identifier forwarded to ``agent.start``.

    Returns:
        The started agent, so the caller can keep a reference and stop it
        when the conversation ends.
    """
    agent = MyAgent(api_key=api_key, customer_id=customer_id)
    await agent.start(conversation_id=conversation_id)
    return agent
- Isolation: Each conversation has its own state
- Fault Tolerance: One conversation crash doesn’t affect others
- Simplicity: No need to manage conversation routing
- Scalability: Python’s async handles thousands of agents
Agent Pool Manager
Manage multiple agent instances effectively:
import asyncio
from typing import Dict, Optional

from conversimple import ConversimpleAgent
class AgentPoolManager:
    """Manages multiple conversation agents, one agent per conversation."""

    def __init__(self, api_key: str, customer_id: str, agent_class):
        """Store the credentials and agent factory; the pool starts empty.

        Args:
            api_key: Forwarded as ``api_key=`` to every agent created.
            customer_id: Forwarded as ``customer_id=`` to every agent created.
            agent_class: Called as ``agent_class(api_key=..., customer_id=...)``.
                The result must expose awaitable ``start(conversation_id=...)``
                and ``stop()`` methods.
        """
        self.api_key = api_key
        self.customer_id = customer_id
        self.agent_class = agent_class
        # conversation_id -> agent currently serving that conversation
        self.active_agents: Dict[str, "ConversimpleAgent"] = {}

    async def start_conversation(self, conversation_id: str) -> "ConversimpleAgent":
        """Start a new conversation agent.

        Args:
            conversation_id: Unique id of the conversation to start.

        Returns:
            The started agent.

        Raises:
            ValueError: If ``conversation_id`` is already tracked.
        """
        if conversation_id in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} already active")

        # Create new agent instance
        agent = self.agent_class(
            api_key=self.api_key,
            customer_id=self.customer_id,
        )

        # Track the agent *before* awaiting: otherwise two concurrent calls
        # with the same id would both pass the check above and the second
        # would silently overwrite (and leak) the first agent.
        self.active_agents[conversation_id] = agent
        try:
            await agent.start(conversation_id=conversation_id)
        except BaseException:
            # Start failed or was cancelled: release the reserved slot,
            # unless something else already replaced or removed it.
            if self.active_agents.get(conversation_id) is agent:
                del self.active_agents[conversation_id]
            raise

        print(f"✅ Started conversation {conversation_id}")
        print(f" Active conversations: {len(self.active_agents)}")
        return agent

    async def end_conversation(self, conversation_id: str):
        """End a conversation and clean up its agent.

        The agent is removed from tracking before it is stopped, so a failing
        ``stop()`` cannot leave a dead agent in the pool.

        Raises:
            ValueError: If ``conversation_id`` is not tracked.
        """
        agent = self.active_agents.pop(conversation_id, None)
        if agent is None:
            raise ValueError(f"Conversation {conversation_id} not found")

        # Stop agent (any error propagates; the slot is already released)
        await agent.stop()

        print(f"📞 Ended conversation {conversation_id}")
        print(f" Active conversations: {len(self.active_agents)}")

    def get_agent(self, conversation_id: str) -> Optional["ConversimpleAgent"]:
        """Return the agent for ``conversation_id``, or ``None`` if untracked."""
        return self.active_agents.get(conversation_id)

    def get_stats(self) -> dict:
        """Return pool statistics: ``active_count`` and ``conversation_ids``."""
        return {
            "active_count": len(self.active_agents),
            "conversation_ids": list(self.active_agents),
        }

    async def shutdown(self):
        """Stop every tracked agent concurrently and empty the pool.

        Best effort: a failure in one agent's ``stop()`` is reported but does
        not prevent the remaining agents from being stopped or the pool from
        being cleared.
        """
        # Snapshot and clear first so the pool is empty even if stops fail.
        tracked = list(self.active_agents.items())
        self.active_agents.clear()
        print(f"Shutting down {len(tracked)} agents...")

        # Stop all agents concurrently, collecting errors instead of aborting
        results = await asyncio.gather(
            *(agent.stop() for _, agent in tracked),
            return_exceptions=True,
        )

        failed = [
            (conversation_id, result)
            for (conversation_id, _), result in zip(tracked, results)
            if isinstance(result, BaseException)
        ]
        for conversation_id, error in failed:
            print(f"⚠️ Failed to stop conversation {conversation_id}: {error!r}")

        if failed:
            print(f"⚠️ Shutdown finished with {len(failed)} stop failure(s)")
        else:
            print("✅ All agents shut down")
# Usage
async def main():
    """Run three conversations until interrupted, then shut the pool down.

    Cleanup lives in ``finally`` rather than ``except KeyboardInterrupt``:
    under ``asyncio.run`` on Python 3.11+, Ctrl+C cancels this task instead of
    raising ``KeyboardInterrupt`` inside it, so an ``except`` clause here
    would never run the shutdown.
    """
    manager = AgentPoolManager(
        api_key="your-api-key",
        customer_id="your-customer-id",
        agent_class=MyAgent,
    )
    try:
        # Start multiple conversations
        for conversation_id in ("conv_1", "conv_2", "conv_3"):
            await manager.start_conversation(conversation_id)

        # Keep running until the task is cancelled or interrupted
        while True:
            await asyncio.sleep(1)
    finally:
        await manager.shutdown()
Resource Management
Memory Management
Track memory usage per agent:
import psutil
import os
class ResourceError(RuntimeError):
    """Raised when the process is above its configured memory limit.

    Defined next to its only raiser so the snippet is runnable; drop this
    definition if your project already provides a ``ResourceError``.
    """


class ResourceAwareManager(AgentPoolManager):
    """Agent pool that refuses new conversations when memory is exhausted."""

    def __init__(self, *args, max_memory_mb=1000, **kwargs):
        """Initialise the pool with a memory ceiling.

        Args:
            *args: Forwarded to the parent initialiser.
            max_memory_mb: Resident-memory ceiling, in MB, above which new
                conversations are rejected.
            **kwargs: Forwarded to the parent initialiser.
        """
        super().__init__(*args, **kwargs)
        self.max_memory_mb = max_memory_mb

    def get_memory_usage(self) -> float:
        """Get the current process's resident memory (RSS) in MB."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    async def start_conversation(self, conversation_id: str):
        """Start a conversation after checking the memory limit.

        Raises:
            ResourceError: If current memory usage exceeds ``max_memory_mb``.
        """
        current_memory = self.get_memory_usage()
        if current_memory > self.max_memory_mb:
            raise ResourceError(
                f"Memory limit exceeded: {current_memory:.1f}MB / {self.max_memory_mb}MB"
            )
        return await super().start_conversation(conversation_id)

    def get_stats(self) -> dict:
        """Get the parent's stats plus a ``memory_mb`` entry."""
        stats = super().get_stats()
        stats['memory_mb'] = self.get_memory_usage()
        return stats
Conversation Limits
Limit concurrent conversations:
class CapacityError(RuntimeError):
    """Raised when a manager cannot accept another conversation.

    Defined next to its raiser so the snippet is runnable; drop this
    definition if your project already provides a ``CapacityError``.
    """


class LimitedAgentManager(AgentPoolManager):
    """Agent pool with a hard cap on concurrent conversations."""

    def __init__(self, *args, max_conversations=100, **kwargs):
        """Initialise the pool with a conversation cap.

        Args:
            *args: Forwarded to the parent initialiser.
            max_conversations: Maximum number of simultaneously tracked
                conversations.
            **kwargs: Forwarded to the parent initialiser.
        """
        super().__init__(*args, **kwargs)
        self.max_conversations = max_conversations

    async def start_conversation(self, conversation_id: str):
        """Start a conversation unless the pool is already full.

        Raises:
            CapacityError: If ``max_conversations`` agents are already tracked.
        """
        if len(self.active_agents) >= self.max_conversations:
            raise CapacityError(
                f"Maximum conversations reached: {self.max_conversations}"
            )
        return await super().start_conversation(conversation_id)
Load Balancing
Round-Robin Distribution
Distribute conversations across multiple instances:
from itertools import cycle
class LoadBalancedManager:
    """Distribute conversations across multiple manager instances"""

    def __init__(self, managers: list):
        """Set up round-robin rotation over ``managers``.

        Args:
            managers: Non-empty list of objects exposing an awaitable
                ``start_conversation(conversation_id)``.

        Raises:
            ValueError: If ``managers`` is empty. Without this check the
                first ``start_conversation`` would fail with an obscure
                ``RuntimeError`` caused by ``StopIteration`` in a coroutine.
        """
        if not managers:
            raise ValueError("LoadBalancedManager requires at least one manager")
        self.managers = managers
        self.manager_cycle = cycle(managers)

    async def start_conversation(self, conversation_id: str):
        """Start the conversation on the next manager with free capacity.

        The round-robin pick is tried first; if it is full, every other
        manager is tried in list order.

        Raises:
            CapacityError: If every manager reports it is at capacity.
        """
        # Get next manager in round-robin fashion
        preferred = next(self.manager_cycle)

        # Fallbacks are selected by identity, not equality, so a custom
        # __eq__ on a manager cannot hide a usable fallback.
        candidates = [preferred]
        candidates.extend(m for m in self.managers if m is not preferred)

        for manager in candidates:
            try:
                return await manager.start_conversation(conversation_id)
            except CapacityError:
                continue

        raise CapacityError("All managers at capacity")
# Usage
async def main():
    """Spread conversations over four capped managers via round-robin."""
    # Create multiple manager instances. LimitedAgentManager is the class
    # that accepts max_conversations; the base AgentPoolManager rejects it.
    managers = [
        LimitedAgentManager(api_key, customer_id, MyAgent, max_conversations=50)
        for _ in range(4)  # 4 managers, 200 total capacity
    ]
    load_balancer = LoadBalancedManager(managers)

    # Distribute conversations automatically
    await load_balancer.start_conversation("conv_1")
    await load_balancer.start_conversation("conv_2")
Least-Loaded Distribution
Route to the manager with fewest active conversations:
class LeastLoadedManager(LoadBalancedManager):
    """Load balancer that always picks the least busy manager."""

    async def start_conversation(self, conversation_id: str):
        """Start the conversation on the manager tracking the fewest agents.

        Ties go to the manager that appears first in ``self.managers``.
        """

        def active_count(candidate) -> int:
            # Load is measured as the number of currently tracked agents.
            return len(candidate.active_agents)

        target = min(self.managers, key=active_count)
        started = await target.start_conversation(conversation_id)
        return started
Conversation Routing
Route by Customer
Route customers to specific agents:
class CustomerRoutedManager:
    """Route conversations based on customer ID"""

    def __init__(self, managers: dict):
        """Store the segment-to-manager mapping.

        Args:
            managers: Mapping of segment name to manager, e.g.
                ``{"vip": ..., "standard": ...}``. ``"standard"`` is the
                fallback for segments without a dedicated manager.
        """
        self.managers = managers  # {"segment": manager}

    def get_customer_segment(self, customer_id: str) -> str:
        """Determine customer segment"""
        # Example: VIP customers get dedicated resources
        if customer_id.startswith("vip_"):
            return "vip"
        return "standard"

    async def start_conversation(self, conversation_id: str, customer_id: str):
        """Start the conversation on the manager for the customer's segment.

        Raises:
            KeyError: If neither the segment nor ``"standard"`` has a manager.
        """
        segment = self.get_customer_segment(customer_id)
        manager = self.managers.get(segment)
        if manager is None:
            # Look up the fallback only when needed, so a mapping without
            # "standard" still works for segments that have their own manager.
            manager = self.managers["standard"]
        return await manager.start_conversation(conversation_id)
# Usage
# LimitedAgentManager is the class that accepts max_conversations; the base
# AgentPoolManager does not take that keyword.
router = CustomerRoutedManager({
    "vip": LimitedAgentManager(
        api_key, customer_id, MyAgent, max_conversations=20
    ),
    "standard": LimitedAgentManager(
        api_key, customer_id, MyAgent, max_conversations=100
    ),
})
Monitoring and Health Checks
Health Check Endpoint
Expose agent pool health:
import time

from fastapi import FastAPI

# Capacity used for health reporting; keep in sync with the pool's real limit.
MAX_CONVERSATIONS = 100

app = FastAPI()
manager = AgentPoolManager(api_key, customer_id, MyAgent)

# Monotonic start marker, so uptime is unaffected by wall-clock adjustments.
start_time = time.monotonic()


@app.get("/health")
async def health_check():
    """Health check endpoint.

    Reports "degraded" once the active conversation count reaches
    ``MAX_CONVERSATIONS``; remaining capacity never goes below zero.
    """
    stats = manager.get_stats()
    active = stats["active_count"]
    return {
        "status": "healthy" if active < MAX_CONVERSATIONS else "degraded",
        "active_conversations": active,
        # Only memory-aware managers report memory_mb; default to 0 otherwise.
        "memory_mb": stats.get("memory_mb", 0),
        "capacity_remaining": max(0, MAX_CONVERSATIONS - active),
    }


@app.get("/metrics")
async def metrics():
    """Detailed metrics: pool statistics plus process uptime in seconds."""
    return {
        "conversations": manager.get_stats(),
        "uptime_seconds": time.monotonic() - start_time,
    }
Alerting
Set up alerts for capacity issues:
class MonitoredManager(LimitedAgentManager):
    """Capped agent pool that alerts when usage crosses a threshold.

    Derives from ``LimitedAgentManager`` because the capacity percentage needs
    ``self.max_conversations``, which the base ``AgentPoolManager`` never sets.
    """

    def __init__(self, *args, alert_threshold=80, **kwargs):
        """Initialise the pool with an alert threshold.

        Args:
            *args: Forwarded to the parent initialiser.
            alert_threshold: Capacity percentage above which an alert is sent.
            **kwargs: Forwarded to the parent initialiser (including
                ``max_conversations``).
        """
        super().__init__(*args, **kwargs)
        self.alert_threshold = alert_threshold
        # Latch so a sustained high load produces one alert, not one per start.
        self.alert_sent = False

    async def start_conversation(self, conversation_id: str):
        """Start with capacity monitoring"""
        result = await super().start_conversation(conversation_id)

        # Check capacity
        active = len(self.active_agents)
        capacity_pct = (active / self.max_conversations) * 100
        if capacity_pct > self.alert_threshold and not self.alert_sent:
            await self.send_alert(
                f"Capacity at {capacity_pct:.1f}% ({active}/{self.max_conversations})"
            )
            self.alert_sent = True
        elif capacity_pct < self.alert_threshold:
            # Load fell back under the threshold: re-arm the alert.
            self.alert_sent = False
        return result

    async def send_alert(self, message: str):
        """Send alert (implement based on your system)"""
        print(f"🚨 ALERT: {message}")
        # Send to Slack, PagerDuty, etc.
Graceful Shutdown
Handle shutdown without dropping conversations:
class ServiceUnavailable(RuntimeError):
    """Raised when a conversation is requested while shutting down.

    Defined next to its raiser so the snippet is runnable; drop this
    definition if your project already provides a ``ServiceUnavailable``.
    """


class GracefulManager(AgentPoolManager):
    """Agent pool that drains active conversations before shutting down."""

    def __init__(self, *args, **kwargs):
        """Initialise the pool in the accepting (not shutting down) state."""
        super().__init__(*args, **kwargs)
        self.shutting_down = False

    async def start_conversation(self, conversation_id: str):
        """Prevent new conversations during shutdown.

        Raises:
            ServiceUnavailable: If ``graceful_shutdown`` has been started.
        """
        if self.shutting_down:
            raise ServiceUnavailable("Service is shutting down")
        return await super().start_conversation(conversation_id)

    async def graceful_shutdown(self, timeout: int = 30, poll_interval: float = 5):
        """Wait for conversations to end, then force-stop any that remain.

        Args:
            timeout: Maximum number of seconds to wait for conversations to
                end on their own.
            poll_interval: Seconds between checks of the active pool.
        """
        print(f"Starting graceful shutdown ({len(self.active_agents)} active)")
        self.shutting_down = True

        # The event loop's monotonic clock needs no extra import and is
        # immune to wall-clock jumps.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        # Wait for conversations to end naturally
        while self.active_agents:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            print(f"Waiting for {len(self.active_agents)} conversations...")
            # Never sleep past the deadline, so short timeouts are honoured.
            await asyncio.sleep(min(poll_interval, remaining))

        # Force shutdown remaining
        if self.active_agents:
            print(f"Force stopping {len(self.active_agents)} conversations")
            await self.shutdown()
        print("✅ Graceful shutdown complete")
Best Practices
1. Set Appropriate Limits
# Start conservative
class ProductionManager(LimitedAgentManager, ResourceAwareManager):
    """Pool enforcing both the conversation cap and the memory cap.

    Both parents forward unknown keyword arguments through ``super()``, so
    each one consumes its own limit and passes the rest along.
    """


manager = ProductionManager(
    api_key,
    customer_id,
    MyAgent,
    max_conversations=50,  # Limit per instance
    max_memory_mb=1000,  # 1GB memory limit
)
2. Monitor Resource Usage
# Log regularly
async def monitor_loop(interval: float = 60):
    """Log pool statistics forever, once every ``interval`` seconds.

    Args:
        interval: Seconds to sleep between log lines.
    """
    while True:
        stats = manager.get_stats()
        # Only memory-aware managers report memory_mb; fall back to 0.0 so
        # this loop does not die with KeyError on a plain pool.
        logger.info(
            "Active: %d, Memory: %.1fMB",
            stats["active_count"],
            stats.get("memory_mb", 0.0),
        )
        await asyncio.sleep(interval)
3. Handle Cleanup Properly
# Always clean up ended conversations
def on_conversation_ended(self, conversation_id: str):
    """Drop all per-conversation state once a conversation has ended."""
    # Stop tracking the conversation; an unknown id is ignored, not an error.
    tracked = self.conversations
    tracked.pop(conversation_id, None)

    # Clear any cached data for this conversation.
    # NOTE(review): assumes self.cache.clear accepts a key. The built-in
    # dict.clear() takes no arguments, so confirm the cache type.
    cache = self.cache
    cache.clear(conversation_id)