"""
Customer service agent example.

This example demonstrates a more complex agent with multiple tools for customer service, including both synchronous and asynchronous operations.
"""

import asyncio
import aiofiles
import aiohttp
import json
import logging
from typing import Dict, List, Optional
from datetime import datetime, timedelta

from conversimple import ConversimpleAgent, tool, tool_async

# Root logging is set to INFO; this module gets its own named logger
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)


class CustomerServiceAgent(ConversimpleAgent):
    """
    Advanced customer service agent with multiple business tools.

    Demonstrates:
    - Multiple sync and async tools
    - State management across tool calls
    - External API integration
    - File operations
    - Error handling and recovery
    - Complex business logic

    All data is mocked in memory: ``customer_db`` holds the customers,
    ``active_tickets`` holds tickets created during this agent's lifetime.
    Tools report failures by returning a dict with an ``"error"`` key
    rather than raising, so the result can be relayed to the caller.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``ConversimpleAgent`` and set up mock state."""
        super().__init__(*args, **kwargs)

        # Agent state management
        self.customer_sessions: Dict[str, Dict] = {}
        self.active_tickets: Dict[str, Dict] = {}

        # Monotonic counter appended to generated IDs so that two records
        # created within the same second never share an ID.
        self._record_sequence: int = 0

        # Mock customer database
        self.customer_db = {
            "cust_123": {
                "name": "John Doe",
                "email": "john@example.com",
                "phone": "+1-555-0123",
                "account_type": "premium",
                "balance": 1250.50,
                "recent_orders": ["ORD-001", "ORD-002"],
            },
            "cust_456": {
                "name": "Jane Smith",
                "email": "jane@example.com",
                "phone": "+1-555-0456",
                "account_type": "standard",
                "balance": 45.75,
                "recent_orders": ["ORD-003"],
            },
        }

    # Internal helpers
    def _next_record_id(self, prefix: str, moment: datetime) -> str:
        """Return a unique ID of the form ``PREFIX-YYYYmmddHHMMSS-NNNN``.

        The timestamp alone only has one-second resolution; the per-agent
        sequence number guarantees uniqueness within this instance.
        """
        self._record_sequence += 1
        stamp = moment.strftime("%Y%m%d%H%M%S")
        return f"{prefix}-{stamp}-{self._record_sequence:04d}"

    @staticmethod
    def _customer_view(customer_id: str, record: Dict) -> Dict:
        """Return a detached copy of *record* with ``customer_id`` added.

        The order list is copied as well so that callers mutating the
        result cannot alter the mock database.
        """
        view = dict(record)
        if "recent_orders" in view:
            view["recent_orders"] = list(view["recent_orders"])
        view["customer_id"] = customer_id
        return view

    @tool("Look up customer information by ID or email")
    def lookup_customer(self, identifier: str) -> Dict:
        """
        Look up customer information by customer ID or email.

        Surrounding whitespace is ignored and email addresses are matched
        case-insensitively.

        Args:
            identifier: Customer ID or email address

        Returns:
            Customer information dictionary (including ``customer_id``), or
            ``{"error": ..., "searched_for": identifier}`` when no customer
            matches.
        """
        logger.info("Looking up customer: %s", identifier)

        needle = identifier.strip() if isinstance(identifier, str) else identifier

        # Search by customer ID first
        if needle in self.customer_db:
            return self._customer_view(needle, self.customer_db[needle])

        # Search by email
        if isinstance(needle, str):
            wanted = needle.casefold()
            for cust_id, record in self.customer_db.items():
                email = record.get("email")
                if isinstance(email, str) and email.casefold() == wanted:
                    return self._customer_view(cust_id, record)

        return {"error": "Customer not found", "searched_for": identifier}

    @tool("Get customer account balance and recent transactions")
    def get_account_balance(self, customer_id: str) -> Dict:
        """
        Get customer account balance and recent transaction history.

        Args:
            customer_id: Customer identifier

        Returns:
            Account balance and transaction information, or
            ``{"error": "Customer not found"}`` for an unknown customer.
        """
        logger.info("Getting account balance for: %s", customer_id)

        customer = self.customer_db.get(customer_id)
        if not customer:
            return {"error": "Customer not found"}

        # Simulate transaction history (static mock data, same for everyone)
        transactions = [
            {"date": "2025-01-20", "amount": -50.00, "description": "Online purchase"},
            {"date": "2025-01-18", "amount": 100.00, "description": "Account credit"},
            {"date": "2025-01-15", "amount": -25.50, "description": "Subscription fee"},
        ]

        return {
            "customer_id": customer_id,
            "current_balance": customer["balance"],
            "account_type": customer["account_type"],
            "recent_transactions": transactions,
        }

    @tool_async("Create a support ticket for the customer")
    async def create_support_ticket(
        self,
        customer_id: str,
        issue_type: str,
        description: str,
        priority: str = "normal"
    ) -> Dict:
        """
        Create a new support ticket for the customer.

        Args:
            customer_id: Customer identifier
            issue_type: Type of issue (billing, technical, general)
            description: Detailed description of the issue
            priority: Priority level (low, normal, high, urgent)

        Returns:
            ``{"success": True, "ticket": ..., "message": ...}`` on success,
            or ``{"success": False, "error": "Customer not found"}`` when the
            customer is unknown (same convention as the other tools).
        """
        logger.info("Creating support ticket for customer: %s", customer_id)

        # Reject tickets for unknown customers, as the sibling tools do
        if customer_id not in self.customer_db:
            return {"success": False, "error": "Customer not found"}

        # Simulate async ticket creation
        await asyncio.sleep(0.5)  # Simulate API delay

        # Take one timestamp so the ID, created_at and ETA agree
        created = datetime.now()
        ticket_id = self._next_record_id("TKT", created)

        ticket = {
            "ticket_id": ticket_id,
            "customer_id": customer_id,
            "issue_type": issue_type,
            "description": description,
            "priority": priority,
            "status": "open",
            "created_at": created.isoformat(),
            "estimated_resolution": (created + timedelta(hours=24)).isoformat(),
        }

        # Store in active tickets
        self.active_tickets[ticket_id] = ticket

        return {
            "success": True,
            "ticket": ticket,
            "message": f"Support ticket {ticket_id} created successfully",
        }

    @tool_async("Send email notification to customer")
    async def send_email_notification(
        self,
        customer_id: str,
        subject: str,
        message: str,
        email_type: str = "general"
    ) -> Dict:
        """
        Send email notification to customer.

        Args:
            customer_id: Customer identifier
            subject: Email subject line
            message: Email message content
            email_type: Type of email (general, billing, technical)

        Returns:
            Email sending result; ``{"success": False, "error": ...}`` when
            the customer is unknown.
        """
        logger.info("Sending email to customer: %s", customer_id)

        customer = self.customer_db.get(customer_id)
        if not customer:
            return {"success": False, "error": "Customer not found"}

        # Simulate async email sending
        await asyncio.sleep(0.3)

        sent = datetime.now()

        # In production, this would integrate with actual email service
        return {
            "success": True,
            "email_id": self._next_record_id("EMAIL", sent),
            "recipient": customer["email"],
            "subject": subject,
            "email_type": email_type,
            "sent_at": sent.isoformat(),
            "delivery_status": "sent",
        }

    @tool("Process refund request for customer")
    def process_refund(
        self,
        customer_id: str,
        order_id: str,
        amount: float,
        reason: str
    ) -> Dict:
        """
        Process refund request for a customer order.

        The refund is credited to the customer's mock balance.

        Args:
            customer_id: Customer identifier
            order_id: Order ID to refund
            amount: Refund amount; must be a positive, finite number
            reason: Reason for refund

        Returns:
            Refund processing result; ``{"success": False, "error": ...}``
            when the customer or order is unknown or the amount is invalid.
        """
        logger.info("Processing refund for customer %s, order %s", customer_id, order_id)

        customer = self.customer_db.get(customer_id)
        if not customer:
            return {"success": False, "error": "Customer not found"}

        # Validate order belongs to customer
        if order_id not in customer["recent_orders"]:
            return {"success": False, "error": "Order not found for this customer"}

        # A zero/negative amount would debit the account; NaN fails both
        # comparisons and infinity fails the upper bound, so both are rejected.
        is_number = isinstance(amount, (int, float)) and not isinstance(amount, bool)
        if not is_number or not 0 < amount < float("inf"):
            return {"success": False, "error": "Refund amount must be a positive number"}

        processed = datetime.now()

        # Update customer balance; round to cents to avoid float drift
        customer["balance"] = round(customer["balance"] + amount, 2)

        return {
            "success": True,
            "refund_id": self._next_record_id("REF", processed),
            "customer_id": customer_id,
            "order_id": order_id,
            "amount": amount,
            "reason": reason,
            "processed_at": processed.isoformat(),
            "new_balance": customer["balance"],
            "estimated_arrival": "3-5 business days",
        }

    @tool_async("Get real-time order status from external system")
    async def get_order_status(self, order_id: str) -> Dict:
        """
        Get real-time order status from external order management system.

        Args:
            order_id: Order identifier

        Returns:
            Current order status and tracking information with
            ``"found": True``, or ``"found": False`` plus an ``"error"``
            message when the order is unknown.
        """
        logger.info("Fetching order status: %s", order_id)

        # Simulate external API call
        await asyncio.sleep(0.4)

        # Mock order status data
        order_statuses = {
            "ORD-001": {
                "status": "delivered",
                "tracking_number": "1Z999AA1234567890",
                "estimated_delivery": "2025-01-25",
                "current_location": "Customer delivered",
            },
            "ORD-002": {
                "status": "in_transit",
                "tracking_number": "1Z999AA1234567891",
                "estimated_delivery": "2025-01-28",
                "current_location": "Distribution center - Chicago, IL",
            },
            "ORD-003": {
                "status": "processing",
                "tracking_number": None,
                "estimated_delivery": "2025-01-30",
                "current_location": "Fulfillment center",
            },
        }

        status = order_statuses.get(order_id)
        if status is None:
            return {
                "order_id": order_id,
                "found": False,
                "error": "Order not found in system",
            }

        return {"order_id": order_id, "found": True, **status}

    @tool_async("Save conversation summary to customer file")
    async def save_conversation_summary(
        self,
        customer_id: str,
        summary: str,
        resolution: Optional[str] = None
    ) -> Dict:
        """
        Save conversation summary to customer's file.

        One JSON record per line is appended to
        ``customer_records_<customer_id>_<YYYYmmdd>.json`` in the current
        working directory.

        Args:
            customer_id: Customer identifier
            summary: Conversation summary
            resolution: Resolution details if applicable

        Returns:
            ``{"success": True, "filename": ..., "saved_at": ...}`` on
            success, ``{"success": False, "error": ...}`` if writing fails.
        """
        logger.info("Saving conversation summary for customer: %s", customer_id)

        now = datetime.now()

        # Create conversation record
        conversation_record = {
            "customer_id": customer_id,
            "timestamp": now.isoformat(),
            "agent": "customer_service",
            "summary": summary,
            "resolution": resolution,
            # presumably set by ConversimpleAgent once a conversation starts;
            # fall back to None instead of failing if it is not there yet
            "conversation_id": getattr(self, "conversation_id", None),
        }

        # customer_id arrives through a tool call, so treat it as untrusted:
        # keep only characters that cannot form a path (no separators or dots).
        safe_id = "".join(
            ch if ch.isalnum() or ch in "-_" else "_" for ch in str(customer_id)
        )

        # Save to file (simulate customer records system)
        filename = f"customer_records_{safe_id}_{now.strftime('%Y%m%d')}.json"

        try:
            async with aiofiles.open(filename, 'a', encoding='utf-8') as f:
                await f.write(json.dumps(conversation_record) + '\n')
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of
            # letting it propagate, but keep the traceback in the log.
            logger.exception("Failed to save conversation summary: %s", e)
            return {
                "success": False,
                "error": str(e),
            }

        return {
            "success": True,
            "filename": filename,
            "saved_at": datetime.now().isoformat(),
        }

    # Event handlers
    def on_conversation_started(self, conversation_id: str) -> None:
        """Handle conversation started events."""
        logger.info("🎧 Customer service agent ready: %s", conversation_id)
        print("Customer service agent is ready to help!")
        print("Available services: account lookup, billing, refunds, order tracking")

    def on_conversation_ended(self, conversation_id: str) -> None:
        """Handle conversation ended events."""
        logger.info("Customer service conversation ended: %s", conversation_id)
        print("Thank you for contacting customer service. Have a great day!")

    def on_tool_called(self, tool_call) -> None:
        """Handle tool call events by logging the tool's name."""
        logger.info("🔧 Executing customer service tool: %s", tool_call.tool_name)

    def on_error(self, error_type: str, error_message: str, details: Dict) -> None:
        """Handle error events: log type, message and details, then apologize."""
        logger.error(
            "❌ Customer service error (%s): %s | details: %s",
            error_type,
            error_message,
            details,
        )
        print(f"I apologize, but I encountered an error: {error_message}")


async def main() -> None:
    """Main function for customer service agent.

    Reads connection settings from the environment (falling back to demo
    values), starts the agent, lists its registered tools and then idles
    until interrupted. The agent is always stopped on the way out.

    Raises:
        asyncio.CancelledError: re-raised after the stop message when the
            task is cancelled, which is how ``asyncio.run`` delivers Ctrl+C.
    """
    print("🎧 Starting Customer Service Agent Example")
    print("=" * 50)

    # Configuration
    import os
    api_key = os.getenv("CONVERSIMPLE_API_KEY", "demo-service-key-456")
    customer_id = os.getenv("CONVERSIMPLE_CUSTOMER_ID", "service-demo-customer")
    platform_url = os.getenv("CONVERSIMPLE_PLATFORM_URL", "ws://localhost:4000/sdk/websocket")

    print(f"Customer ID: {customer_id}")
    print(f"Platform URL: {platform_url}")
    print()

    # Create customer service agent
    agent = CustomerServiceAgent(
        api_key=api_key,
        customer_id=customer_id,
        platform_url=platform_url
    )

    try:
        # Start the agent
        print("🔗 Connecting to platform...")
        await agent.start()

        print("✅ Customer service agent connected!")
        print("🎯 Available tools:")
        # Loop variable deliberately not called `tool`, which would shadow
        # the decorator imported at module level.
        for registered in agent.registered_tools:
            print(f"  - {registered['name']}: {registered['description']}")
        print()

        print("🎤 Agent is ready for customer service calls...")
        print("💡 Try asking about account balance, order status, or refund requests!")
        print()
        print("Press Ctrl+C to stop")

        # Keep running
        while True:
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        print("\n🛑 Stopping customer service agent...")

    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C cancels this task rather than raising
        # KeyboardInterrupt here. Announce the shutdown, then propagate the
        # cancellation so the event loop sees the task as cancelled.
        print("\n🛑 Stopping customer service agent...")
        raise

    except Exception as e:
        print(f"❌ Error: {e}")
        logger.exception("Agent error: %s", e)

    finally:
        try:
            await agent.stop()
            print("✅ Customer service agent stopped")
        except Exception as e:
            logger.error(f"Error stopping agent: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() raises KeyboardInterrupt after it has cancelled
        # main() on Ctrl+C; main()'s own cleanup has already run by then,
        # so exit quietly instead of printing a traceback.
        pass