Compare commits

...

2 Commits

Author SHA1 Message Date
Timothy 17df56fd66 feat: brevo tools 2026-02-22 20:26:16 -08:00
Timothy c409bd7b0e feat: consistent tool testing 2026-02-22 20:09:00 -08:00
8 changed files with 1294 additions and 1 deletions
+11 -1
View File
@@ -56,6 +56,7 @@ To add a new credential:
from .apollo import APOLLO_CREDENTIALS
from .base import CredentialError, CredentialSpec
from .bigquery import BIGQUERY_CREDENTIALS
from .brevo import BREVO_CREDENTIALS
from .browser import get_aden_auth_url, get_aden_setup_url, open_browser
from .calcom import CALCOM_CREDENTIALS
from .discord import DISCORD_CREDENTIALS
@@ -65,7 +66,12 @@ from .github import GITHUB_CREDENTIALS
from .google_calendar import GOOGLE_CALENDAR_CREDENTIALS
from .google_docs import GOOGLE_DOCS_CREDENTIALS
from .google_maps import GOOGLE_MAPS_CREDENTIALS
from .health_check import HealthCheckResult, check_credential_health
from .health_check import (
BaseHttpHealthChecker,
HealthCheckResult,
check_credential_health,
validate_integration_wiring,
)
from .hubspot import HUBSPOT_CREDENTIALS
from .llm import LLM_CREDENTIALS
from .news import NEWS_CREDENTIALS
@@ -104,6 +110,7 @@ CREDENTIAL_SPECS = {
**BIGQUERY_CREDENTIALS,
**CALCOM_CREDENTIALS,
**STRIPE_CREDENTIALS,
**BREVO_CREDENTIALS,
}
__all__ = [
@@ -114,8 +121,10 @@ __all__ = [
# Credential store adapter (replaces deprecated CredentialManager)
"CredentialStoreAdapter",
# Health check utilities
"BaseHttpHealthChecker",
"HealthCheckResult",
"check_credential_health",
"validate_integration_wiring",
# Browser utilities for OAuth2 flows
"open_browser",
"get_aden_auth_url",
@@ -147,4 +156,5 @@ __all__ = [
"CALCOM_CREDENTIALS",
"DISCORD_CREDENTIALS",
"STRIPE_CREDENTIALS",
"BREVO_CREDENTIALS",
]
+42
View File
@@ -0,0 +1,42 @@
"""
Brevo tool credentials.
Contains credentials for Brevo (formerly Sendinblue) transactional email,
SMS, and contact management integration.
"""
from .base import CredentialSpec
BREVO_CREDENTIALS = {
"brevo": CredentialSpec(
env_var="BREVO_API_KEY",
tools=[
"brevo_send_email",
"brevo_send_sms",
"brevo_create_contact",
"brevo_get_contact",
"brevo_update_contact",
],
required=True,
startup_required=False,
help_url="https://app.brevo.com/settings/keys/api",
description="Brevo API key for transactional email, SMS, and contact management",
# Auth method support
aden_supported=False,
direct_api_key_supported=True,
api_key_instructions="""To get a Brevo API key:
1. Go to https://app.brevo.com and create an account (or sign in)
2. Navigate to Settings > API Keys (or visit https://app.brevo.com/settings/keys/api)
3. Click "Generate a new API key"
4. Give it a name (e.g., "Hive Agent")
5. Copy the API key (starts with xkeysib-)
6. Store it securely - you won't be able to see it again!
7. Note: For sending emails, you'll need a verified sender domain or email""",
# Health check configuration
health_check_endpoint="https://api.brevo.com/v3/account",
health_check_method="GET",
# Credential store mapping
credential_id="brevo",
credential_key="api_key",
),
}
@@ -239,6 +239,181 @@ class OAuthBearerHealthChecker:
)
class BaseHttpHealthChecker:
"""Configurable base class for HTTP-based credential health checkers.
Reduces boilerplate by handling the common HTTP request/response/error pattern.
Subclasses configure via class constants and override hooks as needed.
Supports five auth patterns:
- AUTH_BEARER: Authorization: Bearer <token>
- AUTH_HEADER: Custom header name/value template
- AUTH_QUERY: Token as query parameter
- AUTH_BASIC: HTTP Basic Authentication
- AUTH_URL: Token embedded in URL (e.g., Telegram)
Example::
class CalcomHealthChecker(BaseHttpHealthChecker):
ENDPOINT = "https://api.cal.com/v1/me"
SERVICE_NAME = "Cal.com"
AUTH_TYPE = "query"
AUTH_QUERY_PARAM_NAME = "apiKey"
"""
# Auth pattern constants
AUTH_BEARER = "bearer"
AUTH_HEADER = "header"
AUTH_QUERY = "query"
AUTH_BASIC = "basic"
AUTH_URL = "url"
# Subclass configuration
ENDPOINT: str = ""
SERVICE_NAME: str = ""
HTTP_METHOD: str = "GET"
TIMEOUT: float = 10.0
# Auth configuration
AUTH_TYPE: str = AUTH_BEARER
AUTH_HEADER_NAME: str = "Authorization"
AUTH_HEADER_TEMPLATE: str = "Bearer {token}"
AUTH_QUERY_PARAM_NAME: str = "key"
# Status code interpretation
VALID_STATUSES: frozenset[int] = frozenset({200})
RATE_LIMITED_STATUSES: frozenset[int] = frozenset({429})
AUTHENTICATED_ERROR_STATUSES: frozenset[int] = frozenset()
INVALID_STATUSES: frozenset[int] = frozenset({401})
FORBIDDEN_STATUSES: frozenset[int] = frozenset({403})
def _build_url(self, credential_value: str) -> str:
"""Build request URL. Override for URL-template auth."""
return self.ENDPOINT
def _build_headers(self, credential_value: str) -> dict[str, str]:
"""Build request headers based on AUTH_TYPE."""
headers: dict[str, str] = {"Accept": "application/json"}
if self.AUTH_TYPE == self.AUTH_BEARER:
headers["Authorization"] = f"Bearer {credential_value}"
elif self.AUTH_TYPE == self.AUTH_HEADER:
headers[self.AUTH_HEADER_NAME] = self.AUTH_HEADER_TEMPLATE.format(
token=credential_value
)
return headers
def _build_params(self, credential_value: str) -> dict[str, str]:
"""Build query parameters. Includes auth param for AUTH_QUERY type."""
if self.AUTH_TYPE == self.AUTH_QUERY:
return {self.AUTH_QUERY_PARAM_NAME: credential_value}
return {}
def _build_auth(self, credential_value: str) -> tuple[str, str] | None:
"""Build HTTP Basic auth tuple for AUTH_BASIC type."""
if self.AUTH_TYPE == self.AUTH_BASIC:
return (credential_value, "")
return None
def _build_json_body(self, credential_value: str) -> dict | None:
"""Build JSON request body. Override for POST requests that need one."""
return None
def _extract_identity(self, data: dict) -> dict[str, str]:
"""Extract identity info from successful response. Override in subclass."""
return {}
def _interpret_response(self, response: httpx.Response) -> HealthCheckResult:
"""Interpret HTTP response. Override for non-standard status logic."""
status = response.status_code
if status in self.VALID_STATUSES:
identity: dict[str, str] = {}
try:
data = response.json()
identity = self._extract_identity(data)
except Exception:
pass
return HealthCheckResult(
valid=True,
message=f"{self.SERVICE_NAME} credentials valid",
details={"identity": identity} if identity else {},
)
elif status in self.RATE_LIMITED_STATUSES:
return HealthCheckResult(
valid=True,
message=f"{self.SERVICE_NAME} credentials valid (rate limited)",
details={"status_code": status, "rate_limited": True},
)
elif status in self.AUTHENTICATED_ERROR_STATUSES:
return HealthCheckResult(
valid=True,
message=f"{self.SERVICE_NAME} credentials valid",
details={"status_code": status},
)
elif status in self.INVALID_STATUSES:
return HealthCheckResult(
valid=False,
message=f"{self.SERVICE_NAME} credentials are invalid or expired",
details={"status_code": status},
)
elif status in self.FORBIDDEN_STATUSES:
return HealthCheckResult(
valid=False,
message=f"{self.SERVICE_NAME} credentials lack required permissions",
details={"status_code": status},
)
else:
return HealthCheckResult(
valid=False,
message=f"{self.SERVICE_NAME} API returned status {status}",
details={"status_code": status},
)
def check(self, credential_value: str) -> HealthCheckResult:
"""Execute the health check. Normally not overridden."""
try:
url = self._build_url(credential_value)
headers = self._build_headers(credential_value)
params = self._build_params(credential_value)
auth = self._build_auth(credential_value)
json_body = self._build_json_body(credential_value)
with httpx.Client(timeout=self.TIMEOUT) as client:
kwargs: dict[str, Any] = {"headers": headers}
if params:
kwargs["params"] = params
if auth:
kwargs["auth"] = auth
if json_body is not None:
kwargs["json"] = json_body
if self.HTTP_METHOD.upper() == "POST":
response = client.post(url, **kwargs)
else:
response = client.get(url, **kwargs)
return self._interpret_response(response)
except httpx.TimeoutException:
return HealthCheckResult(
valid=False,
message=f"{self.SERVICE_NAME} API request timed out",
details={"error": "timeout"},
)
except httpx.RequestError as e:
error_msg = str(e)
if any(
s in error_msg
for s in ("Bearer", "Authorization", "api_key", "token")
):
error_msg = "Request failed (details redacted for security)"
return HealthCheckResult(
valid=False,
message=f"Failed to connect to {self.SERVICE_NAME}: {error_msg}",
details={"error": error_msg},
)
class GoogleCalendarHealthChecker(OAuthBearerHealthChecker):
"""Health checker for Google Calendar OAuth tokens."""
@@ -740,6 +915,152 @@ class GoogleGmailHealthChecker(OAuthBearerHealthChecker):
return {"email": email} if email else {}
# --- New checkers using BaseHttpHealthChecker ---
class StripeHealthChecker(BaseHttpHealthChecker):
"""Health checker for Stripe API key."""
ENDPOINT = "https://api.stripe.com/v1/balance"
SERVICE_NAME = "Stripe"
class ExaSearchHealthChecker(BaseHttpHealthChecker):
"""Health checker for Exa Search API key."""
ENDPOINT = "https://api.exa.ai/search"
SERVICE_NAME = "Exa Search"
HTTP_METHOD = "POST"
def _build_json_body(self, credential_value: str) -> dict:
return {"query": "test", "numResults": 1}
class GoogleDocsHealthChecker(OAuthBearerHealthChecker):
"""Health checker for Google Docs OAuth tokens."""
def __init__(self):
super().__init__(
endpoint="https://docs.googleapis.com/v1/documents/1",
service_name="Google Docs",
)
class CalcomHealthChecker(BaseHttpHealthChecker):
"""Health checker for Cal.com API key."""
ENDPOINT = "https://api.cal.com/v1/me"
SERVICE_NAME = "Cal.com"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_QUERY
AUTH_QUERY_PARAM_NAME = "apiKey"
class SerpApiHealthChecker(BaseHttpHealthChecker):
"""Health checker for SerpAPI key."""
ENDPOINT = "https://serpapi.com/account.json"
SERVICE_NAME = "SerpAPI"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_QUERY
AUTH_QUERY_PARAM_NAME = "api_key"
class ApolloHealthChecker(BaseHttpHealthChecker):
"""Health checker for Apollo.io API key."""
ENDPOINT = "https://api.apollo.io/v1/auth/health"
SERVICE_NAME = "Apollo"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_QUERY
AUTH_QUERY_PARAM_NAME = "api_key"
class TelegramHealthChecker(BaseHttpHealthChecker):
"""Health checker for Telegram bot token."""
SERVICE_NAME = "Telegram"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_URL
def _build_url(self, credential_value: str) -> str:
return f"https://api.telegram.org/bot{credential_value}/getMe"
def _build_headers(self, credential_value: str) -> dict[str, str]:
return {"Accept": "application/json"}
def _interpret_response(self, response: httpx.Response) -> HealthCheckResult:
if response.status_code == 200:
try:
data = response.json()
if data.get("ok"):
username = data.get("result", {}).get("username", "unknown")
identity = {"username": username} if username != "unknown" else {}
return HealthCheckResult(
valid=True,
message=f"Telegram bot token valid (bot: @{username})",
details={"identity": identity},
)
else:
return HealthCheckResult(
valid=False,
message="Telegram bot token is invalid",
details={"telegram_error": data.get("description", "")},
)
except Exception:
return HealthCheckResult(
valid=True,
message="Telegram credentials valid",
)
elif response.status_code == 401:
return HealthCheckResult(
valid=False,
message="Telegram bot token is invalid",
details={"status_code": 401},
)
else:
return HealthCheckResult(
valid=False,
message=f"Telegram API returned status {response.status_code}",
details={"status_code": response.status_code},
)
class NewsdataHealthChecker(BaseHttpHealthChecker):
"""Health checker for Newsdata.io API key."""
ENDPOINT = "https://newsdata.io/api/1/news"
SERVICE_NAME = "Newsdata"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_QUERY
AUTH_QUERY_PARAM_NAME = "apikey"
def _build_params(self, credential_value: str) -> dict[str, str]:
params = super()._build_params(credential_value)
params["q"] = "test"
return params
class FinlightHealthChecker(BaseHttpHealthChecker):
"""Health checker for Finlight API key."""
ENDPOINT = "https://api.finlight.me/v1/news"
SERVICE_NAME = "Finlight"
class BrevoHealthChecker(BaseHttpHealthChecker):
"""Health checker for Brevo API key."""
ENDPOINT = "https://api.brevo.com/v3/account"
SERVICE_NAME = "Brevo"
AUTH_TYPE = BaseHttpHealthChecker.AUTH_HEADER
AUTH_HEADER_NAME = "api-key"
AUTH_HEADER_TEMPLATE = "{token}"
def _extract_identity(self, data: dict) -> dict[str, str]:
identity: dict[str, str] = {}
if data.get("email"):
identity["email"] = data["email"]
if data.get("companyName"):
identity["company"] = data["companyName"]
return identity
# Registry of health checkers
HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
"discord": DiscordHealthChecker(),
@@ -753,6 +1074,16 @@ HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
"anthropic": AnthropicHealthChecker(),
"github": GitHubHealthChecker(),
"resend": ResendHealthChecker(),
"stripe": StripeHealthChecker(),
"exa_search": ExaSearchHealthChecker(),
"google_docs": GoogleDocsHealthChecker(),
"calcom": CalcomHealthChecker(),
"serpapi": SerpApiHealthChecker(),
"apollo": ApolloHealthChecker(),
"telegram": TelegramHealthChecker(),
"newsdata": NewsdataHealthChecker(),
"finlight": FinlightHealthChecker(),
"brevo": BrevoHealthChecker(),
}
@@ -807,3 +1138,81 @@ def check_credential_health(
return checker.check(credential_value, kwargs["cse_id"])
return checker.check(credential_value)
def validate_integration_wiring(credential_name: str) -> list[str]:
"""Check that a credential integration is fully wired up.
Returns a list of issues found. Empty list means everything is correct.
Use during development to verify a new integration has all required pieces:
CredentialSpec, health checker, endpoint consistency, and required fields.
Args:
credential_name: The credential name to validate (e.g., 'jira').
Returns:
List of issue descriptions. Empty if fully wired.
Example::
issues = validate_integration_wiring("stripe")
for issue in issues:
print(f" - {issue}")
"""
from . import CREDENTIAL_SPECS
issues: list[str] = []
# 1. Check spec exists
spec = CREDENTIAL_SPECS.get(credential_name)
if spec is None:
issues.append(
f"No CredentialSpec for '{credential_name}' in CREDENTIAL_SPECS. "
f"Add it to the appropriate category file and import in __init__.py."
)
return issues
# 2. Check required fields
if not spec.env_var:
issues.append("CredentialSpec.env_var is empty")
if not spec.description:
issues.append("CredentialSpec.description is empty")
if not spec.tools and not spec.node_types:
issues.append("CredentialSpec has no tools or node_types")
if not spec.help_url:
issues.append("CredentialSpec.help_url is empty (users need this to get credentials)")
if spec.direct_api_key_supported and not spec.api_key_instructions:
issues.append(
"CredentialSpec.api_key_instructions is empty but "
"direct_api_key_supported=True"
)
# 3. Check health check
if not spec.health_check_endpoint:
issues.append(
"CredentialSpec.health_check_endpoint is empty. "
"Add a lightweight API endpoint for credential validation."
)
else:
checker = HEALTH_CHECKERS.get(credential_name)
if checker is None:
issues.append(
f"No entry in HEALTH_CHECKERS for '{credential_name}'. "
f"The OAuthBearerHealthChecker fallback will be used. "
f"Add a dedicated checker if auth is not Bearer token."
)
else:
checker_endpoint = getattr(checker, "ENDPOINT", None) or getattr(
checker, "endpoint", None
)
if checker_endpoint and spec.health_check_endpoint:
spec_base = spec.health_check_endpoint.split("?")[0]
checker_base = str(checker_endpoint).split("?")[0]
if spec_base != checker_base:
issues.append(
f"Endpoint mismatch: spec='{spec.health_check_endpoint}' "
f"vs checker='{checker_endpoint}'"
)
return issues
+2
View File
@@ -24,6 +24,7 @@ if TYPE_CHECKING:
from .account_info_tool import register_tools as register_account_info
from .apollo_tool import register_tools as register_apollo
from .bigquery_tool import register_tools as register_bigquery
from .brevo_tool import register_tools as register_brevo
from .calcom_tool import register_tools as register_calcom
from .calendar_tool import register_tools as register_calendar
from .csv_tool import register_tools as register_csv
@@ -144,6 +145,7 @@ def register_all_tools(
register_subdomain_enumerator(mcp)
register_risk_scorer(mcp)
register_stripe(mcp, credentials=credentials)
register_brevo(mcp, credentials=credentials)
# Return the list of all registered tool names
return list(mcp._tool_manager._tools.keys())
@@ -0,0 +1,5 @@
"""Brevo (formerly Sendinblue) tool - transactional email, SMS, and contacts."""
from .brevo_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,489 @@
"""
Brevo Tool - Send transactional emails, SMS, and manage contacts via Brevo API.
Supports:
- Transactional email sending
- Transactional SMS sending
- Contact create/read/update
API Reference: https://developers.brevo.com/reference
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any
import httpx
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialStoreAdapter
BREVO_API_BASE = "https://api.brevo.com/v3"
class _BrevoClient:
"""Internal client wrapping Brevo API v3 calls."""
def __init__(self, api_key: str):
self._api_key = api_key
@property
def _headers(self) -> dict[str, str]:
return {
"api-key": self._api_key,
"Content-Type": "application/json",
"Accept": "application/json",
}
def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
"""Handle common HTTP error codes."""
if response.status_code == 401:
return {"error": "Invalid Brevo API key"}
if response.status_code == 400:
try:
detail = response.json()
msg = detail.get("message", response.text)
except Exception:
msg = response.text
return {"error": f"Bad request: {msg}"}
if response.status_code == 403:
return {"error": "Brevo API key lacks required permissions"}
if response.status_code == 404:
return {"error": "Resource not found"}
if response.status_code == 429:
return {"error": "Rate limit exceeded. Try again later."}
if response.status_code >= 400:
try:
detail = response.json().get("message", response.text)
except Exception:
detail = response.text
return {
"error": f"Brevo API error (HTTP {response.status_code}): {detail}"
}
# Success (200, 201, 204)
if response.status_code == 204:
return {"success": True}
try:
return response.json()
except Exception:
return {"success": True}
def send_email(
self,
to: list[dict[str, str]],
subject: str,
html_content: str,
sender: dict[str, str],
text_content: str | None = None,
cc: list[dict[str, str]] | None = None,
bcc: list[dict[str, str]] | None = None,
reply_to: dict[str, str] | None = None,
tags: list[str] | None = None,
) -> dict[str, Any]:
"""Send a transactional email."""
payload: dict[str, Any] = {
"to": to,
"subject": subject,
"htmlContent": html_content,
"sender": sender,
}
if text_content:
payload["textContent"] = text_content
if cc:
payload["cc"] = cc
if bcc:
payload["bcc"] = bcc
if reply_to:
payload["replyTo"] = reply_to
if tags:
payload["tags"] = tags
response = httpx.post(
f"{BREVO_API_BASE}/smtp/email",
headers=self._headers,
json=payload,
timeout=30.0,
)
return self._handle_response(response)
def send_sms(
self,
sender: str,
recipient: str,
content: str,
sms_type: str = "transactional",
tag: str | None = None,
) -> dict[str, Any]:
"""Send a transactional SMS."""
payload: dict[str, Any] = {
"sender": sender,
"recipient": recipient,
"content": content,
"type": sms_type,
}
if tag:
payload["tag"] = tag
response = httpx.post(
f"{BREVO_API_BASE}/transactionalSMS/send",
headers=self._headers,
json=payload,
timeout=30.0,
)
return self._handle_response(response)
def create_contact(
self,
email: str | None = None,
attributes: dict[str, Any] | None = None,
list_ids: list[int] | None = None,
update_enabled: bool = False,
) -> dict[str, Any]:
"""Create a new contact."""
payload: dict[str, Any] = {}
if email:
payload["email"] = email
if attributes:
payload["attributes"] = attributes
if list_ids:
payload["listIds"] = list_ids
if update_enabled:
payload["updateEnabled"] = True
response = httpx.post(
f"{BREVO_API_BASE}/contacts",
headers=self._headers,
json=payload,
timeout=30.0,
)
return self._handle_response(response)
def get_contact(self, identifier: str) -> dict[str, Any]:
"""Get a contact by email or ID."""
response = httpx.get(
f"{BREVO_API_BASE}/contacts/{identifier}",
headers=self._headers,
timeout=30.0,
)
return self._handle_response(response)
def update_contact(
self,
identifier: str,
attributes: dict[str, Any] | None = None,
list_ids: list[int] | None = None,
unlink_list_ids: list[int] | None = None,
) -> dict[str, Any]:
"""Update a contact."""
payload: dict[str, Any] = {}
if attributes:
payload["attributes"] = attributes
if list_ids:
payload["listIds"] = list_ids
if unlink_list_ids:
payload["unlinkListIds"] = unlink_list_ids
response = httpx.put(
f"{BREVO_API_BASE}/contacts/{identifier}",
headers=self._headers,
json=payload,
timeout=30.0,
)
return self._handle_response(response)
def register_tools(
mcp: FastMCP,
credentials: CredentialStoreAdapter | None = None,
) -> None:
"""Register Brevo tools with the MCP server."""
def _get_api_key() -> str | None:
"""Get Brevo API key from credential store or environment."""
if credentials is not None:
key = credentials.get("brevo")
if key is not None and not isinstance(key, str):
raise TypeError(
f"Expected string from credentials.get('brevo'), got {type(key).__name__}"
)
return key
return os.getenv("BREVO_API_KEY")
def _get_client() -> _BrevoClient | dict[str, str]:
"""Get a Brevo client, or return an error dict if no credentials."""
api_key = _get_api_key()
if not api_key:
return {
"error": "Brevo API key not configured",
"help": (
"Set BREVO_API_KEY environment variable or configure via "
"credential store. Get your key at https://app.brevo.com/settings/keys/api"
),
}
return _BrevoClient(api_key)
@mcp.tool()
def brevo_send_email(
to: list[dict[str, str]],
subject: str,
html_content: str,
sender_email: str,
sender_name: str = "",
text_content: str = "",
cc: list[dict[str, str]] | None = None,
bcc: list[dict[str, str]] | None = None,
reply_to_email: str = "",
tags: list[str] | None = None,
) -> dict[str, Any]:
"""
Send a transactional email via Brevo.
Use this for notifications, alerts, confirmations, or any triggered email.
Args:
to: Recipients list. Each item: {"email": "user@example.com", "name": "User Name"}.
Name is optional.
subject: Email subject line.
html_content: Email body as HTML string.
sender_email: Sender email address (must be a verified sender in Brevo).
sender_name: Sender display name. Optional.
text_content: Plain text alternative body. Optional.
cc: CC recipients. Same format as 'to'. Optional.
bcc: BCC recipients. Same format as 'to'. Optional.
reply_to_email: Reply-to email address. Optional.
tags: Tags for categorizing the email. Optional.
Returns:
Dict with messageId on success, or error dict on failure.
"""
client = _get_client()
if isinstance(client, dict):
return client
if not to:
return {"error": "At least one recipient is required"}
if not subject:
return {"error": "Subject is required"}
if not html_content:
return {"error": "HTML content is required"}
if not sender_email:
return {"error": "Sender email is required"}
sender: dict[str, str] = {"email": sender_email}
if sender_name:
sender["name"] = sender_name
reply_to = {"email": reply_to_email} if reply_to_email else None
try:
result = client.send_email(
to=to,
subject=subject,
html_content=html_content,
sender=sender,
text_content=text_content if text_content else None,
cc=cc,
bcc=bcc,
reply_to=reply_to,
tags=tags,
)
if "error" in result:
return result
return {
"success": True,
"message_id": result.get("messageId", ""),
"to": [r.get("email") for r in to],
"subject": subject,
}
except httpx.TimeoutException:
return {"error": "Brevo request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_send_sms(
sender: str,
recipient: str,
content: str,
sms_type: str = "transactional",
tag: str = "",
) -> dict[str, Any]:
"""
Send a transactional SMS via Brevo.
Use this for SMS notifications, alerts, or verification messages.
Args:
sender: Sender name (max 11 alphanumeric chars) or phone number (max 15 digits).
recipient: Recipient phone number with country code (e.g., "33612345678").
content: SMS message text. Messages over 160 chars are sent as multiple SMS.
sms_type: Either "transactional" or "marketing". Defaults to "transactional".
tag: Optional tag for categorizing the SMS.
Returns:
Dict with messageId on success, or error dict on failure.
"""
client = _get_client()
if isinstance(client, dict):
return client
if not sender:
return {"error": "Sender is required"}
if not recipient:
return {"error": "Recipient phone number is required"}
if not content:
return {"error": "SMS content is required"}
try:
result = client.send_sms(
sender=sender,
recipient=recipient,
content=content,
sms_type=sms_type,
tag=tag if tag else None,
)
if "error" in result:
return result
return {
"success": True,
"message_id": result.get("messageId", ""),
"recipient": recipient,
}
except httpx.TimeoutException:
return {"error": "Brevo request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_create_contact(
email: str,
attributes: dict[str, Any] | None = None,
list_ids: list[int] | None = None,
update_enabled: bool = False,
) -> dict[str, Any]:
"""
Create a contact in Brevo.
Use this to add new contacts to your Brevo account for email/SMS campaigns.
Args:
email: Contact email address.
attributes: Contact attributes in UPPERCASE (e.g., {"FNAME": "John", "LNAME": "Doe"}).
Standard attributes: FNAME, LNAME, SMS (phone with country code like +33xxxxxxxxxx).
list_ids: List IDs to add the contact to. Optional.
update_enabled: If True, updates the contact if it already exists. Defaults to False.
Returns:
Dict with contact id on success, or error dict on failure.
"""
client = _get_client()
if isinstance(client, dict):
return client
if not email:
return {"error": "Email is required"}
try:
result = client.create_contact(
email=email,
attributes=attributes,
list_ids=list_ids,
update_enabled=update_enabled,
)
if "error" in result:
return result
return {
"success": True,
"id": result.get("id"),
"email": email,
}
except httpx.TimeoutException:
return {"error": "Brevo request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_get_contact(
identifier: str,
) -> dict[str, Any]:
"""
Get a contact from Brevo by email address or contact ID.
Args:
identifier: Contact email address or numeric contact ID.
Returns:
Dict with contact details (email, attributes, listIds, statistics)
or error dict on failure.
"""
client = _get_client()
if isinstance(client, dict):
return client
if not identifier:
return {"error": "Contact identifier (email or ID) is required"}
try:
result = client.get_contact(identifier)
if "error" in result:
return result
return {
"success": True,
"id": result.get("id"),
"email": result.get("email"),
"attributes": result.get("attributes", {}),
"list_ids": result.get("listIds", []),
"email_blacklisted": result.get("emailBlacklisted", False),
"sms_blacklisted": result.get("smsBlacklisted", False),
}
except httpx.TimeoutException:
return {"error": "Brevo request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_update_contact(
identifier: str,
attributes: dict[str, Any] | None = None,
list_ids: list[int] | None = None,
unlink_list_ids: list[int] | None = None,
) -> dict[str, Any]:
"""
Update a contact in Brevo.
Args:
identifier: Contact email address or numeric contact ID.
attributes: Attributes to update in UPPERCASE (e.g., {"FNAME": "Jane"}).
list_ids: List IDs to add the contact to. Optional.
unlink_list_ids: List IDs to remove the contact from. Optional.
Returns:
Dict with success status, or error dict on failure.
"""
client = _get_client()
if isinstance(client, dict):
return client
if not identifier:
return {"error": "Contact identifier (email or ID) is required"}
try:
result = client.update_contact(
identifier=identifier,
attributes=attributes,
list_ids=list_ids,
unlink_list_ids=unlink_list_ids,
)
if "error" in result:
return result
return {
"success": True,
"identifier": identifier,
"message": "Contact updated successfully",
}
except httpx.TimeoutException:
return {"error": "Brevo request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
+118
View File
@@ -0,0 +1,118 @@
"""Tests that enforce credential registry completeness and consistency.
These tests run in CI and catch common mistakes when adding new integrations:
- Missing health checker for a spec with health_check_endpoint
- Orphaned entries in HEALTH_CHECKERS (no corresponding spec)
- CredentialSpec fields that are incomplete
- Duplicate env var conflicts
"""
import pytest
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.health_check import HEALTH_CHECKERS, validate_integration_wiring
class TestRegistryCompleteness:
"""Every credential with a health_check_endpoint must have a registered checker."""
# Credentials that intentionally don't have their own dedicated checker:
# - google_cse: shares google_search checker (same credential_group)
# - razorpay/razorpay_secret: requires HTTP Basic auth with TWO credentials,
# which the single-value health check dispatcher can't support
KNOWN_EXCEPTIONS = {"google_cse", "razorpay", "razorpay_secret"}
def test_specs_with_endpoint_have_checkers(self):
"""Every CredentialSpec with health_check_endpoint has a HEALTH_CHECKERS entry."""
missing = []
for name, spec in CREDENTIAL_SPECS.items():
if name in self.KNOWN_EXCEPTIONS:
continue
if spec.health_check_endpoint and name not in HEALTH_CHECKERS:
missing.append(
f"{name}: has endpoint '{spec.health_check_endpoint}' "
f"but no dedicated health checker"
)
assert not missing, (
f"{len(missing)} credential(s) have health_check_endpoint but no checker:\n"
+ "\n".join(f" - {m}" for m in missing)
)
def test_checkers_have_corresponding_specs(self):
"""Every key in HEALTH_CHECKERS matches a CREDENTIAL_SPECS entry."""
orphaned = [name for name in HEALTH_CHECKERS if name not in CREDENTIAL_SPECS]
assert not orphaned, (
f"HEALTH_CHECKERS has entries with no CREDENTIAL_SPECS: {orphaned}"
)
class TestSpecRequiredFields:
"""Every CredentialSpec should have minimum required fields."""
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_env_var(self, cred_name, spec):
assert spec.env_var, f"{cred_name}: missing env_var"
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_description(self, cred_name, spec):
assert spec.description, f"{cred_name}: missing description"
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_tools_or_node_types(self, cred_name, spec):
assert spec.tools or spec.node_types, (
f"{cred_name}: must have at least one tool or node_type"
)
class TestNoDuplicateEnvVars:
"""No two credential specs should use the same env_var (unless in same credential_group)."""
def test_no_accidental_env_var_collisions(self):
seen: dict[str, list[str]] = {}
for name, spec in CREDENTIAL_SPECS.items():
seen.setdefault(spec.env_var, []).append(name)
duplicates = {}
for env_var, names in seen.items():
if len(names) <= 1:
continue
# Filter out intentional duplicates (same credential_group)
groups = {CREDENTIAL_SPECS[n].credential_group for n in names}
if len(groups) == 1 and groups != {""}:
continue # All share the same non-empty group -- intentional
duplicates[env_var] = names
assert not duplicates, f"Duplicate env_vars across unrelated credentials: {duplicates}"
class TestIntegrationWiring:
"""validate_integration_wiring() catches wiring issues."""
def test_nonexistent_credential(self):
issues = validate_integration_wiring("nonexistent_service_xyz")
assert any("No CredentialSpec" in i for i in issues)
def test_known_credential_no_critical_issues(self):
"""A well-wired credential (e.g. 'hubspot') should have no issues."""
issues = validate_integration_wiring("hubspot")
assert not issues, f"Unexpected issues for hubspot: {issues}"
@pytest.mark.parametrize("cred_name", list(HEALTH_CHECKERS.keys()))
def test_all_checkers_pass_wiring(self, cred_name):
"""Every registered checker should pass wiring validation."""
issues = validate_integration_wiring(cred_name)
assert not issues, (
f"Wiring issues for '{cred_name}':\n" + "\n".join(f" - {i}" for i in issues)
)
+218
View File
@@ -6,13 +6,23 @@ import httpx
from aden_tools.credentials.health_check import (
HEALTH_CHECKERS,
ApolloHealthChecker,
AnthropicHealthChecker,
BrevoHealthChecker,
CalcomHealthChecker,
DiscordHealthChecker,
ExaSearchHealthChecker,
FinlightHealthChecker,
GitHubHealthChecker,
GoogleCalendarHealthChecker,
GoogleDocsHealthChecker,
GoogleMapsHealthChecker,
GoogleSearchHealthChecker,
NewsdataHealthChecker,
ResendHealthChecker,
SerpApiHealthChecker,
StripeHealthChecker,
TelegramHealthChecker,
check_credential_health,
)
@@ -69,6 +79,16 @@ class TestHealthCheckerRegistry:
"google",
"slack",
"discord",
"stripe",
"exa_search",
"google_docs",
"calcom",
"serpapi",
"apollo",
"telegram",
"newsdata",
"finlight",
"brevo",
}
assert set(HEALTH_CHECKERS.keys()) == expected
@@ -485,3 +505,201 @@ class TestGoogleCalendarHealthCheckerTokenSanitization:
assert not result.valid
assert "Connection refused" in result.message
# ---------------------------------------------------------------------------
# HealthCheckerTestSuite: reusable base class for standard test scenarios
# ---------------------------------------------------------------------------
class HealthCheckerTestSuite:
"""Reusable test mixin that auto-generates standard health check scenarios.
Subclass this and set ``CHECKER_CLASS`` and ``HTTP_METHOD`` to get 6 tests
for free. Add checker-specific tests alongside as needed.
Example::
class TestMyNewChecker(HealthCheckerTestSuite):
CHECKER_CLASS = MyNewHealthChecker
HTTP_METHOD = "get"
"""
CHECKER_CLASS: type | None = None
HTTP_METHOD: str = "get"
CHECKER_KWARGS: dict = {}
# Override these if the checker uses non-standard valid-status logic
EXPECT_200_VALID: bool = True
EXPECT_401_INVALID: bool = True
EXPECT_403_INVALID: bool = True
EXPECT_429_VALID: bool = True
def _make_checker(self):
assert self.CHECKER_CLASS is not None, "Set CHECKER_CLASS in subclass"
return self.CHECKER_CLASS(**self.CHECKER_KWARGS)
def _mock_response(self, status_code, json_data=None):
response = MagicMock(spec=httpx.Response)
response.status_code = status_code
if json_data:
response.json.return_value = json_data
else:
response.json.return_value = {}
return response
def _setup_mock(self, mock_client_cls, status_code=200, json_data=None):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
http_method = getattr(mock_client, self.HTTP_METHOD)
http_method.return_value = self._mock_response(status_code, json_data)
return mock_client, http_method
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_valid_credential_200(self, mock_client_cls):
"""200 response means valid credential."""
if not self.EXPECT_200_VALID:
return
self._setup_mock(mock_client_cls, 200)
result = self._make_checker().check("test-credential")
assert result.valid is True
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_invalid_credential_401(self, mock_client_cls):
"""401 response means invalid credential."""
if not self.EXPECT_401_INVALID:
return
self._setup_mock(mock_client_cls, 401)
result = self._make_checker().check("bad-credential")
assert result.valid is False
assert result.details.get("status_code") == 401
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_forbidden_403(self, mock_client_cls):
"""403 response means insufficient permissions."""
if not self.EXPECT_403_INVALID:
return
self._setup_mock(mock_client_cls, 403)
result = self._make_checker().check("test-credential")
assert result.valid is False
assert result.details.get("status_code") == 403
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_rate_limited_429(self, mock_client_cls):
"""429 (rate limited) typically means the credential is valid."""
if not self.EXPECT_429_VALID:
return
self._setup_mock(mock_client_cls, 429)
result = self._make_checker().check("test-credential")
assert result.valid is True
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_timeout(self, mock_client_cls):
"""Timeout is handled gracefully."""
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
getattr(mock_client, self.HTTP_METHOD).side_effect = httpx.TimeoutException(
"timed out"
)
result = self._make_checker().check("test-credential")
assert result.valid is False
assert result.details.get("error") == "timeout"
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_network_error(self, mock_client_cls):
"""Network errors are handled gracefully."""
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
getattr(mock_client, self.HTTP_METHOD).side_effect = httpx.RequestError(
"connection refused"
)
result = self._make_checker().check("test-credential")
assert result.valid is False
assert "error" in result.details
# ---------------------------------------------------------------------------
# Tests for new checkers (using HealthCheckerTestSuite)
# ---------------------------------------------------------------------------
class TestStripeHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = StripeHealthChecker
HTTP_METHOD = "get"
class TestExaSearchHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = ExaSearchHealthChecker
HTTP_METHOD = "post"
class TestGoogleDocsHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = GoogleDocsHealthChecker
HTTP_METHOD = "get"
# OAuthBearerHealthChecker doesn't treat 429 as valid
EXPECT_429_VALID = False
class TestCalcomHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = CalcomHealthChecker
HTTP_METHOD = "get"
class TestSerpApiHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = SerpApiHealthChecker
HTTP_METHOD = "get"
class TestApolloHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = ApolloHealthChecker
HTTP_METHOD = "get"
class TestTelegramHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = TelegramHealthChecker
HTTP_METHOD = "get"
# Telegram returns 200 with {"ok": true/false} rather than using HTTP status codes
EXPECT_429_VALID = False
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_valid_credential_200(self, mock_client_cls):
"""200 with ok=true means valid bot token."""
self._setup_mock(
mock_client_cls,
200,
{"ok": True, "result": {"username": "testbot"}},
)
result = self._make_checker().check("123:ABC")
assert result.valid is True
assert "testbot" in result.message
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_ok_false_invalid(self, mock_client_cls):
"""200 with ok=false means invalid bot token."""
self._setup_mock(
mock_client_cls,
200,
{"ok": False, "description": "Unauthorized"},
)
result = self._make_checker().check("bad-token")
assert result.valid is False
class TestNewsdataHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = NewsdataHealthChecker
HTTP_METHOD = "get"
class TestFinlightHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = FinlightHealthChecker
HTTP_METHOD = "get"
class TestBrevoHealthChecker(HealthCheckerTestSuite):
CHECKER_CLASS = BrevoHealthChecker
HTTP_METHOD = "get"