Documentation Index: Fetch the complete documentation index at https://docs.conversimple.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Errors are inevitable in distributed systems. Conversimple provides comprehensive error handling mechanisms to help you build reliable voice agents that gracefully recover from failures.
Error Types
Errors originating from the Conversimple platform:
| Error Type | Cause | Typical Action |
| --- | --- | --- |
| `authentication_error` | Invalid API key or customer ID | Check credentials |
| `connection_error` | WebSocket connection failed | Retry with backoff |
| `rate_limit_error` | Too many requests | Implement rate limiting |
| `service_unavailable` | Platform maintenance | Retry later |
| `conversation_error` | Conversation state issue | Reset conversation |
Errors from your tool functions:
| Error Type | Cause | Typical Action |
| --- | --- | --- |
| `tool_execution_error` | Exception in tool code | Fix tool logic |
| `tool_timeout` | Tool took too long | Optimize tool |
| `tool_not_found` | Tool doesn't exist | Check tool registration |
| `invalid_parameters` | Wrong parameter types | Validate inputs |
| `permission_denied` | Authorization failed | Check permissions |
AI Service Errors
Errors from STT/LLM/TTS providers:
| Error Type | Cause | Typical Action |
| --- | --- | --- |
| `stt_error` | Speech recognition failed | Request user to repeat |
| `llm_error` | Language model error | Retry or use fallback |
| `tts_error` | Speech synthesis failed | Retry or use alternative |
| `provider_timeout` | AI service timed out | Retry with backoff |
Error Handling in Your Agent
Basic Error Handling
Implement the on_error callback:
from conversimple import ConversimpleAgent, tool
class ErrorAwareAgent(ConversimpleAgent):
    """Agent that routes every reported error through a single handler."""

    def on_error(self, error_type: str, message: str, details: dict):
        """Print and log an error, then dispatch to a type-specific handler.

        Args:
            error_type: Error identifier, e.g. "authentication_error".
            message: Human-readable description of the error.
            details: Extra context for the error; attached to the log record.
        """
        import logging  # local: only needed to look up reserved record names

        print(f"❌ Error: {error_type}")
        print(f" Message: {message}")
        print(f" Details: {details}")

        # Log error.
        # logging raises KeyError when a key in `extra` clashes with a
        # built-in LogRecord attribute (or "message"/"asctime"), which would
        # make this error handler fail itself. Rename only the clashing keys
        # so every other key stays available to formatters unchanged.
        reserved = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}
        safe_extra = {
            (f"detail_{key}" if key in reserved else key): value
            for key, value in (details or {}).items()
        }
        logger.error(f"{error_type}: {message}", extra=safe_extra)

        # Handle specific error types; other types are only printed and logged.
        if error_type == "authentication_error":
            self.handle_auth_error(details)
        elif error_type == "tool_execution_error":
            self.handle_tool_error(details)
        elif error_type == "rate_limit_error":
            self.handle_rate_limit(details)
Handle errors within tools:
@tool("Get customer information")
def get_customer(self, customer_id: str) -> dict:
    """Look up a customer and report failures as structured error dicts.

    Returns a dict with "success" and "customer" when the lookup finds a
    record, otherwise a dict with "error" and "message" keys.
    """
    try:
        record = database.get_customer(customer_id)
        if record:
            return {"success": True, "customer": record}
        # A falsy lookup result is reported as "not found".
        return {
            "error": "customer_not_found",
            "message": f"No customer found with ID {customer_id}",
        }
    except DatabaseConnectionError as db_err:
        logger.error(f"Database connection failed: {db_err}")
        failure = {
            "error": "service_unavailable",
            "message": "Unable to access customer database. Please try again.",
        }
        return failure
    except Exception as unexpected:
        # Catch-all so the tool always returns a dict instead of raising.
        logger.exception(f"Unexpected error in get_customer: {unexpected}")
        failure = {
            "error": "internal_error",
            "message": "An unexpected error occurred. Please try again.",
        }
        return failure
For async tools:
@tool_async("Send email notification")
async def send_email(self, email: str, subject: str, body: str) -> dict:
    """Send an email and translate known failures into error dicts.

    Returns a dict with "success" and "message_id" when the send completes,
    otherwise a dict with "error" and "message" keys.
    """
    try:
        if self.is_valid_email(email):
            # The send call is given a 5 second timeout.
            receipt = await email_service.send(
                to=email,
                subject=subject,
                body=body,
                timeout=5.0,
            )
            return {"success": True, "message_id": receipt.message_id}

        # Address failed validation; nothing was sent.
        return {
            "error": "invalid_email",
            "message": "Please provide a valid email address",
        }
    except asyncio.TimeoutError:
        logger.warning(f"Email send timeout for {email}")
        timeout_reply = {
            "error": "timeout",
            "message": "Email service is slow. Your email will be sent shortly.",
        }
        return timeout_reply
    except EmailServiceError as service_err:
        logger.error(f"Email service error: {service_err}")
        service_reply = {
            "error": "service_error",
            "message": "Unable to send email at this time. Please try again later.",
        }
        return service_reply
Retry Strategies
Exponential Backoff
For transient errors:
import asyncio
from typing import Optional
class RetryableAgent(ConversimpleAgent):
    """Agent that retries its initial connection with exponential backoff."""

    async def start_with_retry(self, max_retries: int = 3):
        """Start the agent, retrying on ConnectionError with exponential backoff.

        Args:
            max_retries: Total number of start attempts (must be >= 1).

        Raises:
            ValueError: If max_retries is less than 1. Without this check the
                loop would not run and the method would return as if the
                agent had started.
            ConnectionError: Re-raised from the final failed attempt.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                await self.start()
            except ConnectionError:
                if attempt == max_retries - 1:
                    print(f"❌ Failed after {max_retries} attempts")
                    raise

                # Calculate backoff: 2^attempt seconds (1, 2, 4, ...).
                backoff = 2 ** attempt
                print(f"⚠️ Connection failed (attempt {attempt + 1})")
                print(f" Retrying in {backoff} seconds...")
                await asyncio.sleep(backoff)
            else:
                # Kept out of the try body so an error raised while printing
                # (BrokenPipeError is a ConnectionError) cannot be mistaken
                # for a failed connection and trigger another start().
                print("✅ Connected successfully")
                return
Create a reusable retry decorator:
from functools import wraps
import asyncio
def retry(max_attempts=3, backoff=1.0, exceptions=(Exception,)):
    """Build a decorator that retries a tool with exponential backoff.

    The decorated callable is always a coroutine function, so it must be
    awaited even when the wrapped function is synchronous.

    Args:
        max_attempts: Total number of calls made before giving up (>= 1).
        backoff: Base delay in seconds; the wait after failed attempt k
            (0-based) is backoff * 2**k.
        exceptions: Exception class or tuple of classes that trigger a retry.
            Any other exception propagates immediately.

    Raises:
        ValueError: If max_attempts is less than 1. Without this check the
            wrapper would return None without ever calling the function.
    """
    import inspect  # local import keeps the snippet self-contained

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        # Decide once, at decoration time, whether the result must be awaited.
        # inspect.iscoroutinefunction replaces the deprecated asyncio variant.
        is_coroutine = inspect.iscoroutinefunction(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    result = func(*args, **kwargs)
                    return await result if is_coroutine else result
                except exceptions:
                    # Out of attempts: surface the last error to the caller.
                    if attempt == max_attempts - 1:
                        raise
                    await asyncio.sleep(backoff * (2 ** attempt))

        return wrapper

    return decorator
# Use the decorator
@tool_async("Fetch data from API")
@retry(max_attempts=3, backoff=0.5, exceptions=(requests.RequestException,))
async def fetch_data(self, endpoint: str) -> dict:
    """Fetch `endpoint` and return its decoded JSON body.

    The `retry` decorator re-runs the call on requests.RequestException.
    """
    payload = (await http_client.get(endpoint)).json()
    return payload
Circuit Breaker Pattern
Prevent cascading failures:
from datetime import datetime, timedelta
class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down period has passed.

    States: "closed" (calls pass through), "open" (calls are rejected) and
    "half_open" (one trial call is allowed after the timeout elapsed).
    """

    def __init__(self, failure_threshold=5, timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Failure count at which the circuit opens.
            timeout: Seconds to wait after the last failure before allowing
                a trial call.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    def call(self, func, *args, **kwargs):
        """Execute `func(*args, **kwargs)` through the circuit breaker.

        Returns:
            Whatever `func` returns.

        Raises:
            CircuitBreakerOpen: If the circuit is open and the timeout has
                not elapsed yet; `func` is not called.
            Exception: Any exception raised by `func` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == "open":
            if not self._should_attempt_reset():
                raise CircuitBreakerOpen("Service unavailable")
            self.state = "half_open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise

        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        self.failures = 0
        self.state = "closed"

    def _on_failure(self):
        """Record a failure and open the circuit once the threshold is hit."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"🔴 Circuit breaker opened after {self.failures} failures")

    def _should_attempt_reset(self):
        """Return True when the cool-down since the last failure has elapsed."""
        if self.last_failure_time is None:
            # No failure was ever recorded, so there is nothing to wait for.
            return True
        elapsed = datetime.now() - self.last_failure_time
        # total_seconds() covers the whole interval; `.seconds` is only the
        # seconds component and restarts from 0 after every full day.
        return elapsed.total_seconds() >= self.timeout
# Use circuit breaker
class ResilientAgent(ConversimpleAgent):
    """Agent whose payment tool is guarded by a circuit breaker."""

    def __init__(self, **kwargs):
        """Forward kwargs to the base agent and create the payment breaker."""
        super().__init__(**kwargs)
        self.payment_circuit = CircuitBreaker(failure_threshold=3, timeout=30)

    @tool("Process payment")
    def process_payment(self, amount: float, card: str) -> dict:
        """Charge the card through the payment circuit breaker.

        Returns the charge result, or a "service_unavailable" error dict
        when the breaker rejects the call.
        """
        breaker = self.payment_circuit
        try:
            outcome = breaker.call(
                payment_service.charge,
                amount=amount,
                card=card,
            )
        except CircuitBreakerOpen:
            return {
                "error": "service_unavailable",
                "message": "Payment service is temporarily unavailable",
            }
        return outcome
Fallback Strategies
Default Responses
Provide fallback responses when tools fail:
@tool("Get weather")
def get_weather(self, location: str) -> dict:
    """Return current weather, falling back to cached data, then to an error.

    On WeatherAPIError the cache is consulted; a cache hit is returned with
    "cached" and "message" added, a miss yields an "unavailable" error dict.
    """
    try:
        current = weather_api.get_current(location)
        return {"temperature": current.temp, "condition": current.condition}
    except WeatherAPIError:
        # NOTE(review): key format reconstructed from whitespace-mangled
        # source - confirm it matches whatever writes the cache entry.
        stale = cache.get(f"weather:{location}")
        if not stale:
            # Ultimate fallback: nothing live and nothing cached.
            return {
                "error": "unavailable",
                "message": "Weather information is currently unavailable",
            }
        return {
            **stale,
            "cached": True,
            "message": "Showing recent weather data",
        }
Graceful Degradation
Reduce functionality instead of failing completely:
@tool("Search products")
def search_products(self, query: str, limit: int = 10) -> dict:
    """Search products, degrading from full-text search to simpler sources.

    Order of attempts: the search engine, then a database LIKE query, then
    the cached list of popular products.

    Args:
        query: Free-text search term supplied by the user.
        limit: Maximum number of results to return.

    Returns:
        A dict with "results" and a "method" naming the source that answered.
    """
    try:
        # Try full-text search
        results = search_engine.search(query, limit=limit)
        return {"results": results, "method": "search"}
    except SearchEngineError:
        try:
            # Fallback to database LIKE query. The user's text is bound as a
            # parameter rather than interpolated into the SQL string, so it
            # cannot alter the statement (SQL injection).
            # NOTE(review): assumes database.query(sql, params) with qmark
            # ("?") placeholders - confirm against the database helper.
            results = database.query(
                "SELECT * FROM products WHERE name LIKE ? LIMIT ?",
                (f"%{query}%", limit),
            )
            return {"results": results, "method": "database", "limited": True}
        except DatabaseError:
            # Ultimate fallback: return popular products. A cache miss is
            # treated as an empty list instead of failing on None[:limit].
            popular = cache.get("popular_products") or []
            return {
                "results": popular[:limit],
                "method": "popular",
                "message": "Showing popular products",
            }
Logging and Monitoring
Structured Logging
Use structured logging for better debugging:
import logging
import json
class StructuredLogger:
    """Emit error records as single-line JSON documents."""

    def __init__(self, name):
        """Wrap the standard-library logger registered under `name`."""
        self.logger = logging.getLogger(name)

    def log_error(self, error_type, message, **extra):
        """Log an error as JSON at ERROR level.

        Args:
            error_type: Error identifier stored under "error_type".
            message: Description stored under "message".
            **extra: Additional context merged into the JSON object; a
                "timestamp" key here overrides the generated one.
        """
        log_data = {
            "error_type": error_type,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            **extra,
        }
        # default=str is deliberate: context values that JSON cannot encode
        # (datetimes, sets, exceptions, ...) are logged as their string form
        # instead of raising TypeError while an error is being reported.
        self.logger.error(json.dumps(log_data, default=str))
class MonitoredAgent(ConversimpleAgent):
    """Agent that records every error through a StructuredLogger."""

    def __init__(self, **kwargs):
        """Forward kwargs to the base agent and attach the "agent" logger."""
        super().__init__(**kwargs)
        self.logger = StructuredLogger("agent")

    def on_error(self, error_type: str, message: str, details: dict):
        """Log the error together with selected fields from `details`.

        Fields missing from `details` are logged as None.
        """
        context_fields = ("conversation_id", "tool_name", "user_id")
        context = {field: details.get(field) for field in context_fields}
        self.logger.log_error(
            error_type=error_type,
            message=message,
            **context,
        )
Error Metrics
Track error rates and patterns:
from collections import defaultdict
from datetime import datetime, timedelta
class MetricsAgent(ConversimpleAgent):
    """Agent that counts errors and alerts when the recent rate is high."""

    def __init__(self, **kwargs):
        """Forward kwargs to the base agent and create empty metrics."""
        from collections import deque  # local: not among the file's imports

        super().__init__(**kwargs)
        self.error_counts = defaultdict(int)
        self.error_history = []
        # Timestamps of errors inside the 5-minute alert window, oldest first.
        self._recent_error_times = deque()

    def on_error(self, error_type: str, message: str, details: dict):
        """Track error metrics and alert on more than 10 errors in 5 minutes.

        Args:
            error_type: Error identifier used as the counter key.
            message: Description stored in the history entry.
            details: Context stored in the history entry.
        """
        # One clock reading per event keeps the history entry and the
        # window arithmetic consistent.
        now = datetime.now()

        # Count by type
        self.error_counts[error_type] += 1

        # Record with timestamp
        self.error_history.append({
            "type": error_type,
            "message": message,
            "timestamp": now,
            "details": details,
        })

        # Maintain the sliding window incrementally instead of rescanning
        # the whole (ever-growing) history on every error.
        window_start = now - timedelta(minutes=5)
        recent = self._recent_error_times
        recent.append(now)
        while recent and recent[0] <= window_start:
            recent.popleft()

        # Alert on high error rate
        if len(recent) > 10:
            self.send_alert(f"High error rate: {len(recent)} errors in 5 minutes")

    def get_error_report(self) -> dict:
        """Return total count, per-type counts and the 10 latest errors."""
        return {
            "total_errors": sum(self.error_counts.values()),
            "by_type": dict(self.error_counts),
            "recent_errors": self.error_history[-10:],
        }
User-Friendly Error Messages
Return Helpful Messages
Provide clear, actionable error messages to users:
def format_error_for_user(error_type: str, details: dict) -> str:
    """Map a technical error code to a sentence suitable for the user.

    Unknown codes get a generic fallback sentence. `details` is accepted
    but not used.
    """
    fallback = "Something went wrong. Let me help you with something else."
    friendly_text = dict(
        customer_not_found="I couldn't find that customer in our system. Could you verify the customer ID?",
        payment_failed="There was an issue processing the payment. Please check the payment details and try again.",
        service_unavailable="I'm having trouble connecting to our systems right now. Please try again in a moment.",
        invalid_input="I didn't quite understand that. Could you provide that information again?",
        timeout="This is taking longer than expected. Let me try that again.",
    )
    try:
        return friendly_text[error_type]
    except KeyError:
        return fallback
@tool("Book appointment")
def book_appointment(self, date: str, time: str) -> dict:
    """Create an appointment and phrase known failures for the user.

    Returns a dict with "success" and "appointment_id" when booking works,
    otherwise a dict with "error" and "message" keys.
    """
    try:
        booking = booking_system.create(date, time)
        return {"success": True, "appointment_id": booking.id}
    except SlotUnavailableError:
        code = "slot_unavailable"
        text = "That time slot is no longer available. Would you like to see other available times?"
    except InvalidDateError:
        code = "invalid_date"
        text = "I couldn't understand that date. Could you provide it in MM/DD/YYYY format?"
    # Only reached after one of the handled booking errors.
    return {"error": code, "message": text}
Next Steps
Logging & Monitoring: Set up comprehensive logging.
Debugging: Advanced debugging techniques.