Salesforce
Copy
from simple_salesforce import Salesforce
class SalesforceAgent(ConversimpleAgent):
    """Agent exposing Salesforce contact lookup and lead creation as tools.

    The Salesforce client is built once in ``__init__`` from the
    ``SF_USERNAME``, ``SF_PASSWORD`` and ``SF_TOKEN`` environment variables
    and stored on ``self.sf``.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base agent, then create the Salesforce client."""
        super().__init__(**kwargs)
        self.sf = Salesforce(
            username=os.getenv('SF_USERNAME'),
            password=os.getenv('SF_PASSWORD'),
            security_token=os.getenv('SF_TOKEN'),
        )

    @tool("Get contact")
    def get_contact(self, email: str) -> dict:
        """Query Salesforce contact

        Args:
            email: Email address to match against ``Contact.Email``.

        Returns:
            The first matching record (with ``Id``, ``Name`` and ``Email``
            fields selected), or ``{"error": "not_found"}`` when the query
            returns no records.
        """
        # Imported locally so this block does not depend on the file-level
        # import line being extended.
        from simple_salesforce import format_soql

        # ``email`` arrives from the tool caller and must never be spliced
        # into the SOQL text directly: format_soql quotes and escapes it,
        # so a value containing ' or \ cannot alter the query.
        query = format_soql(
            "SELECT Id, Name, Email FROM Contact WHERE Email = {}", email
        )
        records = self.sf.query(query)['records']
        if records:
            return records[0]
        return {"error": "not_found"}

    @tool("Create lead")
    def create_lead(self, name: str, email: str, company: str) -> dict:
        """Create Salesforce lead

        Args:
            name: Stored as the lead's ``LastName`` (the full value is
                written to that single field; it is not split).
            email: Stored as the lead's ``Email``.
            company: Stored as the lead's ``Company``.

        Returns:
            A dict with the ``id`` and ``success`` values taken from the
            create response.
        """
        result = self.sf.Lead.create({
            'LastName': name,
            'Email': email,
            'Company': company,
        })
        return {"id": result['id'], "success": result['success']}
HubSpot
Copy
from hubspot import HubSpot
class HubSpotAgent(ConversimpleAgent):
    """Agent exposing HubSpot contact lookup and creation as tools.

    The HubSpot client is built once in ``__init__`` from the
    ``HUBSPOT_TOKEN`` environment variable and stored on ``self.hubspot``.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base agent, then create the HubSpot client."""
        super().__init__(**kwargs)
        self.hubspot = HubSpot(access_token=os.getenv('HUBSPOT_TOKEN'))

    @tool("Get contact")
    def get_contact(self, email: str) -> dict:
        """Get HubSpot contact

        Args:
            email: Email address identifying the contact.

        Returns:
            The contact as a dict on success; ``{"error": "not_found"}``
            when HubSpot answers 404; ``{"error": "api_error", "status": ...}``
            for any other HubSpot API error, so that auth or rate-limit
            failures are not misreported as a missing contact.
        """
        # Imported locally so this block does not depend on the file-level
        # import line being extended.
        from hubspot.crm.contacts.exceptions import ApiException

        try:
            # Without id_property the first argument is interpreted as the
            # contact's internal record ID, so an email would never match.
            contact = self.hubspot.crm.contacts.basic_api.get_by_id(
                email, id_property="email"
            )
        except ApiException as exc:
            if exc.status == 404:
                return {"error": "not_found"}
            return {"error": "api_error", "status": exc.status}
        return contact.to_dict()

    @tool("Create contact")
    def create_contact(self, email: str, firstname: str, lastname: str) -> dict:
        """Create HubSpot contact

        Args:
            email: Value for the contact's ``email`` property.
            firstname: Value for the contact's ``firstname`` property.
            lastname: Value for the contact's ``lastname`` property.

        Returns:
            ``{"id": <new contact id>, "success": True}``. API errors raised
            by the client are not caught here and propagate to the caller.
        """
        properties = {
            "email": email,
            "firstname": firstname,
            "lastname": lastname,
        }
        # NOTE(review): newer hubspot-api-client releases appear to name this
        # keyword ``simple_public_object_input_for_create`` -- confirm against
        # the installed client version.
        contact = self.hubspot.crm.contacts.basic_api.create(
            simple_public_object_input={"properties": properties}
        )
        return {"id": contact.id, "success": True}
Notification Services
Email and SMS integration