Documentation Index
Fetch the complete documentation index at: https://docs.conversimple.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Production voice agents typically handle multiple concurrent conversations. Conversimple’s agent architecture is designed for this use case, with each agent instance managing a single conversation session.
One Agent Per Conversation
The recommended pattern is one agent instance per conversation:
from conversimple import ConversimpleAgent
# DON'T: Share one agent across conversations
# (each agent instance manages a single conversation session, so a shared
# module-level instance cannot serve concurrent callers)
agent = MyAgent() # This serves only ONE conversation
# DO: Create an agent per conversation
async def handle_new_conversation(conversation_id):
    """Create and start a dedicated agent for one conversation.

    Declared ``async`` because the body awaits ``agent.start``; ``await``
    is a syntax error inside a plain ``def``.

    Args:
        conversation_id: Identifier passed to ``agent.start``.

    Returns:
        The started agent, so the caller can keep a reference to it.
    """
    agent = MyAgent(api_key=api_key, customer_id=customer_id)
    await agent.start(conversation_id=conversation_id)
    return agent
- Isolation: Each conversation has its own state
- Fault Tolerance: One conversation crash doesn’t affect others
- Simplicity: No need to manage conversation routing
- Scalability: Python’s async handles thousands of agents
Agent Pool Manager
Manage multiple agent instances effectively:
import asyncio
from typing import Dict, Optional

from conversimple import ConversimpleAgent
class AgentPoolManager:
    """Manages multiple conversation agents.

    One agent is built from ``agent_class`` per conversation and tracked in
    ``active_agents`` (conversation ID -> agent) until the conversation is
    ended or the pool is shut down.
    """

    def __init__(self, api_key: str, customer_id: str, agent_class):
        """Store the credentials and the class used to build agents.

        Args:
            api_key: Forwarded to every agent as ``api_key``.
            customer_id: Forwarded to every agent as ``customer_id``.
            agent_class: Callable invoked with ``api_key`` and
                ``customer_id`` keyword arguments to create an agent.
        """
        self.api_key = api_key
        self.customer_id = customer_id
        self.agent_class = agent_class
        self.active_agents: Dict[str, "ConversimpleAgent"] = {}

    async def start_conversation(self, conversation_id: str) -> "ConversimpleAgent":
        """Start a new conversation agent.

        Args:
            conversation_id: Identifier of the conversation to start.

        Returns:
            The started agent.

        Raises:
            ValueError: If ``conversation_id`` is already active.
        """
        if conversation_id in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} already active")

        agent = self.agent_class(
            api_key=self.api_key,
            customer_id=self.customer_id,
        )

        # Reserve the slot *before* awaiting: otherwise a second call for the
        # same ID could pass the duplicate check while this one is suspended
        # inside agent.start().
        self.active_agents[conversation_id] = agent
        try:
            await agent.start(conversation_id=conversation_id)
        except BaseException:
            # Roll back the reservation (also on cancellation) so a failed
            # start does not leave a dead entry behind; always re-raised.
            if self.active_agents.get(conversation_id) is agent:
                del self.active_agents[conversation_id]
            raise

        print(f"✅ Started conversation {conversation_id}")
        print(f" Active conversations: {len(self.active_agents)}")
        return agent

    async def end_conversation(self, conversation_id: str):
        """End a conversation and clean up its agent.

        Raises:
            ValueError: If ``conversation_id`` is not active.
        """
        # Untrack first so the entry is released even if stop() raises, and
        # so a concurrent second call cannot stop the same agent twice.
        agent = self.active_agents.pop(conversation_id, None)
        if agent is None:
            raise ValueError(f"Conversation {conversation_id} not found")

        await agent.stop()

        print(f"📞 Ended conversation {conversation_id}")
        print(f" Active conversations: {len(self.active_agents)}")

    def get_agent(self, conversation_id: str) -> Optional["ConversimpleAgent"]:
        """Get agent for a specific conversation, or ``None`` if not active."""
        return self.active_agents.get(conversation_id)

    def get_stats(self) -> dict:
        """Get pool statistics: the active count and the active IDs."""
        return {
            "active_count": len(self.active_agents),
            "conversation_ids": list(self.active_agents.keys()),
        }

    async def shutdown(self):
        """Stop every tracked agent concurrently and empty the pool.

        All agents are given the chance to stop even if some fail; the first
        failure is re-raised once the pool has been emptied.
        """
        print(f"Shutting down {len(self.active_agents)} agents...")

        # Snapshot and clear up front so the pool never keeps references to
        # agents that are already being stopped.
        agents = list(self.active_agents.values())
        self.active_agents.clear()

        results = await asyncio.gather(
            *(agent.stop() for agent in agents),
            return_exceptions=True,
        )
        failures = [r for r in results if isinstance(r, BaseException)]
        if failures:
            raise failures[0]

        print("✅ All agents shut down")
# Usage
async def main():
    """Start three conversations and keep the pool alive until interrupted."""
    manager = AgentPoolManager(
        api_key="your-api-key",
        customer_id="your-customer-id",
        agent_class=MyAgent,
    )

    try:
        # Start multiple conversations
        for conversation_id in ("conv_1", "conv_2", "conv_3"):
            await manager.start_conversation(conversation_id)

        # Keep running
        while True:
            await asyncio.sleep(1)
    finally:
        # Under asyncio.run() a Ctrl+C normally reaches this coroutine as a
        # task cancellation rather than a KeyboardInterrupt, so the cleanup
        # lives in ``finally`` to run on every exit path.
        await manager.shutdown()
Resource Management
Memory Management
Track the memory usage of the process that hosts the agents:
import psutil
import os
class ResourceError(RuntimeError):
    """Raised when the process is above its configured memory limit."""


class ResourceAwareManager(AgentPoolManager):
    """Agent pool that refuses new conversations above a memory limit."""

    def __init__(self, *args, max_memory_mb=1000, **kwargs):
        """Create the pool.

        Args:
            *args: Forwarded to ``AgentPoolManager``.
            max_memory_mb: Resident memory, in MB, above which new
                conversations are rejected.
            **kwargs: Forwarded to ``AgentPoolManager``.
        """
        super().__init__(*args, **kwargs)
        self.max_memory_mb = max_memory_mb

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB.

        This is the resident set size of the whole process, not of an
        individual agent.
        """
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    async def start_conversation(self, conversation_id: str):
        """Start conversation with memory check.

        Raises:
            ResourceError: If current usage exceeds ``max_memory_mb``.
        """
        current_memory = self.get_memory_usage()
        if current_memory > self.max_memory_mb:
            raise ResourceError(
                f"Memory limit exceeded: {current_memory:.1f}MB / {self.max_memory_mb}MB"
            )
        return await super().start_conversation(conversation_id)

    def get_stats(self) -> dict:
        """Get stats including memory (adds a ``memory_mb`` entry)."""
        stats = super().get_stats()
        stats['memory_mb'] = self.get_memory_usage()
        return stats
Conversation Limits
# Limit concurrent conversations:
class CapacityError(RuntimeError):
    """Raised when a manager cannot accept another conversation."""


class LimitedAgentManager(AgentPoolManager):
    """Agent pool with a cap on simultaneously active conversations."""

    def __init__(self, *args, max_conversations=100, **kwargs):
        """Create the pool.

        Args:
            *args: Forwarded to ``AgentPoolManager``.
            max_conversations: Maximum number of active conversations.
            **kwargs: Forwarded to ``AgentPoolManager``.
        """
        super().__init__(*args, **kwargs)
        self.max_conversations = max_conversations

    async def start_conversation(self, conversation_id: str):
        """Start conversation with limit check.

        Raises:
            CapacityError: If ``max_conversations`` are already active.
        """
        if len(self.active_agents) >= self.max_conversations:
            raise CapacityError(
                f"Maximum conversations reached: {self.max_conversations}"
            )
        return await super().start_conversation(conversation_id)
Load Balancing
Round-Robin Distribution
Distribute conversations across multiple instances:
from itertools import cycle
class LoadBalancedManager:
    """Distribute conversations across multiple manager instances"""

    def __init__(self, managers: list):
        """Remember the managers and set up the round-robin iterator."""
        self.managers = managers
        self.manager_cycle = cycle(managers)

    async def start_conversation(self, conversation_id: str):
        """Start conversation on next available manager.

        The next manager in round-robin order is tried first; if it raises
        ``CapacityError`` the remaining managers are tried in list order.

        Raises:
            RuntimeError: If no managers were configured.
            CapacityError: If every manager is at capacity.
        """
        if not self.managers:
            # next() on an empty cycle raises StopIteration, which surfaces
            # from a coroutine as an opaque RuntimeError; fail clearly.
            raise RuntimeError("No managers configured")

        preferred = next(self.manager_cycle)
        fallbacks = [m for m in self.managers if m is not preferred]

        for candidate in [preferred, *fallbacks]:
            try:
                return await candidate.start_conversation(conversation_id)
            except CapacityError:
                continue

        raise CapacityError("All managers at capacity")
# Usage
async def main():
    """Spread conversations over four capacity-limited pools."""
    # Create multiple manager instances. LimitedAgentManager is the pool
    # class whose constructor accepts max_conversations; the base
    # AgentPoolManager constructor rejects that keyword.
    managers = [
        LimitedAgentManager(api_key, customer_id, MyAgent, max_conversations=50)
        for _ in range(4)  # 4 managers, 200 total capacity
    ]
    load_balancer = LoadBalancedManager(managers)

    # Distribute conversations automatically
    await load_balancer.start_conversation("conv_1")
    await load_balancer.start_conversation("conv_2")
Least-Loaded Distribution
# Route to the manager with fewest active conversations:
class LeastLoadedManager(LoadBalancedManager):
    """Route to manager with least load"""

    async def start_conversation(self, conversation_id: str):
        """Start on least loaded manager.

        Load is measured as the size of each manager's ``active_agents``.
        Ties go to the earliest manager in ``self.managers``.
        """
        # Find manager with fewest active conversations
        least_loaded = min(self.managers, key=lambda m: len(m.active_agents))
        return await least_loaded.start_conversation(conversation_id)
Conversation Routing
Route by Customer
# Route customers to specific agents:
class CustomerRoutedManager:
    """Route conversations based on customer ID"""

    def __init__(self, managers: dict):
        """Store the segment -> manager mapping."""
        self.managers = managers  # {"segment": manager}

    def get_customer_segment(self, customer_id: str) -> str:
        """Determine customer segment (``"vip"`` or ``"standard"``)."""
        # Example: VIP customers get dedicated resources
        if customer_id.startswith("vip_"):
            return "vip"
        return "standard"

    async def start_conversation(self, conversation_id: str, customer_id: str):
        """Route based on customer.

        Uses the manager registered for the customer's segment and falls
        back to the ``"standard"`` manager when that segment has none.

        Raises:
            KeyError: If neither the segment nor ``"standard"`` is configured.
        """
        segment = self.get_customer_segment(customer_id)
        manager = self.managers.get(segment)
        if manager is None:
            # Look up the fallback only when needed: passing it as the
            # dict.get default would evaluate it eagerly and raise KeyError
            # even though the segment's own manager exists.
            manager = self.managers["standard"]
        return await manager.start_conversation(conversation_id)
# Usage
# Each segment gets its own capacity-limited pool. LimitedAgentManager is
# used because the base AgentPoolManager constructor does not accept
# max_conversations.
router = CustomerRoutedManager({
    "vip": LimitedAgentManager(api_key, customer_id, MyAgent, max_conversations=20),
    "standard": LimitedAgentManager(api_key, customer_id, MyAgent, max_conversations=100),
})
Monitoring and Health Checks
Health Check Endpoint
Expose agent pool health:
from fastapi import FastAPI
# Web application on which the /health and /metrics routes are registered.
app = FastAPI()
# Pool whose statistics the endpoints report; "..." is a placeholder for the
# real constructor arguments (api_key, customer_id, agent_class).
manager = AgentPoolManager(...)
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Reports ``"degraded"`` once the active conversation count reaches the
    pool's limit. The limit is the manager's ``max_conversations`` when it
    has one, otherwise 100.
    """
    stats = manager.get_stats()
    active = stats["active_count"]
    limit = getattr(manager, "max_conversations", 100)
    return {
        "status": "healthy" if active < limit else "degraded",
        "active_conversations": active,
        # Only managers whose get_stats() adds it report memory; default 0.
        "memory_mb": stats.get("memory_mb", 0),
        # An uncapped pool can exceed the limit; never report negative room.
        "capacity_remaining": max(limit - active, 0),
    }
import time

# Captured once at import so /metrics can report how long the process has
# been up. A monotonic clock keeps the figure immune to wall-clock changes.
_STARTED_AT = time.monotonic()


@app.get("/metrics")
async def metrics():
    """Detailed metrics: pool statistics plus process uptime in seconds."""
    return {
        "conversations": manager.get_stats(),
        "uptime_seconds": time.monotonic() - _STARTED_AT,
    }
Alerting
# Set up alerts for capacity issues:
class MonitoredManager(LimitedAgentManager):
    """Capacity-limited pool that alerts when usage crosses a threshold.

    Derives from ``LimitedAgentManager`` because the capacity percentage is
    computed from ``self.max_conversations``, which the plain
    ``AgentPoolManager`` never sets.
    """

    def __init__(self, *args, alert_threshold=80, **kwargs):
        """Create the pool.

        Args:
            *args: Forwarded to the base manager.
            alert_threshold: Capacity percentage above which an alert fires.
            **kwargs: Forwarded to the base manager (e.g. ``max_conversations``).
        """
        super().__init__(*args, **kwargs)
        self.alert_threshold = alert_threshold
        self.alert_sent = False

    async def start_conversation(self, conversation_id: str):
        """Start with capacity monitoring."""
        result = await super().start_conversation(conversation_id)

        # Check capacity
        active = len(self.active_agents)
        capacity_pct = (active / self.max_conversations) * 100

        if capacity_pct > self.alert_threshold and not self.alert_sent:
            await self.send_alert(
                f"Capacity at {capacity_pct:.1f}% ({active}/{self.max_conversations})"
            )
            # Latch so the alert fires once per excursion above the threshold.
            self.alert_sent = True
        elif capacity_pct < self.alert_threshold:
            # Back below the threshold: re-arm the alert.
            self.alert_sent = False

        return result

    async def send_alert(self, message: str):
        """Send alert (implement based on your system)"""
        print(f"🚨 ALERT: {message}")
        # Send to Slack, PagerDuty, etc.
Graceful Shutdown
# Handle shutdown without dropping conversations:
import time


class ServiceUnavailable(RuntimeError):
    """Raised when a conversation is requested while the pool is draining."""


class GracefulManager(AgentPoolManager):
    """Agent pool that can drain active conversations before stopping."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.shutting_down = False

    async def start_conversation(self, conversation_id: str):
        """Prevent new conversations during shutdown.

        Raises:
            ServiceUnavailable: If a graceful shutdown has begun.
        """
        if self.shutting_down:
            raise ServiceUnavailable("Service is shutting down")
        return await super().start_conversation(conversation_id)

    async def graceful_shutdown(self, timeout: int = 30):
        """Wait for conversations to end, then shutdown.

        Args:
            timeout: Maximum number of seconds to wait for active
                conversations to finish before they are force-stopped.
        """
        print(f"Starting graceful shutdown ({len(self.active_agents)} active)")
        self.shutting_down = True

        # Wait for conversations to end naturally
        poll_interval = 5
        deadline = time.monotonic() + timeout
        while self.active_agents:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            print(f"Waiting for {len(self.active_agents)} conversations...")
            # Never sleep past the deadline, so short timeouts are honoured.
            await asyncio.sleep(min(poll_interval, remaining))

        # Force shutdown remaining
        if self.active_agents:
            print(f"Force stopping {len(self.active_agents)} conversations")
            await self.shutdown()

        print("✅ Graceful shutdown complete")
Best Practices
1. Set Appropriate Limits
# Start conservative
class ProductionManager(LimitedAgentManager, ResourceAwareManager):
    """Pool that enforces both a conversation cap and a memory limit.

    Neither limit is understood by the plain AgentPoolManager. Both mixed-in
    classes forward unknown arguments through ``super()``, so they compose
    by inheritance.
    """


manager = ProductionManager(
    api_key,
    customer_id,
    MyAgent,
    max_conversations=50,  # Limit per instance
    max_memory_mb=1000,  # 1GB memory limit
)
2. Monitor Resource Usage
# Log regularly
async def monitor_loop():
    """Log the pool's active count and memory usage once a minute, forever."""
    while True:
        stats = manager.get_stats()
        # The base pool's stats carry no "memory_mb" entry, so fall back to
        # 0.0 instead of raising KeyError.
        logger.info(
            "Active: %s, Memory: %.1fMB",
            stats["active_count"],
            stats.get("memory_mb", 0.0),
        )
        await asyncio.sleep(60)
3. Handle Cleanup Properly
# Always clean up ended conversations
def on_conversation_ended(self, conversation_id: str):
    """Drop all per-conversation state once a conversation has ended.

    Args:
        conversation_id: Identifier of the conversation that ended.
    """
    # Remove from tracking (no error if it was already removed)
    self.conversations.pop(conversation_id, None)
    # Clear any cached data
    # NOTE(review): assumes self.cache exposes clear(key); a plain dict's
    # clear() takes no argument -- confirm against the cache implementation.
    self.cache.clear(conversation_id)
Next Steps
State Management
Advanced state management patterns
Scaling
Scale to thousands of conversations