Overview
Your agent can integrate with any database to store and retrieve data during conversations.PostgreSQL
Setup Connection
Copy
import psycopg2
from psycopg2.pool import SimpleConnectionPool
class DatabaseAgent(ConversimpleAgent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.db_pool = SimpleConnectionPool(
1, 10,
host="localhost",
database="myapp",
user="user",
password=os.getenv('DB_PASSWORD')
)
@tool("Get customer")
def get_customer(self, customer_id: str) -> dict:
"""Query PostgreSQL"""
conn = self.db_pool.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM customers WHERE id = %s",
(customer_id,)
)
row = cur.fetchone()
return {"id": row[0], "name": row[1], "email": row[2]}
finally:
self.db_pool.putconn(conn)
SQLite
Simple File-Based Database
Copy
import sqlite3
class SQLiteAgent(ConversimpleAgent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.db = sqlite3.connect('agent.db')
self.create_tables()
def create_tables(self):
self.db.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
data JSON,
created_at TIMESTAMP
)
''')
self.db.commit()
@tool("Save data")
def save_data(self, key: str, value: str) -> dict:
"""Store data"""
self.db.execute(
"INSERT INTO data (key, value) VALUES (?, ?)",
(key, value)
)
self.db.commit()
return {"success": True}
MongoDB
Document Database
Copy
from pymongo import MongoClient
class MongoAgent(ConversimpleAgent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.client = MongoClient('mongodb://localhost:27017/')
self.db = self.client.myapp
@tool("Get product")
def get_product(self, product_id: str) -> dict:
"""Query MongoDB"""
product = self.db.products.find_one({"_id": product_id})
return product if product else {"error": "not_found"}
@tool("Search products")
def search_products(self, query: str) -> dict:
"""Full-text search"""
products = list(self.db.products.find(
{"$text": {"$search": query}}
).limit(10))
return {"products": products}
Redis
Caching and Session Storage
Copy
import redis
import json
class RedisAgent(ConversimpleAgent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.redis = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
@tool("Get cached data")
def get_cached(self, key: str) -> dict:
"""Get from cache"""
value = self.redis.get(f"cache:{key}")
return json.loads(value) if value else {"error": "not_found"}
@tool("Save to cache")
def save_cached(self, key: str, value: dict, ttl: int = 3600) -> dict:
"""Save with TTL"""
self.redis.setex(
f"cache:{key}",
ttl,
json.dumps(value)
)
return {"success": True}
Best Practices
Use Connection Pooling
Copy
# ✅ Good - reuse connections
pool = SimpleConnectionPool(1, 10, **db_config)
# ❌ Bad - create new connection each time
def get_data():
conn = psycopg2.connect(**db_config) # Slow!
Handle Database Errors
Copy
@tool("Get data")
def get_data(self, id: str) -> dict:
"""Handle DB errors"""
try:
return database.get(id)
except DatabaseConnectionError:
logger.error("Database connection failed")
return {"error": "service_unavailable"}
Use Parameterized Queries
Copy
# ✅ Safe from SQL injection
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# ❌ Vulnerable to SQL injection
cursor.execute(f"SELECT * FROM users WHERE id = '{user_id}'")
Next Steps
External APIs
Integrate with REST APIs