Overview

Production voice agents typically handle multiple concurrent conversations. Conversimple’s agent architecture is designed for this use case, with each agent instance managing a single conversation session.

One Agent Per Conversation

The recommended pattern is one agent instance per conversation:
from conversimple import ConversimpleAgent

# MyAgent is your ConversimpleAgent subclass (defined elsewhere in your project)

# DON'T: Share one agent across conversations
agent = MyAgent()  # This serves only ONE conversation

# DO: Create an agent per conversation
async def handle_new_conversation(conversation_id):
    agent = MyAgent(api_key=api_key, customer_id=customer_id)
    await agent.start(conversation_id=conversation_id)

Why This Pattern?

  • Isolation: Each conversation has its own state
  • Fault Tolerance: One conversation crash doesn’t affect others
  • Simplicity: No need to manage conversation routing
  • Scalability: Python’s async handles thousands of agents

Agent Pool Manager

Use a pool manager to create, track, and stop one agent per conversation:
import asyncio
from typing import Dict
from conversimple import ConversimpleAgent

class AgentPoolManager:
    """Manages multiple conversation agents"""

    def __init__(self, api_key: str, customer_id: str, agent_class):
        self.api_key = api_key
        self.customer_id = customer_id
        self.agent_class = agent_class
        self.active_agents: Dict[str, ConversimpleAgent] = {}

    async def start_conversation(self, conversation_id: str) -> ConversimpleAgent:
        """Start a new conversation agent"""
        if conversation_id in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} already active")

        # Create new agent instance
        agent = self.agent_class(
            api_key=self.api_key,
            customer_id=self.customer_id
        )

        # Start agent
        await agent.start(conversation_id=conversation_id)

        # Track agent
        self.active_agents[conversation_id] = agent

        print(f"✅ Started conversation {conversation_id}")
        print(f"   Active conversations: {len(self.active_agents)}")

        return agent

    async def end_conversation(self, conversation_id: str):
        """End a conversation and clean up agent"""
        if conversation_id not in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} not found")

        agent = self.active_agents[conversation_id]

        # Stop agent
        await agent.stop()

        # Remove from tracking
        del self.active_agents[conversation_id]

        print(f"📞 Ended conversation {conversation_id}")
        print(f"   Active conversations: {len(self.active_agents)}")

    def get_agent(self, conversation_id: str) -> ConversimpleAgent:
        """Get agent for a specific conversation"""
        return self.active_agents.get(conversation_id)

    def get_stats(self) -> dict:
        """Get pool statistics"""
        return {
            "active_count": len(self.active_agents),
            "conversation_ids": list(self.active_agents.keys())
        }

    async def shutdown(self):
        """Shutdown all agents"""
        print(f"Shutting down {len(self.active_agents)} agents...")

        # Stop all agents concurrently
        await asyncio.gather(*[
            agent.stop()
            for agent in self.active_agents.values()
        ])

        self.active_agents.clear()
        print("✅ All agents shut down")

# Usage
async def main():
    manager = AgentPoolManager(
        api_key="your-api-key",
        customer_id="your-customer-id",
        agent_class=MyAgent
    )

    # Start multiple conversations
    conv1 = await manager.start_conversation("conv_1")
    conv2 = await manager.start_conversation("conv_2")
    conv3 = await manager.start_conversation("conv_3")

    # Keep running
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
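
The example above only starts conversations. Whichever part of your system learns that a call has ended (for example a webhook from your telephony layer, named here purely for illustration) should call end_conversation so the slot is released:
async def on_call_ended(manager: AgentPoolManager, conversation_id: str):
    """Illustrative hook: invoke when your telephony layer reports a hang-up."""
    await manager.end_conversation(conversation_id)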

Resource Management

Memory Management

Track memory usage per agent:
import psutil
import os

class ResourceAwareManager(AgentPoolManager):
    def __init__(self, *args, max_memory_mb=1000, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_memory_mb = max_memory_mb

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB"""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    async def start_conversation(self, conversation_id: str):
        """Start conversation with memory check"""
        current_memory = self.get_memory_usage()

        if current_memory > self.max_memory_mb:
            raise ResourceError(
                f"Memory limit exceeded: {current_memory:.1f}MB / {self.max_memory_mb}MB"
            )

        return await super().start_conversation(conversation_id)

    def get_stats(self) -> dict:
        """Get stats including memory"""
        stats = super().get_stats()
        stats['memory_mb'] = self.get_memory_usage()
        return stats
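
The snippets in this and later sections raise a few custom exception types (ResourceError, CapacityError, ServiceUnavailable). They are not defined by the examples above; if your application does not already provide them, minimal definitions could be:
class ResourceError(Exception):
    """Raised when a process-level resource limit (e.g. memory) is exceeded."""

class CapacityError(Exception):
    """Raised when the pool has reached its maximum number of conversations."""

class ServiceUnavailable(Exception):
    """Raised when new conversations are rejected, e.g. during shutdown."""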

Conversation Limits

Limit concurrent conversations:
class LimitedAgentManager(AgentPoolManager):
    def __init__(self, *args, max_conversations=100, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_conversations = max_conversations

    async def start_conversation(self, conversation_id: str):
        """Start conversation with limit check"""
        if len(self.active_agents) >= self.max_conversations:
            raise CapacityError(
                f"Maximum conversations reached: {self.max_conversations}"
            )

        return await super().start_conversation(conversation_id)

Load Balancing

Round-Robin Distribution

Distribute conversations across multiple instances:
from itertools import cycle

class LoadBalancedManager:
    """Distribute conversations across multiple manager instances"""

    def __init__(self, managers: list):
        self.managers = managers
        self.manager_cycle = cycle(managers)

    async def start_conversation(self, conversation_id: str):
        """Start conversation on next available manager"""
        # Get next manager in round-robin fashion
        manager = next(self.manager_cycle)

        try:
            return await manager.start_conversation(conversation_id)
        except CapacityError:
            # Try other managers
            for other_manager in self.managers:
                if other_manager != manager:
                    try:
                        return await other_manager.start_conversation(conversation_id)
                    except CapacityError:
                        continue

            raise CapacityError("All managers at capacity")

# Usage
async def main():
    # Create multiple manager instances
    managers = [
        LimitedAgentManager(api_key, customer_id, MyAgent, max_conversations=50)
        for _ in range(4)  # 4 managers, 200 total capacity
    ]

    load_balancer = LoadBalancedManager(managers)

    # Distribute conversations automatically
    await load_balancer.start_conversation("conv_1")
    await load_balancer.start_conversation("conv_2")

Least-Loaded Distribution

Route to the manager with the fewest active conversations:
class LeastLoadedManager(LoadBalancedManager):
    """Route to manager with least load"""

    async def start_conversation(self, conversation_id: str):
        """Start on least loaded manager"""
        # Find manager with fewest active conversations
        manager = min(
            self.managers,
            key=lambda m: len(m.active_agents)
        )

        return await manager.start_conversation(conversation_id)
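
Because it reuses the round-robin balancer's constructor, it can be swapped in directly:
# Drop-in replacement for the round-robin balancer
load_balancer = LeastLoadedManager(managers)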

Conversation Routing

Route by Customer

Route customers to dedicated agent pools based on their segment:
class CustomerRoutedManager:
    """Route conversations based on customer ID"""

    def __init__(self, managers: dict):
        self.managers = managers  # {"segment": manager}

    def get_customer_segment(self, customer_id: str) -> str:
        """Determine customer segment"""
        # Example: VIP customers get dedicated resources
        if customer_id.startswith("vip_"):
            return "vip"
        return "standard"

    async def start_conversation(self, conversation_id: str, customer_id: str):
        """Route based on customer"""
        segment = self.get_customer_segment(customer_id)
        manager = self.managers.get(segment, self.managers["standard"])

        return await manager.start_conversation(conversation_id)

# Usage
router = CustomerRoutedManager({
    "vip": AgentPoolManager(..., max_conversations=20),
    "standard": AgentPoolManager(..., max_conversations=100)
})
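
Starting a conversation from an async context then routes on the customer_id prefix (the ids below are illustrative):
# VIP callers go to the dedicated pool, everyone else to the standard pool
await router.start_conversation("conv_1", customer_id="vip_12345")
await router.start_conversation("conv_2", customer_id="cust_67890")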

Monitoring and Health Checks

Health Check Endpoint

Expose agent pool health:
import time

from fastapi import FastAPI

app = FastAPI()
manager = AgentPoolManager(...)
start_time = time.time()  # Used by the /metrics endpoint below

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    stats = manager.get_stats()

    return {
        "status": "healthy" if stats["active_count"] < 100 else "degraded",
        "active_conversations": stats["active_count"],
        "memory_mb": stats.get("memory_mb", 0),
        "capacity_remaining": 100 - stats["active_count"]
    }

@app.get("/metrics")
async def metrics():
    """Detailed metrics"""
    return {
        "conversations": manager.get_stats(),
        "uptime_seconds": time.time() - start_time
    }
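
To serve these endpoints from the same process as the agent pool (so they reflect live state), one option is to run uvicorn inside the existing event loop; the host and port here are illustrative:
import uvicorn

async def serve_monitoring():
    # Serve the FastAPI app alongside the running conversation agents
    config = uvicorn.Config(app, host="0.0.0.0", port=8080, log_level="info")
    await uvicorn.Server(config).serve()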

Alerting

Set up alerts for capacity issues:
# Subclasses LimitedAgentManager so max_conversations is available
class MonitoredManager(LimitedAgentManager):
    def __init__(self, *args, alert_threshold=80, **kwargs):
        super().__init__(*args, **kwargs)
        self.alert_threshold = alert_threshold
        self.alert_sent = False

    async def start_conversation(self, conversation_id: str):
        """Start with capacity monitoring"""
        result = await super().start_conversation(conversation_id)

        # Check capacity
        capacity_pct = (len(self.active_agents) / self.max_conversations) * 100

        if capacity_pct > self.alert_threshold and not self.alert_sent:
            await self.send_alert(
                f"Capacity at {capacity_pct:.1f}% ({len(self.active_agents)}/{self.max_conversations})"
            )
            self.alert_sent = True
        elif capacity_pct < self.alert_threshold:
            self.alert_sent = False

        return result

    async def send_alert(self, message: str):
        """Send alert (implement based on your system)"""
        print(f"🚨 ALERT: {message}")
        # Send to Slack, PagerDuty, etc.
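
As one concrete option, send_alert could post to a Slack incoming webhook; the URL is a placeholder and aiohttp is assumed as the HTTP client:
import aiohttp

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder

class SlackAlertingManager(MonitoredManager):
    async def send_alert(self, message: str):
        """Post the alert text to a Slack incoming webhook."""
        async with aiohttp.ClientSession() as session:
            async with session.post(SLACK_WEBHOOK_URL, json={"text": message}) as resp:
                resp.raise_for_status()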

Graceful Shutdown

Handle shutdown without dropping conversations:
import time

class GracefulManager(AgentPoolManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.shutting_down = False

    async def start_conversation(self, conversation_id: str):
        """Prevent new conversations during shutdown"""
        if self.shutting_down:
            raise ServiceUnavailable("Service is shutting down")

        return await super().start_conversation(conversation_id)

    async def graceful_shutdown(self, timeout: int = 30):
        """Wait for conversations to end, then shutdown"""
        print(f"Starting graceful shutdown ({len(self.active_agents)} active)")
        self.shutting_down = True

        # Wait for conversations to end naturally
        start_time = time.time()
        while self.active_agents and (time.time() - start_time) < timeout:
            print(f"Waiting for {len(self.active_agents)} conversations...")
            await asyncio.sleep(5)

        # Force shutdown remaining
        if self.active_agents:
            print(f"Force stopping {len(self.active_agents)} conversations")
            await self.shutdown()

        print("✅ Graceful shutdown complete")

Best Practices

1. Set Appropriate Limits
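
The base AgentPoolManager does not accept these limits itself; a small subclass combining ResourceAwareManager and LimitedAgentManager from the sections above can expose both (CombinedManager is an illustrative name, not an SDK class):
class CombinedManager(ResourceAwareManager, LimitedAgentManager):
    """Applies both the memory check and the conversation limit."""
    pass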

# Start conservative, using the combined limits from the subclass above
manager = CombinedManager(
    api_key="your-api-key",
    customer_id="your-customer-id",
    agent_class=MyAgent,
    max_conversations=50,  # Limit per instance
    max_memory_mb=1000     # 1GB memory limit
)

2. Monitor Resource Usage

# Log regularly (logger name is illustrative)
import logging

logger = logging.getLogger("agent_pool")

async def monitor_loop():
    while True:
        stats = manager.get_stats()
        # memory_mb is only present when using ResourceAwareManager
        logger.info(f"Active: {stats['active_count']}, Memory: {stats.get('memory_mb', 0):.1f}MB")
        await asyncio.sleep(60)

3. Handle Cleanup Properly

# Always clean up ended conversations (self.conversations and self.cache
# stand in for whatever per-conversation state your agent keeps)
def on_conversation_ended(self, conversation_id: str):
    # Remove from tracking
    self.conversations.pop(conversation_id, None)
    # Clear any cached data for this conversation
    self.cache.clear(conversation_id)

Next Steps