Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.conversimple.com/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Production voice agents typically handle multiple concurrent conversations. Conversimple’s agent architecture is designed for this use case, with each agent instance managing a single conversation session.

One Agent Per Conversation

The recommended pattern is one agent instance per conversation:
from conversimple import ConversimpleAgent

# DON'T: Share one agent across conversations
agent = MyAgent()  # This serves only ONE conversation

# DO: Create an agent per conversation
def handle_new_conversation(conversation_id):
    agent = MyAgent(api_key=api_key, customer_id=customer_id)
    await agent.start(conversation_id=conversation_id)
###Why This Pattern?
  • Isolation: Each conversation has its own state
  • Fault Tolerance: One conversation crash doesn’t affect others
  • Simplicity: No need to manage conversation routing
  • Scalability: Python’s async handles thousands of agents

Agent Pool Manager

Manage multiple agent instances effectively:
import asyncio
from typing import Dict
from conversimple import ConversimpleAgent

class AgentPoolManager:
    """Manages multiple conversation agents"""

    def __init__(self, api_key: str, customer_id: str, agent_class):
        self.api_key = api_key
        self.customer_id = customer_id
        self.agent_class = agent_class
        self.active_agents: Dict[str, ConversimpleAgent] = {}

    async def start_conversation(self, conversation_id: str) -> ConversimpleAgent:
        """Start a new conversation agent"""
        if conversation_id in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} already active")

        # Create new agent instance
        agent = self.agent_class(
            api_key=self.api_key,
            customer_id=self.customer_id
        )

        # Start agent
        await agent.start(conversation_id=conversation_id)

        # Track agent
        self.active_agents[conversation_id] = agent

        print(f"✅ Started conversation {conversation_id}")
        print(f"   Active conversations: {len(self.active_agents)}")

        return agent

    async def end_conversation(self, conversation_id: str):
        """End a conversation and clean up agent"""
        if conversation_id not in self.active_agents:
            raise ValueError(f"Conversation {conversation_id} not found")

        agent = self.active_agents[conversation_id]

        # Stop agent
        await agent.stop()

        # Remove from tracking
        del self.active_agents[conversation_id]

        print(f"📞 Ended conversation {conversation_id}")
        print(f"   Active conversations: {len(self.active_agents)}")

    def get_agent(self, conversation_id: str) -> ConversimpleAgent:
        """Get agent for a specific conversation"""
        return self.active_agents.get(conversation_id)

    def get_stats(self) -> dict:
        """Get pool statistics"""
        return {
            "active_count": len(self.active_agents),
            "conversation_ids": list(self.active_agents.keys())
        }

    async def shutdown(self):
        """Shutdown all agents"""
        print(f"Shutting down {len(self.active_agents)} agents...")

        # Stop all agents concurrently
        await asyncio.gather(*[
            agent.stop()
            for agent in self.active_agents.values()
        ])

        self.active_agents.clear()
        print("✅ All agents shut down")

# Usage
async def main():
    manager = AgentPoolManager(
        api_key="your-api-key",
        customer_id="your-customer-id",
        agent_class=MyAgent
    )

    # Start multiple conversations
    conv1 = await manager.start_conversation("conv_1")
    conv2 = await manager.start_conversation("conv_2")
    conv3 = await manager.start_conversation("conv_3")

    # Keep running
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        await manager.shutdown()

Resource Management

Memory Management

Track memory usage per agent:
import psutil
import os

class ResourceAwareManager(AgentPoolManager):
    def __init__(self, *args, max_memory_mb=1000, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_memory_mb = max_memory_mb

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB"""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    async def start_conversation(self, conversation_id: str):
        """Start conversation with memory check"""
        current_memory = self.get_memory_usage()

        if current_memory > self.max_memory_mb:
            raise ResourceError(
                f"Memory limit exceeded: {current_memory:.1f}MB / {self.max_memory_mb}MB"
            )

        return await super().start_conversation(conversation_id)

    def get_stats(self) -> dict:
        """Get stats including memory"""
        stats = super().get_stats()
        stats['memory_mb'] = self.get_memory_usage()
        return stats

Conversation Limits

Limit concurrent conversations:
class LimitedAgentManager(AgentPoolManager):
    def __init__(self, *args, max_conversations=100, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_conversations = max_conversations

    async def start_conversation(self, conversation_id: str):
        """Start conversation with limit check"""
        if len(self.active_agents) >= self.max_conversations:
            raise CapacityError(
                f"Maximum conversations reached: {self.max_conversations}"
            )

        return await super().start_conversation(conversation_id)

Load Balancing

Round-Robin Distribution

Distribute conversations across multiple instances:
from itertools import cycle

class LoadBalancedManager:
    """Distribute conversations across multiple manager instances"""

    def __init__(self, managers: list):
        self.managers = managers
        self.manager_cycle = cycle(managers)

    async def start_conversation(self, conversation_id: str):
        """Start conversation on next available manager"""
        # Get next manager in round-robin fashion
        manager = next(self.manager_cycle)

        try:
            return await manager.start_conversation(conversation_id)
        except CapacityError:
            # Try other managers
            for other_manager in self.managers:
                if other_manager != manager:
                    try:
                        return await other_manager.start_conversation(conversation_id)
                    except CapacityError:
                        continue

            raise CapacityError("All managers at capacity")

# Usage
async def main():
    # Create multiple manager instances
    managers = [
        AgentPoolManager(api_key, customer_id, MyAgent, max_conversations=50)
        for _ in range(4)  # 4 managers, 200 total capacity
    ]

    load_balancer = LoadBalancedManager(managers)

    # Distribute conversations automatically
    await load_balancer.start_conversation("conv_1")
    await load_balancer.start_conversation("conv_2")

Least-Loaded Distribution

Route to the manager with fewest active conversations:
class LeastLoadedManager(LoadBalancedManager):
    """Route to manager with least load"""

    async def start_conversation(self, conversation_id: str):
        """Start on least loaded manager"""
        # Find manager with fewest active conversations
        manager = min(
            self.managers,
            key=lambda m: len(m.active_agents)
        )

        return await manager.start_conversation(conversation_id)

Conversation Routing

Route by Customer

Route customers to specific agents:
class CustomerRoutedManager:
    """Route conversations based on customer ID"""

    def __init__(self, managers: dict):
        self.managers = managers  # {"segment": manager}

    def get_customer_segment(self, customer_id: str) -> str:
        """Determine customer segment"""
        # Example: VIP customers get dedicated resources
        if customer_id.startswith("vip_"):
            return "vip"
        return "standard"

    async def start_conversation(self, conversation_id: str, customer_id: str):
        """Route based on customer"""
        segment = self.get_customer_segment(customer_id)
        manager = self.managers.get(segment, self.managers["standard"])

        return await manager.start_conversation(conversation_id)

# Usage
router = CustomerRoutedManager({
    "vip": AgentPoolManager(..., max_conversations=20),
    "standard": AgentPoolManager(..., max_conversations=100)
})

Monitoring and Health Checks

Health Check Endpoint

Expose agent pool health:
from fastapi import FastAPI

app = FastAPI()
manager = AgentPoolManager(...)

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    stats = manager.get_stats()

    return {
        "status": "healthy" if stats["active_count"] < 100 else "degraded",
        "active_conversations": stats["active_count"],
        "memory_mb": stats.get("memory_mb", 0),
        "capacity_remaining": 100 - stats["active_count"]
    }

@app.get("/metrics")
async def metrics():
    """Detailed metrics"""
    return {
        "conversations": manager.get_stats(),
        "uptime_seconds": time.time() - start_time
    }

Alerting

Set up alerts for capacity issues:
class MonitoredManager(AgentPoolManager):
    def __init__(self, *args, alert_threshold=80, **kwargs):
        super().__init__(*args, **kwargs)
        self.alert_threshold = alert_threshold
        self.alert_sent = False

    async def start_conversation(self, conversation_id: str):
        """Start with capacity monitoring"""
        result = await super().start_conversation(conversation_id)

        # Check capacity
        capacity_pct = (len(self.active_agents) / self.max_conversations) * 100

        if capacity_pct > self.alert_threshold and not self.alert_sent:
            await self.send_alert(
                f"Capacity at {capacity_pct:.1f}% ({len(self.active_agents)}/{self.max_conversations})"
            )
            self.alert_sent = True
        elif capacity_pct < self.alert_threshold:
            self.alert_sent = False

        return result

    async def send_alert(self, message: str):
        """Send alert (implement based on your system)"""
        print(f"🚨 ALERT: {message}")
        # Send to Slack, PagerDuty, etc.

Graceful Shutdown

Handle shutdown without dropping conversations:
class GracefulManager(AgentPoolManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.shutting_down = False

    async def start_conversation(self, conversation_id: str):
        """Prevent new conversations during shutdown"""
        if self.shutting_down:
            raise ServiceUnavailable("Service is shutting down")

        return await super().start_conversation(conversation_id)

    async def graceful_shutdown(self, timeout: int = 30):
        """Wait for conversations to end, then shutdown"""
        print(f"Starting graceful shutdown ({len(self.active_agents)} active)")
        self.shutting_down = True

        # Wait for conversations to end naturally
        start_time = time.time()
        while self.active_agents and (time.time() - start_time) < timeout:
            print(f"Waiting for {len(self.active_agents)} conversations...")
            await asyncio.sleep(5)

        # Force shutdown remaining
        if self.active_agents:
            print(f"Force stopping {len(self.active_agents)} conversations")
            await self.shutdown()

        print("✅ Graceful shutdown complete")

Best Practices

1. Set Appropriate Limits

# Start conservative
manager = AgentPoolManager(
    max_conversations=50,  # Limit per instance
    max_memory_mb=1000     # 1GB memory limit
)

2. Monitor Resource Usage

# Log regularly
async def monitor_loop():
    while True:
        stats = manager.get_stats()
        logger.info(f"Active: {stats['active_count']}, Memory: {stats['memory_mb']:.1f}MB")
        await asyncio.sleep(60)

3. Handle Cleanup Properly

# Always clean up ended conversations
def on_conversation_ended(self, conversation_id: str):
    # Remove from tracking
    self.conversations.pop(conversation_id, None)
    # Clear any cached data
    self.cache.clear(conversation_id)

Next Steps

State Management

Advanced state management patterns

Scaling

Scale to thousands of conversations