Overview
Proper logging and monitoring are essential for operating reliable voice agents in production. Conversimple provides built-in logging capabilities and integrates with popular monitoring tools.
Logging Basics
Configure Logging Level
Set the logging level for your agent:
import logging
from conversimple import ConversimpleAgent

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Set SDK log level
logging.getLogger('conversimple').setLevel(logging.DEBUG)

# Your agent
class MyAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
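In production you will usually want the level adjustable without a code change. A minimal sketch that reads it from an environment variable (the LOG_LEVEL name is an assumption, not an SDK convention):

import logging
import os

# LOG_LEVEL is a hypothetical environment variable; default to INFO
level_name = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=getattr(logging, level_name, logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)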
Log Levels
| Level | When to Use | Example |
|---|---|---|
| DEBUG | Detailed diagnostic information | Tool parameter values, state changes |
| INFO | General informational messages | Conversation started/ended, tool calls |
| WARNING | Unexpected but handled situations | Slow tool execution, retry attempts |
| ERROR | Error events | Tool failures, service errors |
| CRITICAL | Serious errors requiring immediate attention | Authentication failures, system crashes |
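To make the WARNING row concrete, a small sketch that logs parameters at DEBUG, completion at INFO, and escalates to WARNING when execution is slow (the 2-second threshold is an arbitrary choice, and timed_lookup is a hypothetical helper):

import logging
import time

logger = logging.getLogger(__name__)
SLOW_THRESHOLD_MS = 2000  # arbitrary cutoff for "slow"

def timed_lookup(customer_id: str) -> dict:
    logger.debug("Lookup parameters: customer_id=%s", customer_id)
    start = time.time()
    result = {"id": customer_id}  # stand-in for the real lookup
    duration_ms = (time.time() - start) * 1000
    if duration_ms > SLOW_THRESHOLD_MS:
        logger.warning("Slow lookup: %.0fms", duration_ms)
    logger.info("Lookup finished for %s", customer_id)
    return result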
Basic Logging Example
from conversimple import ConversimpleAgent, tool
import logging

logger = logging.getLogger(__name__)

class LoggingAgent(ConversimpleAgent):
    def on_conversation_started(self, conversation_id: str):
        """Log conversation start"""
        logger.info(f"Conversation started: {conversation_id}")

    @tool("Get customer info")
    def get_customer(self, customer_id: str) -> dict:
        """Tool with logging"""
        logger.debug(f"Looking up customer: {customer_id}")
        try:
            customer = database.get_customer(customer_id)
            logger.info(f"Customer found: {customer_id}")
            return customer
        except Exception as e:
            logger.error(f"Failed to get customer {customer_id}: {e}")
            raise

    def on_conversation_ended(self, conversation_id: str):
        """Log conversation end"""
        logger.info(f"Conversation ended: {conversation_id}")
Structured Logging
JSON Logging
Use structured logs for better searchability:
import json
import logging
from datetime import datetime

class JsonFormatter(logging.Formatter):
    """JSON log formatter"""
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add extra fields
        if hasattr(record, 'conversation_id'):
            log_data['conversation_id'] = record.conversation_id
        if hasattr(record, 'tool_name'):
            log_data['tool_name'] = record.tool_name
        if hasattr(record, 'duration_ms'):
            log_data['duration_ms'] = record.duration_ms

        return json.dumps(log_data)

# Configure handler with JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())

logger = logging.getLogger('agent')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
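With this formatter attached, each log record becomes a single JSON object, roughly like this (field values are illustrative):

{"timestamp": "2024-05-01T12:00:00.000000", "level": "INFO", "logger": "agent", "message": "Order processed successfully", "function": "process_order", "line": 42, "tool_name": "process_order", "duration_ms": 137.2}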
Contextual Logging
Add context to your logs:
import time
from datetime import datetime

class ContextualAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)

    def on_conversation_started(self, conversation_id: str):
        """Log with context"""
        self.logger.info(
            "Conversation started",
            extra={
                "conversation_id": conversation_id,
                "agent_id": self.agent_id,
                "timestamp": datetime.now().isoformat()
            }
        )

    @tool("Process order")
    def process_order(self, order_id: str, amount: float) -> dict:
        """Tool with contextual logging"""
        start_time = time.time()

        self.logger.info(
            "Processing order",
            extra={
                "tool_name": "process_order",
                "order_id": order_id,
                "amount": amount
            }
        )

        try:
            result = payment_service.charge(order_id, amount)
            duration_ms = (time.time() - start_time) * 1000

            self.logger.info(
                "Order processed successfully",
                extra={
                    "tool_name": "process_order",
                    "order_id": order_id,
                    "duration_ms": duration_ms,
                    "transaction_id": result.transaction_id
                }
            )
            return {"success": True, "transaction_id": result.transaction_id}
        except Exception as e:
            self.logger.error(
                "Order processing failed",
                extra={
                    "tool_name": "process_order",
                    "order_id": order_id,
                    "error": str(e)
                },
                exc_info=True  # Include stack trace
            )
            raise
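Note that fields passed via extra appear in output only if your formatter emits them; the JsonFormatter above does this for conversation_id, tool_name, and duration_ms, so extend it as you add new context fields.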
Metrics and Monitoring
Track Key Metrics
Monitor important agent metrics:
from collections import defaultdict
import time

class MetricsAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics = {
            "conversations_started": 0,
            "conversations_ended": 0,
            "tool_calls": defaultdict(int),
            "tool_durations": defaultdict(list),
            "errors": defaultdict(int)
        }
        # call_id -> (tool_name, start_time) for duration tracking
        self._tool_starts = {}

    def on_conversation_started(self, conversation_id: str):
        """Track conversation starts"""
        self.metrics["conversations_started"] += 1

    def on_conversation_ended(self, conversation_id: str):
        """Track conversation ends"""
        self.metrics["conversations_ended"] += 1

    def on_tool_called(self, tool_call):
        """Track tool calls"""
        self.metrics["tool_calls"][tool_call.tool_name] += 1
        # Record the start time keyed by call ID so it can be matched
        # up in on_tool_completed (assumes tool_call.call_id corresponds
        # to the call_id passed to on_tool_completed)
        self._tool_starts[tool_call.call_id] = (tool_call.tool_name, time.time())

    def on_tool_completed(self, call_id: str, result):
        """Track tool completion and duration"""
        entry = self._tool_starts.pop(call_id, None)
        if entry:
            tool_name, start_time = entry
            self.metrics["tool_durations"][tool_name].append(time.time() - start_time)

    def on_error(self, error_type: str, message: str, details: dict):
        """Track errors"""
        self.metrics["errors"][error_type] += 1

    def get_metrics_summary(self) -> dict:
        """Get metrics summary"""
        return {
            "conversations": {
                "started": self.metrics["conversations_started"],
                "ended": self.metrics["conversations_ended"],
                "active": self.metrics["conversations_started"] - self.metrics["conversations_ended"]
            },
            "tool_calls": dict(self.metrics["tool_calls"]),
            "tool_avg_duration": {
                name: sum(durations) / len(durations)
                for name, durations in self.metrics["tool_durations"].items()
                if durations
            },
            "errors": dict(self.metrics["errors"])
        }
Periodic Metrics Reporting
Report metrics at regular intervals:
import asyncio
import logging

logger = logging.getLogger(__name__)

# Extends MetricsAgent above so get_metrics_summary() is available
class ReportingAgent(MetricsAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics_task = None

    async def start(self):
        """Start with metrics reporting"""
        await super().start()
        # Start metrics reporting task
        self.metrics_task = asyncio.create_task(self.report_metrics())

    async def report_metrics(self):
        """Report metrics every minute"""
        while True:
            await asyncio.sleep(60)  # Report every minute
            metrics = self.get_metrics_summary()
            logger.info(
                "Metrics report",
                extra={"metrics": metrics}
            )
            # Send to monitoring service
            await self.send_to_monitoring(metrics)

    async def stop(self):
        """Stop and cancel metrics task"""
        if self.metrics_task:
            self.metrics_task.cancel()
        await super().stop()
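send_to_monitoring is not provided by the SDK; it is yours to implement. A minimal sketch that POSTs the summary to an HTTP collector (the endpoint URL is a placeholder and aiohttp is an assumed dependency):

import aiohttp

# Add this method to ReportingAgent
async def send_to_monitoring(self, metrics: dict):
    """POST the metrics summary to a collector endpoint."""
    async with aiohttp.ClientSession() as session:
        # The URL is hypothetical; substitute your monitoring service's API
        await session.post("https://metrics.example.com/ingest", json=metrics)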
Integration with Monitoring Services
Datadog Integration
import os
import time

from datadog import initialize, statsd

# Initialize Datadog
initialize(
    api_key=os.getenv('DATADOG_API_KEY'),
    app_key=os.getenv('DATADOG_APP_KEY')
)

class DatadogAgent(ConversimpleAgent):
    def on_conversation_started(self, conversation_id: str):
        """Track conversation starts"""
        statsd.increment('agent.conversations.started', tags=['agent:voice'])

    @tool("Get customer")
    def get_customer(self, customer_id: str) -> dict:
        """Tool with Datadog metrics"""
        start_time = time.time()
        try:
            customer = database.get_customer(customer_id)

            # Record duration
            duration = (time.time() - start_time) * 1000
            statsd.histogram('agent.tool.duration', duration, tags=['tool:get_customer'])

            # Count success
            statsd.increment('agent.tool.success', tags=['tool:get_customer'])
            return customer
        except Exception as e:
            # Count error
            statsd.increment('agent.tool.error', tags=['tool:get_customer', f'error:{type(e).__name__}'])
            raise
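Note that statsd here is the DogStatsD client, which ships metrics over UDP to a locally running Datadog Agent (localhost:8125 by default), so these calls are fire-and-forget and add negligible latency to tool execution.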
Prometheus Integration
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
conversations_total = Counter('agent_conversations_total', 'Total conversations')
tool_calls_total = Counter('agent_tool_calls_total', 'Total tool calls', ['tool_name'])
tool_duration = Histogram('agent_tool_duration_seconds', 'Tool execution duration', ['tool_name'])
active_conversations = Gauge('agent_active_conversations', 'Currently active conversations')
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type'])

class PrometheusAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Start Prometheus HTTP server on port 8000 (exposes /metrics;
        # call once per process, since a second bind to the port fails)
        start_http_server(8000)

    def on_conversation_started(self, conversation_id: str):
        """Track with Prometheus"""
        conversations_total.inc()
        active_conversations.inc()

    def on_conversation_ended(self, conversation_id: str):
        """Update active conversations"""
        active_conversations.dec()

    @tool("Search products")
    def search_products(self, query: str) -> dict:
        """Tool with Prometheus metrics"""
        tool_calls_total.labels(tool_name='search_products').inc()
        with tool_duration.labels(tool_name='search_products').time():
            return search_engine.search(query)

    def on_error(self, error_type: str, message: str, details: dict):
        """Track errors"""
        errors_total.labels(error_type=error_type).inc()
CloudWatch Integration
import time
from datetime import datetime

import boto3

cloudwatch = boto3.client('cloudwatch')

class CloudWatchAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.namespace = 'Conversimple/Agent'

    def put_metric(self, metric_name: str, value: float, unit: str = 'Count', **dimensions):
        """Put metric to CloudWatch"""
        cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                'MetricName': metric_name,
                'Value': value,
                'Unit': unit,
                'Timestamp': datetime.utcnow(),
                'Dimensions': [
                    {'Name': k, 'Value': v}
                    for k, v in dimensions.items()
                ]
            }]
        )

    def on_conversation_started(self, conversation_id: str):
        """Track conversation starts"""
        self.put_metric('ConversationsStarted', 1, agent_id=self.agent_id)

    @tool("Process payment")
    def process_payment(self, amount: float) -> dict:
        """Tool with CloudWatch metrics"""
        start_time = time.time()
        try:
            result = payment_service.charge(amount)

            # Record duration
            duration = (time.time() - start_time) * 1000
            self.put_metric('ToolDuration', duration, unit='Milliseconds', tool='process_payment')

            # Record amount
            self.put_metric('PaymentAmount', amount, unit='None', tool='process_payment')
            return result
        except Exception as e:
            self.put_metric('ToolError', 1, tool='process_payment', error=type(e).__name__)
            raise
Log Aggregation
ELK Stack (Elasticsearch, Logstash, Kibana)
import logging
from logging.handlers import SocketHandler

class ELKAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Set up Logstash handler. Note that the stdlib SocketHandler
        # sends pickled LogRecords rather than formatted text, so many
        # deployments use the python-logstash package's handlers instead.
        handler = SocketHandler('logstash-server', 5000)
        handler.setFormatter(JsonFormatter())

        logger = logging.getLogger('agent')
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
Logtail/Better Stack
import logging
import os

from logtail import LogtailHandler

class LogtailAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Set up Logtail handler
        handler = LogtailHandler(source_token=os.getenv('LOGTAIL_SOURCE_TOKEN'))

        logger = logging.getLogger('agent')
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
Best Practices
1. Log at Appropriate Levels
# DEBUG: Detailed information for diagnosing issues
logger.debug(f"Tool parameters: {params}")
# INFO: General informational messages
logger.info(f"Processing order {order_id}")
# WARNING: Something unexpected but handled
logger.warning(f"Slow tool execution: {duration}ms")
# ERROR: Error events
logger.error(f"Payment failed: {error}")
# CRITICAL: Serious errors
logger.critical(f"Database connection lost")
2. Don’t Log Sensitive Information
# ❌ Bad - logs sensitive data
logger.info(f"Processing payment for card {card_number}")
# ✅ Good - logs safely
logger.info(f"Processing payment for card ending in {card_number[-4:]}")
3. Use Correlation IDs
import uuid

class CorrelatedAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.correlation_ids = {}

    def on_conversation_started(self, conversation_id: str):
        """Generate correlation ID"""
        correlation_id = str(uuid.uuid4())

        logger.info(
            "Conversation started",
            extra={
                "conversation_id": conversation_id,
                "correlation_id": correlation_id
            }
        )

        # Store for later use
        self.correlation_ids[conversation_id] = correlation_id
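To attach the stored ID to every later log line without repeating extra= at each call site, the standard library's logging.LoggerAdapter works well. A minimal sketch (assumes correlation_id was looked up from self.correlation_ids):

# Wrap the logger so every record carries the correlation ID
adapter = logging.LoggerAdapter(
    logging.getLogger(__name__),
    {"correlation_id": correlation_id},
)
adapter.info("Tool call dispatched")  # correlation_id is merged into extra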
4. Monitor Key Performance Indicators
Track these essential metrics (a percentile sketch follows the list):
- Conversation Count: Active, started, ended
- Tool Call Rate: Calls per minute/hour
- Tool Duration: Average and P95/P99 latency
- Error Rate: Errors per minute, by type
- Success Rate: Successful tool executions vs failures
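For the latency percentiles, a minimal nearest-rank sketch over the durations that MetricsAgent above already records (the tool name is illustrative):

def percentile(durations: list[float], pct: float) -> float:
    """Nearest-rank percentile; adequate for dashboard reporting."""
    if not durations:
        return 0.0
    ordered = sorted(durations)
    # Index of the value at or above the requested percentile
    index = min(len(ordered) - 1, int(len(ordered) * pct / 100))
    return ordered[index]

# Assuming `agent` is a MetricsAgent instance
durations = agent.metrics["tool_durations"]["get_customer"]
print(percentile(durations, 95), percentile(durations, 99))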
5. Set Up Alerts
Alert on critical conditions:
class AlertingAgent(ConversimpleAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.error_count = 0
        self.alert_threshold = 10

    def on_error(self, error_type: str, message: str, details: dict):
        """Alert on high error rates"""
        self.error_count += 1

        if self.error_count >= self.alert_threshold:
            self.send_alert(
                severity="HIGH",
                message=f"Error threshold exceeded: {self.error_count} errors"
            )
            self.error_count = 0  # Reset counter

    def send_alert(self, severity: str, message: str):
        """Deliver the alert via your paging or chat integration of choice."""
        logger.critical(f"[{severity}] {message}")  # placeholder delivery