This example demonstrates a more complex agent with multiple tools for customer service, including both synchronous and asynchronous operations.Documentation Index
Fetch the complete documentation index at: https://docs.conversimple.com/llms.txt
Use this file to discover all available pages before exploring further.
Code
import asyncio
import aiofiles
import aiohttp
import json
import logging
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from conversimple import ConversimpleAgent, tool, tool_async
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CustomerServiceAgent(ConversimpleAgent):
"""
Advanced customer service agent with multiple business tools.
Demonstrates:
- Multiple sync and async tools
- State management across tool calls
- External API integration
- File operations
- Error handling and recovery
- Complex business logic
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Agent state management
self.customer_sessions: Dict[str, Dict] = {}
self.active_tickets: Dict[str, Dict] = {}
# Mock customer database
self.customer_db = {
"cust_123": {
"name": "John Doe",
"email": "john@example.com",
"phone": "+1-555-0123",
"account_type": "premium",
"balance": 1250.50,
"recent_orders": ["ORD-001", "ORD-002"]
},
"cust_456": {
"name": "Jane Smith",
"email": "jane@example.com",
"phone": "+1-555-0456",
"account_type": "standard",
"balance": 45.75,
"recent_orders": ["ORD-003"]
}
}
@tool("Look up customer information by ID or email")
def lookup_customer(self, identifier: str) -> Dict:
"""
Look up customer information by customer ID or email.
Args:
identifier: Customer ID or email address
Returns:
Customer information dictionary
"""
logger.info(f"Looking up customer: {identifier}")
# Search by customer ID first
if identifier in self.customer_db:
customer = self.customer_db[identifier].copy()
customer["customer_id"] = identifier
return customer
# Search by email
for cust_id, customer in self.customer_db.items():
if customer.get("email") == identifier:
result = customer.copy()
result["customer_id"] = cust_id
return result
return {"error": "Customer not found", "searched_for": identifier}
@tool("Get customer account balance and recent transactions")
def get_account_balance(self, customer_id: str) -> Dict:
"""
Get customer account balance and recent transaction history.
Args:
customer_id: Customer identifier
Returns:
Account balance and transaction information
"""
logger.info(f"Getting account balance for: {customer_id}")
customer = self.customer_db.get(customer_id)
if not customer:
return {"error": "Customer not found"}
# Simulate transaction history
transactions = [
{"date": "2025-01-20", "amount": -50.00, "description": "Online purchase"},
{"date": "2025-01-18", "amount": 100.00, "description": "Account credit"},
{"date": "2025-01-15", "amount": -25.50, "description": "Subscription fee"}
]
return {
"customer_id": customer_id,
"current_balance": customer["balance"],
"account_type": customer["account_type"],
"recent_transactions": transactions
}
@tool_async("Create a support ticket for the customer")
async def create_support_ticket(
self,
customer_id: str,
issue_type: str,
description: str,
priority: str = "normal"
) -> Dict:
"""
Create a new support ticket for the customer.
Args:
customer_id: Customer identifier
issue_type: Type of issue (billing, technical, general)
description: Detailed description of the issue
priority: Priority level (low, normal, high, urgent)
Returns:
Created ticket information
"""
logger.info(f"Creating support ticket for customer: {customer_id}")
# Simulate async ticket creation
await asyncio.sleep(0.5) # Simulate API delay
ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
ticket = {
"ticket_id": ticket_id,
"customer_id": customer_id,
"issue_type": issue_type,
"description": description,
"priority": priority,
"status": "open",
"created_at": datetime.now().isoformat(),
"estimated_resolution": (datetime.now() + timedelta(hours=24)).isoformat()
}
# Store in active tickets
self.active_tickets[ticket_id] = ticket
return {
"success": True,
"ticket": ticket,
"message": f"Support ticket {ticket_id} created successfully"
}
@tool_async("Send email notification to customer")
async def send_email_notification(
self,
customer_id: str,
subject: str,
message: str,
email_type: str = "general"
) -> Dict:
"""
Send email notification to customer.
Args:
customer_id: Customer identifier
subject: Email subject line
message: Email message content
email_type: Type of email (general, billing, technical)
Returns:
Email sending result
"""
logger.info(f"Sending email to customer: {customer_id}")
customer = self.customer_db.get(customer_id)
if not customer:
return {"success": False, "error": "Customer not found"}
# Simulate async email sending
await asyncio.sleep(0.3)
email_id = f"EMAIL-{datetime.now().strftime('%Y%m%d%H%M%S')}"
# In production, this would integrate with actual email service
email_result = {
"success": True,
"email_id": email_id,
"recipient": customer["email"],
"subject": subject,
"sent_at": datetime.now().isoformat(),
"delivery_status": "sent"
}
return email_result
@tool("Process refund request for customer")
def process_refund(
self,
customer_id: str,
order_id: str,
amount: float,
reason: str
) -> Dict:
"""
Process refund request for a customer order.
Args:
customer_id: Customer identifier
order_id: Order ID to refund
amount: Refund amount
reason: Reason for refund
Returns:
Refund processing result
"""
logger.info(f"Processing refund for customer {customer_id}, order {order_id}")
customer = self.customer_db.get(customer_id)
if not customer:
return {"success": False, "error": "Customer not found"}
# Validate order belongs to customer
if order_id not in customer["recent_orders"]:
return {"success": False, "error": "Order not found for this customer"}
# Process refund (simulate business logic)
refund_id = f"REF-{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Update customer balance
customer["balance"] += amount
refund_result = {
"success": True,
"refund_id": refund_id,
"customer_id": customer_id,
"order_id": order_id,
"amount": amount,
"reason": reason,
"processed_at": datetime.now().isoformat(),
"new_balance": customer["balance"],
"estimated_arrival": "3-5 business days"
}
return refund_result
@tool_async("Get real-time order status from external system")
async def get_order_status(self, order_id: str) -> Dict:
"""
Get real-time order status from external order management system.
Args:
order_id: Order identifier
Returns:
Current order status and tracking information
"""
logger.info(f"Fetching order status: {order_id}")
# Simulate external API call
await asyncio.sleep(0.4)
# Mock order status data
order_statuses = {
"ORD-001": {
"status": "delivered",
"tracking_number": "1Z999AA1234567890",
"estimated_delivery": "2025-01-25",
"current_location": "Customer delivered"
},
"ORD-002": {
"status": "in_transit",
"tracking_number": "1Z999AA1234567891",
"estimated_delivery": "2025-01-28",
"current_location": "Distribution center - Chicago, IL"
},
"ORD-003": {
"status": "processing",
"tracking_number": None,
"estimated_delivery": "2025-01-30",
"current_location": "Fulfillment center"
}
}
if order_id in order_statuses:
return {
"order_id": order_id,
"found": True,
**order_statuses[order_id]
}
else:
return {
"order_id": order_id,
"found": False,
"error": "Order not found in system"
}
@tool_async("Save conversation summary to customer file")
async def save_conversation_summary(
self,
customer_id: str,
summary: str,
resolution: str = None
) -> Dict:
"""
Save conversation summary to customer's file.
Args:
customer_id: Customer identifier
summary: Conversation summary
resolution: Resolution details if applicable
Returns:
File save result
"""
logger.info(f"Saving conversation summary for customer: {customer_id}")
# Create conversation record
conversation_record = {
"customer_id": customer_id,
"timestamp": datetime.now().isoformat(),
"agent": "customer_service",
"summary": summary,
"resolution": resolution,
"conversation_id": self.conversation_id
}
# Save to file (simulate customer records system)
filename = f"customer_records_{customer_id}_{datetime.now().strftime('%Y%m%d')}.json"
try:
async with aiofiles.open(filename, 'a') as f:
await f.write(json.dumps(conversation_record) + '\n')
return {
"success": True,
"filename": filename,
"saved_at": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to save conversation summary: {e}")
return {
"success": False,
"error": str(e)
}
# Event handlers
def on_conversation_started(self, conversation_id: str) -> None:
"""Handle conversation started events."""
logger.info(f"🎧 Customer service agent ready: {conversation_id}")
print(f"Customer service agent is ready to help!")
print("Available services: account lookup, billing, refunds, order tracking")
def on_conversation_ended(self, conversation_id: str) -> None:
"""Handle conversation ended events."""
logger.info(f"Customer service conversation ended: {conversation_id}")
print("Thank you for contacting customer service. Have a great day!")
def on_tool_called(self, tool_call) -> None:
"""Handle tool call events."""
logger.info(f"🔧 Executing customer service tool: {tool_call.tool_name}")
def on_error(self, error_type: str, error_message: str, details: Dict) -> None:
"""Handle error events."""
logger.error(f"❌ Customer service error ({error_type}): {error_message}")
print(f"I apologize, but I encountered an error: {error_message}")
async def main():
"""Main function for customer service agent."""
print("🎧 Starting Customer Service Agent Example")
print("=" * 50)
# Configuration
import os
api_key = os.getenv("CONVERSIMPLE_API_KEY", "demo-service-key-456")
customer_id = os.getenv("CONVERSIMPLE_CUSTOMER_ID", "service-demo-customer")
platform_url = os.getenv("CONVERSIMPLE_PLATFORM_URL", "ws://localhost:4000/sdk/websocket")
print(f"Customer ID: {customer_id}")
print(f"Platform URL: {platform_url}")
print()
# Create customer service agent
agent = CustomerServiceAgent(
api_key=api_key,
customer_id=customer_id,
platform_url=platform_url
)
try:
# Start the agent
print("🔗 Connecting to platform...")
await agent.start()
print("✅ Customer service agent connected!")
print("🎯 Available tools:")
for tool in agent.registered_tools:
print(f" - {tool['name']}: {tool['description']}")
print()
print("🎤 Agent is ready for customer service calls...")
print("💡 Try asking about account balance, order status, or refund requests!")
print()
print("Press Ctrl+C to stop")
# Keep running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n🛑 Stopping customer service agent...")
except Exception as e:
print(f"❌ Error: {e}")
logger.error(f"Agent error: {e}")
finally:
try:
await agent.stop()
print("✅ Customer service agent stopped")
except Exception as e:
logger.error(f"Error stopping agent: {e}")
if __name__ == "__main__":
asyncio.run(main())