from datetime import datetime, timedelta
class CircuitBreaker:
    """Stop calling a failing function until a cool-down period has passed.

    The breaker is in one of three states, stored as a string in ``state``:

    * ``"closed"``    - calls pass through; failures are counted.
    * ``"open"``      - calls are rejected with ``CircuitBreakerOpen`` until
      ``timeout`` seconds have elapsed since the last recorded failure.
    * ``"half_open"`` - the cool-down has elapsed; the next call is let
      through as a probe. Success closes the breaker. A failure re-opens it
      as long as the failure count is still at or above the threshold, which
      it is because the count is only reset on success.

    NOTE(review): ``CircuitBreakerOpen`` is not defined in this block; it is
    assumed to be an exception class defined elsewhere in the module.
    """

    def __init__(self, failure_threshold: int = 5, timeout: float = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of recorded failures at which the
                breaker opens.
            timeout: Seconds to wait after the last failure before a probe
                call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None  # datetime of the most recent failure
        self.state = "closed"  # closed, open, half_open

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` through the circuit breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpen: If the breaker is open and the cool-down has
                not yet elapsed; ``func`` is not invoked in that case.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == "open":
            if not self._should_attempt_reset():
                raise CircuitBreakerOpen("Service unavailable")
            # Cool-down elapsed: allow this call through as a probe.
            self.state = "half_open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result

    def _on_success(self) -> None:
        """Reset the failure count and close the breaker."""
        self.failures = 0
        self.state = "closed"

    def _on_failure(self) -> None:
        """Record a failure and open the breaker once the threshold is hit."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"🔴 Circuit breaker opened after {self.failures} failures")

    def _should_attempt_reset(self) -> bool:
        """Return True when ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # No failure was ever recorded, so there is nothing to wait for.
            return True
        elapsed = datetime.now() - self.last_failure_time
        # total_seconds() covers the whole interval; the .seconds attribute is
        # only the seconds component (0..86399) and silently drops full days.
        return elapsed.total_seconds() >= self.timeout
# Use circuit breaker
class ResilientAgent(ConversimpleAgent):
    """Agent whose payment tool is routed through a ``CircuitBreaker``."""

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the base class, then build the breaker."""
        super().__init__(**kwargs)
        # Breaker for the payment tool: opens at 3 failures, 30 s cool-down.
        self.payment_circuit = CircuitBreaker(timeout=30, failure_threshold=3)

    @tool("Process payment")
    def process_payment(self, amount: float, card: str) -> dict:
        """Tool protected by circuit breaker"""
        # Docstring left untouched: the @tool decorator may read it at runtime.
        breaker = self.payment_circuit
        try:
            outcome = breaker.call(
                payment_service.charge,
                amount=amount,
                card=card,
            )
        except CircuitBreakerOpen:
            # Breaker rejected the call; report it as a result dict instead
            # of letting the exception propagate to the caller.
            unavailable = {
                "error": "service_unavailable",
                "message": "Payment service is temporarily unavailable",
            }
            return unavailable
        return outcome