Skip to main content

Overview

Proper logging and monitoring are essential for operating reliable voice agents in production. Conversimple provides built-in logging capabilities and integrates with popular monitoring tools.

Logging Basics

Configure Logging Level

Set the logging level for your agent:
import logging
from conversimple import ConversimpleAgent

# Configure logging
logging.basicConfig(
    level=logging.INFO,  # root logger: INFO and above
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Set SDK log level
# DEBUG on the 'conversimple' logger enables verbose SDK-internal
# diagnostics independently of the root level configured above.
logging.getLogger('conversimple').setLevel(logging.DEBUG)

# Your agent
class MyAgent(ConversimpleAgent):
    """Minimal agent that attaches a module-named logger to itself."""
    def __init__(self, **kwargs):
        # Forward all configuration to the SDK base class unchanged.
        super().__init__(**kwargs)
        # Logger named after this module so records are easy to filter.
        self.logger = logging.getLogger(__name__)

Log Levels

| Level    | When to Use                                  | Example                                  |
|----------|----------------------------------------------|------------------------------------------|
| DEBUG    | Detailed diagnostic information              | Tool parameter values, state changes     |
| INFO     | General informational messages               | Conversation started/ended, tool calls   |
| WARNING  | Unexpected but handled situations            | Slow tool execution, retry attempts      |
| ERROR    | Error events                                 | Tool failures, service errors            |
| CRITICAL | Serious errors requiring immediate attention | Authentication failures, system crashes  |

Basic Logging Example

from conversimple import ConversimpleAgent, tool
import logging

logger = logging.getLogger(__name__)

class LoggingAgent(ConversimpleAgent):
    """Agent demonstrating logging across the conversation lifecycle.

    Uses lazy %-style logger arguments so messages are only formatted
    when a record is actually emitted (and so aggregators can group on
    the constant message template).
    """

    def on_conversation_started(self, conversation_id: str):
        """Log conversation start"""
        logger.info("Conversation started: %s", conversation_id)

    @tool("Get customer info")
    def get_customer(self, customer_id: str) -> dict:
        """Look up a customer record, logging the outcome.

        Raises:
            Exception: re-raised from the database lookup so the SDK
                can surface the tool failure to the caller.
        """
        logger.debug("Looking up customer: %s", customer_id)

        try:
            customer = database.get_customer(customer_id)
        except Exception as e:
            logger.error("Failed to get customer %s: %s", customer_id, e)
            raise
        else:
            # Success path kept in `else` so only the lookup is guarded.
            logger.info("Customer found: %s", customer_id)
            return customer

    def on_conversation_ended(self, conversation_id: str):
        """Log conversation end"""
        logger.info("Conversation ended: %s", conversation_id)

Structured Logging

JSON Logging

Use structured logs for better searchability:
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render log records as single-line JSON objects.

    The timestamp is taken from the record's own creation time
    (``record.created``) rather than the time of formatting — so
    buffered or async handlers don't skew timestamps — and is emitted
    as a timezone-aware UTC ISO-8601 string. (``datetime.utcnow()`` is
    also deprecated since Python 3.12.)
    """

    # Optional attributes attached via ``extra={...}`` that are promoted
    # into the JSON payload when present on the record.
    _EXTRA_FIELDS = ('conversation_id', 'tool_name', 'duration_ms')

    def format(self, record):
        log_data = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add extra fields
        for field in self._EXTRA_FIELDS:
            if hasattr(record, field):
                log_data[field] = getattr(record, field)

        return json.dumps(log_data)

# Configure handler with JSON formatter
handler = logging.StreamHandler()  # writes to stderr by default
handler.setFormatter(JsonFormatter())
# A dedicated 'agent' logger keeps JSON output separate from the root logger.
logger = logging.getLogger('agent')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Contextual Logging

Add context to your logs:
class ContextualAgent(ConversimpleAgent):
    """Agent whose log records carry structured context via ``extra``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)

    def on_conversation_started(self, conversation_id: str):
        """Emit a start-of-conversation record with identifying context."""
        context = {
            "conversation_id": conversation_id,
            "agent_id": self.agent_id,
            "timestamp": datetime.now().isoformat(),
        }
        self.logger.info("Conversation started", extra=context)

    @tool("Process order")
    def process_order(self, order_id: str, amount: float) -> dict:
        """Charge an order, logging progress and timing as structured fields."""
        started_at = time.time()

        order_context = {
            "tool_name": "process_order",
            "order_id": order_id,
            "amount": amount,
        }
        self.logger.info("Processing order", extra=order_context)

        try:
            charge = payment_service.charge(order_id, amount)

            elapsed_ms = (time.time() - started_at) * 1000
            self.logger.info(
                "Order processed successfully",
                extra={
                    "tool_name": "process_order",
                    "order_id": order_id,
                    "duration_ms": elapsed_ms,
                    "transaction_id": charge.transaction_id,
                },
            )
            return {"success": True, "transaction_id": charge.transaction_id}

        except Exception as exc:
            # exc_info=True attaches the full stack trace to the record.
            self.logger.error(
                "Order processing failed",
                extra={
                    "tool_name": "process_order",
                    "order_id": order_id,
                    "error": str(exc),
                },
                exc_info=True,
            )
            raise

Metrics and Monitoring

Track Key Metrics

Monitor important agent metrics:
from collections import defaultdict
from datetime import datetime, timedelta
import time

class MetricsAgent(ConversimpleAgent):
    """Agent that accumulates in-memory operational metrics.

    Tracks conversation counts, per-tool call counts and durations,
    and error counts by type.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics = {
            "conversations_started": 0,
            "conversations_ended": 0,
            "tool_calls": defaultdict(int),
            "tool_durations": defaultdict(list),
            "errors": defaultdict(int)
        }
        # call_id -> (tool_name, start_time) for in-flight tool calls.
        # The completion callback only receives the call_id and result,
        # so timing state must be keyed on call_id: the original code
        # stashed _start_time on the tool_call object but then read it
        # from the *result* object, so durations were never recorded.
        self._pending_tool_calls = {}

    def on_conversation_started(self, conversation_id: str):
        """Track conversation starts"""
        self.metrics["conversations_started"] += 1

    def on_conversation_ended(self, conversation_id: str):
        """Track conversation ends"""
        self.metrics["conversations_ended"] += 1

    def on_tool_called(self, tool_call):
        """Track tool calls"""
        self.metrics["tool_calls"][tool_call.tool_name] += 1
        # NOTE(review): assumes tool_call exposes a ``call_id`` matching
        # the one passed to on_tool_completed — confirm with the SDK.
        self._pending_tool_calls[tool_call.call_id] = (
            tool_call.tool_name, time.time()
        )

    def on_tool_completed(self, call_id: str, result):
        """Track tool completion and duration"""
        pending = self._pending_tool_calls.pop(call_id, None)
        if pending is not None:
            tool_name, started_at = pending
            self.metrics["tool_durations"][tool_name].append(
                time.time() - started_at
            )

    def on_error(self, error_type: str, message: str, details: dict):
        """Track errors"""
        self.metrics["errors"][error_type] += 1

    def get_metrics_summary(self) -> dict:
        """Get metrics summary"""
        return {
            "conversations": {
                "started": self.metrics["conversations_started"],
                "ended": self.metrics["conversations_ended"],
                # Conversations started but not yet ended.
                "active": self.metrics["conversations_started"] - self.metrics["conversations_ended"]
            },
            "tool_calls": dict(self.metrics["tool_calls"]),
            "tool_avg_duration": {
                name: sum(durations) / len(durations)
                for name, durations in self.metrics["tool_durations"].items()
                if durations
            },
            "errors": dict(self.metrics["errors"])
        }

Periodic Metrics Reporting

Report metrics at regular intervals:
import asyncio

class ReportingAgent(ConversimpleAgent):
    """Agent that reports metrics to a monitoring service once a minute.

    Expects ``get_metrics_summary()`` and ``send_to_monitoring()`` to be
    provided by a subclass/mixin (e.g. MetricsAgent) — they are not
    defined here.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics_task = None  # handle to the background reporting task

    async def start(self):
        """Start with metrics reporting"""
        await super().start()

        # Start metrics reporting task
        self.metrics_task = asyncio.create_task(self.report_metrics())

    async def report_metrics(self):
        """Report metrics every minute"""
        while True:
            await asyncio.sleep(60)  # Report every minute

            metrics = self.get_metrics_summary()
            logger.info(
                "Metrics report",
                extra={"metrics": metrics}
            )

            # Send to monitoring service
            await self.send_to_monitoring(metrics)

    async def stop(self):
        """Cancel the reporting task, wait for it to finish, then stop."""
        if self.metrics_task:
            self.metrics_task.cancel()
            try:
                # Await the cancelled task so the CancelledError is
                # consumed and the task is fully torn down before
                # shutdown proceeds (cancel() alone only *requests*
                # cancellation).
                await self.metrics_task
            except asyncio.CancelledError:
                pass
        await super().stop()

Integration with Monitoring Services

Datadog Integration

from datadog import initialize, statsd

# Initialize Datadog
import os  # required for os.getenv below; missing from the original snippet

# Read credentials from the environment rather than hard-coding them.
initialize(
    api_key=os.getenv('DATADOG_API_KEY'),
    app_key=os.getenv('DATADOG_APP_KEY')
)

class DatadogAgent(ConversimpleAgent):
    """Agent that emits StatsD metrics to Datadog."""

    def on_conversation_started(self, conversation_id: str):
        """Count each conversation start, tagged by agent type."""
        statsd.increment('agent.conversations.started', tags=['agent:voice'])

    @tool("Get customer")
    def get_customer(self, customer_id: str) -> dict:
        """Fetch a customer record, recording timing and outcome metrics."""
        began = time.time()

        try:
            record = database.get_customer(customer_id)

            # Milliseconds elapsed, observed into the duration histogram.
            elapsed_ms = (time.time() - began) * 1000
            statsd.histogram('agent.tool.duration', elapsed_ms, tags=['tool:get_customer'])
            statsd.increment('agent.tool.success', tags=['tool:get_customer'])

            return record

        except Exception as e:
            # Tag the error counter with the exception class name.
            statsd.increment('agent.tool.error', tags=['tool:get_customer', f'error:{type(e).__name__}'])
            raise

Prometheus Integration

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
# Counters are monotonically increasing; the Gauge tracks a value that
# can rise and fall; the Histogram buckets observed durations. Each
# distinct label value (tool_name, error_type) creates its own series.
conversations_total = Counter('agent_conversations_total', 'Total conversations')
tool_calls_total = Counter('agent_tool_calls_total', 'Total tool calls', ['tool_name'])
tool_duration = Histogram('agent_tool_duration_seconds', 'Tool execution duration', ['tool_name'])
active_conversations = Gauge('agent_active_conversations', 'Currently active conversations')
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type'])

class PrometheusAgent(ConversimpleAgent):
    """Agent exposing Prometheus metrics on an embedded HTTP endpoint."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Start Prometheus HTTP server on port 8000
        # NOTE(review): this binds the port per *instance* — a second
        # PrometheusAgent in the same process will fail with
        # "Address already in use". Consider starting the server once
        # at module/process level instead.
        start_http_server(8000)

    def on_conversation_started(self, conversation_id: str):
        """Track with Prometheus"""
        # Monotonic total plus a gauge of currently-open conversations.
        conversations_total.inc()
        active_conversations.inc()

    def on_conversation_ended(self, conversation_id: str):
        """Update active conversations"""
        active_conversations.dec()

    @tool("Search products")
    def search_products(self, query: str) -> dict:
        """Tool with Prometheus metrics"""
        tool_calls_total.labels(tool_name='search_products').inc()

        # .time() observes the elapsed wall time into the histogram
        # when the with-block exits.
        with tool_duration.labels(tool_name='search_products').time():
            return search_engine.search(query)

    def on_error(self, error_type: str, message: str, details: dict):
        """Track errors"""
        errors_total.labels(error_type=error_type).inc()

CloudWatch Integration

import boto3
from datetime import datetime

cloudwatch = boto3.client('cloudwatch')

class CloudWatchAgent(ConversimpleAgent):
    """Agent that publishes operational metrics to AWS CloudWatch."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Custom namespace all of this agent's metrics are published under.
        self.namespace = 'Conversimple/Agent'

    def put_metric(self, metric_name: str, value: float, unit: str = 'Count', **dimensions):
        """Put a single metric datum to CloudWatch.

        Args:
            metric_name: CloudWatch metric name.
            value: Numeric metric value.
            unit: CloudWatch unit string (e.g. 'Count', 'Milliseconds').
            **dimensions: Extra key/value dimensions. Values are
                stringified because the PutMetricData API requires
                string dimension values (agent_id etc. may not be str).
        """
        # Local import: the surrounding snippet only imports `datetime`.
        from datetime import timezone

        cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                'MetricName': metric_name,
                'Value': value,
                'Unit': unit,
                # Timezone-aware UTC; datetime.utcnow() is deprecated.
                'Timestamp': datetime.now(timezone.utc),
                'Dimensions': [
                    {'Name': k, 'Value': str(v)}
                    for k, v in dimensions.items()
                ]
            }]
        )

    def on_conversation_started(self, conversation_id: str):
        """Track conversation starts"""
        self.put_metric('ConversationsStarted', 1, agent_id=self.agent_id)

    @tool("Process payment")
    def process_payment(self, amount: float) -> dict:
        """Charge a payment and publish duration/amount/error metrics."""
        start_time = time.time()

        try:
            result = payment_service.charge(amount)

            # Record duration
            duration = (time.time() - start_time) * 1000
            self.put_metric('ToolDuration', duration, unit='Milliseconds', tool='process_payment')

            # Record amount
            self.put_metric('PaymentAmount', amount, unit='None', tool='process_payment')

            return result

        except Exception as e:
            self.put_metric('ToolError', 1, tool='process_payment', error=type(e).__name__)
            raise

Log Aggregation

ELK Stack (Elasticsearch, Logstash, Kibana)

import logging
from logging.handlers import SocketHandler

class ELKAgent(ConversimpleAgent):
    """Agent that ships logs toward an ELK stack over a TCP socket."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Set up Logstash handler
        # NOTE(review): logging.handlers.SocketHandler sends *pickled*
        # LogRecord dicts, not the formatter's output, so the
        # JsonFormatter set here does not control the bytes on the wire.
        # Confirm the Logstash input can consume pickled records, or use
        # a JSON-over-TCP handler instead.
        handler = SocketHandler('logstash-server', 5000)
        handler.setFormatter(JsonFormatter())

        # Attach to the shared 'agent' logger at INFO level.
        logger = logging.getLogger('agent')
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)

Logtail/Better Stack

from logtail import LogtailHandler

class LogtailAgent(ConversimpleAgent):
    """Agent that forwards logs to Logtail / Better Stack."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Set up Logtail handler
        # NOTE(review): this relies on `os` being imported elsewhere —
        # add `import os` when copying this example.
        handler = LogtailHandler(source_token=os.getenv('LOGTAIL_SOURCE_TOKEN'))

        # Attach to the shared 'agent' logger at INFO level.
        logger = logging.getLogger('agent')
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)

Best Practices

1. Log at Appropriate Levels

# DEBUG: Detailed information for diagnosing issues
logger.debug(f"Tool parameters: {params}")

# INFO: General informational messages
logger.info(f"Processing order {order_id}")

# WARNING: Something unexpected but handled
logger.warning(f"Slow tool execution: {duration}ms")

# ERROR: Error events
logger.error(f"Payment failed: {error}")

# CRITICAL: Serious errors
logger.critical(f"Database connection lost")

2. Don’t Log Sensitive Information

# ❌ Bad - logs sensitive data
logger.info(f"Processing payment for card {card_number}")

# ✅ Good - logs safely
logger.info(f"Processing payment for card ending in {card_number[-4:]}")

3. Use Correlation IDs

import uuid

class CorrelatedAgent(ConversimpleAgent):
    """Agent that attaches a correlation ID to each conversation's logs."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # conversation_id -> correlation_id. The original example used
        # this mapping without ever initializing it, which raised
        # AttributeError on the first conversation.
        self.correlation_ids = {}

    def on_conversation_started(self, conversation_id: str):
        """Generate correlation ID"""
        correlation_id = str(uuid.uuid4())

        logger.info(
            "Conversation started",
            extra={
                "conversation_id": conversation_id,
                "correlation_id": correlation_id
            }
        )

        # Store for later use
        self.correlation_ids[conversation_id] = correlation_id

4. Monitor Key Performance Indicators

Track these essential metrics:
  • Conversation Count: Active, started, ended
  • Tool Call Rate: Calls per minute/hour
  • Tool Duration: Average and P95/P99 latency
  • Error Rate: Errors per minute, by type
  • Success Rate: Successful tool executions vs failures

5. Set Up Alerts

Alert on critical conditions:
class AlertingAgent(ConversimpleAgent):
    """Agent that fires an alert whenever errors cross a threshold."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.error_count = 0
        self.alert_threshold = 10

    def on_error(self, error_type: str, message: str, details: dict):
        """Count errors; alert and reset once the threshold is reached."""
        self.error_count += 1

        # Guard clause: below the threshold there is nothing to do.
        if self.error_count < self.alert_threshold:
            return

        self.send_alert(
            severity="HIGH",
            message=f"Error threshold exceeded: {self.error_count} errors"
        )
        self.error_count = 0  # start a fresh counting window

Next Steps