From 6d0a3b952a6c830b228416ee718af48575bbaadc Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 7 Feb 2026 19:27:13 +0530 Subject: [PATCH] feat(tools): add Apollo.io contact and company data enrichment integration (#3167) Add Apollo.io MCP tool integration for B2B contact and company data enrichment. Implements 4 MCP tools: - apollo_enrich_person: Enrich contact by email, LinkedIn URL, or name+domain - apollo_enrich_company: Enrich company by domain - apollo_search_people: Search contacts with filters (titles, seniorities, etc.) - apollo_search_companies: Search companies with filters (industries, size, etc.) Features: - Authentication via X-Api-Key header (APOLLO_API_KEY env var) - Credential spec in dedicated apollo.py (follows repo pattern) - Comprehensive error handling (401, 403, 404, 422, 429) - Full test coverage (36 tests) Closes #3061 --- tools/src/aden_tools/credentials/__init__.py | 4 + tools/src/aden_tools/credentials/apollo.py | 43 ++ tools/src/aden_tools/tools/__init__.py | 6 + .../aden_tools/tools/apollo_tool/README.md | 42 ++ .../aden_tools/tools/apollo_tool/__init__.py | 13 + .../tools/apollo_tool/apollo_tool.py | 581 +++++++++++++++ tools/tests/tools/test_apollo_tool.py | 675 ++++++++++++++++++ 7 files changed, 1364 insertions(+) create mode 100644 tools/src/aden_tools/credentials/apollo.py create mode 100644 tools/src/aden_tools/tools/apollo_tool/README.md create mode 100644 tools/src/aden_tools/tools/apollo_tool/__init__.py create mode 100644 tools/src/aden_tools/tools/apollo_tool/apollo_tool.py create mode 100644 tools/tests/tools/test_apollo_tool.py diff --git a/tools/src/aden_tools/credentials/__init__.py b/tools/src/aden_tools/credentials/__init__.py index 6538256f..e10cbee0 100644 --- a/tools/src/aden_tools/credentials/__init__.py +++ b/tools/src/aden_tools/credentials/__init__.py @@ -36,6 +36,7 @@ Credential categories: - llm.py: LLM provider credentials (anthropic, openai, etc.) - search.py: Search tool credentials (brave_search, google_search, etc.) - email.py: Email provider credentials (resend, google/gmail) +- apollo.py: Apollo.io API credentials - github.py: GitHub API credentials - hubspot.py: HubSpot CRM credentials - slack.py: Slack workspace credentials @@ -49,6 +50,7 @@ To add a new credential: 3. If new category, import and merge it in this __init__.py """ +from .apollo import APOLLO_CREDENTIALS from .base import CredentialError, CredentialSpec from .browser import get_aden_auth_url, get_aden_setup_url, open_browser from .email import EMAIL_CREDENTIALS @@ -71,6 +73,7 @@ CREDENTIAL_SPECS = { **LLM_CREDENTIALS, **SEARCH_CREDENTIALS, **EMAIL_CREDENTIALS, + **APOLLO_CREDENTIALS, **GITHUB_CREDENTIALS, **HUBSPOT_CREDENTIALS, **SLACK_CREDENTIALS, @@ -104,4 +107,5 @@ __all__ = [ "GITHUB_CREDENTIALS", "HUBSPOT_CREDENTIALS", "SLACK_CREDENTIALS", + "APOLLO_CREDENTIALS", ] diff --git a/tools/src/aden_tools/credentials/apollo.py b/tools/src/aden_tools/credentials/apollo.py new file mode 100644 index 00000000..8b91288e --- /dev/null +++ b/tools/src/aden_tools/credentials/apollo.py @@ -0,0 +1,43 @@ +""" +Apollo.io tool credentials. + +Contains credentials for Apollo.io API integration. +""" + +from .base import CredentialSpec + +APOLLO_CREDENTIALS = { + "apollo": CredentialSpec( + env_var="APOLLO_API_KEY", + tools=[ + "apollo_enrich_person", + "apollo_enrich_company", + "apollo_search_people", + "apollo_search_companies", + ], + required=True, + startup_required=False, + help_url="https://apolloio.github.io/apollo-api-docs/", + description="Apollo.io API key for contact and company data enrichment", + # Auth method support + aden_supported=False, + direct_api_key_supported=True, + api_key_instructions="""To get an Apollo.io API key: +1. Sign up or log in at https://app.apollo.io/ +2. Go to Settings > Integrations > API +3. Click "Connect" to generate your API key +4. Copy the API key + +Note: Apollo uses export credits for enrichment: +- Free plan: 10 credits/month +- Basic ($49/user/mo): 1,000 credits/month +- Professional ($79/user/mo): 2,000 credits/month +- Overage: $0.20/credit""", + # Health check configuration + health_check_endpoint="https://api.apollo.io/v1/auth/health", + health_check_method="GET", + # Credential store mapping + credential_id="apollo", + credential_key="api_key", + ), +} diff --git a/tools/src/aden_tools/tools/__init__.py b/tools/src/aden_tools/tools/__init__.py index 45f69eb4..36d3495e 100644 --- a/tools/src/aden_tools/tools/__init__.py +++ b/tools/src/aden_tools/tools/__init__.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from aden_tools.credentials import CredentialStoreAdapter # Import register_tools from each tool module +from .apollo_tool import register_tools as register_apollo from .csv_tool import register_tools as register_csv from .email_tool import register_tools as register_email from .example_tool import register_tools as register_example @@ -76,6 +77,7 @@ def register_all_tools( # email supports multiple providers (Resend) with auto-detection register_email(mcp, credentials=credentials) register_hubspot(mcp, credentials=credentials) + register_apollo(mcp, credentials=credentials) register_slack(mcp, credentials=credentials) # Register file system toolkits @@ -112,6 +114,10 @@ def register_all_tools( "csv_append", "csv_info", "csv_sql", + "apollo_enrich_person", + "apollo_enrich_company", + "apollo_search_people", + "apollo_search_companies", "github_list_repos", "github_get_repo", "github_search_repos", diff --git a/tools/src/aden_tools/tools/apollo_tool/README.md b/tools/src/aden_tools/tools/apollo_tool/README.md new file mode 100644 index 00000000..c419692f --- /dev/null +++ b/tools/src/aden_tools/tools/apollo_tool/README.md @@ -0,0 +1,42 @@ +# Apollo.io Tool + +B2B contact and company data enrichment via the Apollo.io API. + +## Tools + +| Tool | Description | +|------|-------------| +| `apollo_enrich_person` | Enrich a contact by email, LinkedIn URL, or name+domain | +| `apollo_enrich_company` | Enrich a company by domain | +| `apollo_search_people` | Search contacts with filters (titles, seniorities, locations, etc.) | +| `apollo_search_companies` | Search companies with filters (industries, employee counts, etc.) | + +## Authentication + +Requires an Apollo.io API key passed via `APOLLO_API_KEY` environment variable or the credential store. + +**How to get an API key:** + +1. Sign up or log in at https://app.apollo.io/ +2. Go to Settings > Integrations > API +3. Click "Connect" to generate your API key +4. Copy the API key + +## Pricing + +| Plan | Price | Export Credits/month | +|------|-------|---------------------| +| Free | $0 | 10 | +| Basic | $49/user/mo | 1,000 | +| Professional | $79/user/mo | 2,000 | +| Overage | - | $0.20/credit | + +## Error Handling + +Returns error dicts for common failure modes: + +- `401` - Invalid API key +- `403` - Insufficient credits or permissions +- `404` - Resource not found +- `422` - Invalid parameters +- `429` - Rate limit exceeded diff --git a/tools/src/aden_tools/tools/apollo_tool/__init__.py b/tools/src/aden_tools/tools/apollo_tool/__init__.py new file mode 100644 index 00000000..3dc16098 --- /dev/null +++ b/tools/src/aden_tools/tools/apollo_tool/__init__.py @@ -0,0 +1,13 @@ +""" +Apollo.io Tool - Contact and company data enrichment via Apollo API. + +Supports API key authentication for: +- Person enrichment by email or LinkedIn +- Company enrichment by domain +- People search with filters +- Company search with filters +""" + +from .apollo_tool import register_tools + +__all__ = ["register_tools"] diff --git a/tools/src/aden_tools/tools/apollo_tool/apollo_tool.py b/tools/src/aden_tools/tools/apollo_tool/apollo_tool.py new file mode 100644 index 00000000..0d8e37f8 --- /dev/null +++ b/tools/src/aden_tools/tools/apollo_tool/apollo_tool.py @@ -0,0 +1,581 @@ +""" +Apollo.io Tool - Contact and company data enrichment via Apollo API. + +Supports: +- API key authentication (APOLLO_API_KEY) + +Use Cases: +- Enrich contacts by email or LinkedIn URL +- Enrich companies by domain +- Search for people by titles, seniorities, locations +- Search for companies by industries, employee counts, technologies + +API Reference: https://apolloio.github.io/apollo-api-docs/ +""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any + +import httpx +from fastmcp import FastMCP + +if TYPE_CHECKING: + from aden_tools.credentials import CredentialStoreAdapter + +APOLLO_API_BASE = "https://api.apollo.io/api/v1" + + +class _ApolloClient: + """Internal client wrapping Apollo.io API calls.""" + + def __init__(self, api_key: str): + self._api_key = api_key + + @property + def _headers(self) -> dict[str, str]: + return { + "Content-Type": "application/json", + "Accept": "application/json", + "Cache-Control": "no-cache", + "X-Api-Key": self._api_key, + } + + def _handle_response(self, response: httpx.Response) -> dict[str, Any]: + """Handle common HTTP error codes.""" + if response.status_code == 401: + return {"error": "Invalid Apollo API key"} + if response.status_code == 403: + return { + "error": "Insufficient credits or permissions. Check your Apollo plan.", + "help": "Apollo uses export credits for enrichment. Visit https://app.apollo.io/#/settings/plans", + } + if response.status_code == 404: + return {"error": "Resource not found"} + if response.status_code == 422: + try: + detail = response.json().get("error", response.text) + except Exception: + detail = response.text + return {"error": f"Invalid parameters: {detail}"} + if response.status_code == 429: + return {"error": "Apollo rate limit exceeded. Try again later."} + if response.status_code >= 400: + try: + detail = response.json().get("error", response.text) + except Exception: + detail = response.text + return {"error": f"Apollo API error (HTTP {response.status_code}): {detail}"} + return response.json() + + def enrich_person( + self, + email: str | None = None, + linkedin_url: str | None = None, + first_name: str | None = None, + last_name: str | None = None, + name: str | None = None, + domain: str | None = None, + reveal_personal_emails: bool = False, + reveal_phone_number: bool = False, + ) -> dict[str, Any]: + """Enrich a person by email, LinkedIn URL, or name and domain.""" + body: dict[str, Any] = { + "reveal_personal_emails": reveal_personal_emails, + "reveal_phone_number": reveal_phone_number, + } + + if email: + body["email"] = email + if linkedin_url: + body["linkedin_url"] = linkedin_url + if first_name: + body["first_name"] = first_name + if last_name: + body["last_name"] = last_name + if name: + body["name"] = name + if domain: + body["domain"] = domain + + response = httpx.post( + f"{APOLLO_API_BASE}/people/match", + headers=self._headers, + params=body if not email and not linkedin_url else None, + json=body, + timeout=30.0, + ) + result = self._handle_response(response) + + # Handle "not found" gracefully + if "error" not in result and result.get("person") is None: + return {"match_found": False, "message": "No matching person found"} + + if "error" not in result: + person = result.get("person", {}) + return { + "match_found": True, + "person": { + "id": person.get("id"), + "first_name": person.get("first_name"), + "last_name": person.get("last_name"), + "name": person.get("name"), + "title": person.get("title"), + "email": person.get("email"), + "email_status": person.get("email_status"), + "phone_numbers": person.get("phone_numbers", []), + "linkedin_url": person.get("linkedin_url"), + "twitter_url": person.get("twitter_url"), + "city": person.get("city"), + "state": person.get("state"), + "country": person.get("country"), + "organization": { + "id": person.get("organization", {}).get("id"), + "name": person.get("organization", {}).get("name"), + "domain": person.get("organization", {}).get("primary_domain"), + "industry": person.get("organization", {}).get("industry"), + "employee_count": person.get("organization", {}).get( + "estimated_num_employees" + ), + }, + }, + } + return result + + def enrich_company(self, domain: str) -> dict[str, Any]: + """Enrich a company by domain.""" + body: dict[str, Any] = { + "domain": domain, + } + + response = httpx.post( + f"{APOLLO_API_BASE}/organizations/enrich", + headers=self._headers, + json=body, + timeout=30.0, + ) + result = self._handle_response(response) + + # Handle "not found" gracefully + if "error" not in result and result.get("organization") is None: + return {"match_found": False, "message": "No matching company found"} + + if "error" not in result: + org = result.get("organization", {}) + return { + "match_found": True, + "organization": { + "id": org.get("id"), + "name": org.get("name"), + "domain": org.get("primary_domain"), + "website_url": org.get("website_url"), + "linkedin_url": org.get("linkedin_url"), + "twitter_url": org.get("twitter_url"), + "facebook_url": org.get("facebook_url"), + "industry": org.get("industry"), + "keywords": org.get("keywords", []), + "employee_count": org.get("estimated_num_employees"), + "employee_count_range": org.get("employee_count_range"), + "annual_revenue": org.get("annual_revenue"), + "annual_revenue_printed": org.get("annual_revenue_printed"), + "total_funding": org.get("total_funding"), + "total_funding_printed": org.get("total_funding_printed"), + "latest_funding_round_date": org.get("latest_funding_round_date"), + "latest_funding_stage": org.get("latest_funding_stage"), + "founded_year": org.get("founded_year"), + "phone": org.get("phone"), + "city": org.get("city"), + "state": org.get("state"), + "country": org.get("country"), + "street_address": org.get("street_address"), + "technologies": org.get("technologies", []), + "short_description": org.get("short_description"), + }, + } + return result + + def search_people( + self, + titles: list[str] | None = None, + seniorities: list[str] | None = None, + locations: list[str] | None = None, + company_sizes: list[str] | None = None, + industries: list[str] | None = None, + technologies: list[str] | None = None, + limit: int = 10, + ) -> dict[str, Any]: + """Search for people with filters.""" + body: dict[str, Any] = { + "per_page": min(limit, 100), + "page": 1, + } + + if titles: + body["person_titles"] = titles + if seniorities: + body["person_seniorities"] = seniorities + if locations: + body["person_locations"] = locations + if company_sizes: + body["organization_num_employees_ranges"] = company_sizes + if industries: + body["organization_industry_tag_ids"] = industries + if technologies: + body["currently_using_any_of_technology_uids"] = technologies + + response = httpx.post( + f"{APOLLO_API_BASE}/mixed_people/search", + headers=self._headers, + json=body, + timeout=30.0, + ) + result = self._handle_response(response) + + if "error" not in result: + people = result.get("people", []) + return { + "total": result.get("pagination", {}).get("total_entries", len(people)), + "page": result.get("pagination", {}).get("page", 1), + "per_page": result.get("pagination", {}).get("per_page", limit), + "results": [ + { + "id": p.get("id"), + "first_name": p.get("first_name"), + "last_name": p.get("last_name"), + "name": p.get("name"), + "title": p.get("title"), + "email": p.get("email"), + "email_status": p.get("email_status"), + "linkedin_url": p.get("linkedin_url"), + "city": p.get("city"), + "state": p.get("state"), + "country": p.get("country"), + "seniority": p.get("seniority"), + "organization": { + "id": p.get("organization", {}).get("id") + if p.get("organization") + else None, + "name": p.get("organization", {}).get("name") + if p.get("organization") + else None, + "domain": p.get("organization", {}).get("primary_domain") + if p.get("organization") + else None, + }, + } + for p in people + ], + } + return result + + def search_companies( + self, + industries: list[str] | None = None, + employee_counts: list[str] | None = None, + locations: list[str] | None = None, + technologies: list[str] | None = None, + limit: int = 10, + ) -> dict[str, Any]: + """Search for companies with filters.""" + body: dict[str, Any] = { + "per_page": min(limit, 100), + "page": 1, + } + + if industries: + body["organization_industry_tag_ids"] = industries + if employee_counts: + body["organization_num_employees_ranges"] = employee_counts + if locations: + body["organization_locations"] = locations + if technologies: + body["currently_using_any_of_technology_uids"] = technologies + + response = httpx.post( + f"{APOLLO_API_BASE}/mixed_companies/search", + headers=self._headers, + json=body, + timeout=30.0, + ) + result = self._handle_response(response) + + if "error" not in result: + orgs = result.get("organizations", []) + return { + "total": result.get("pagination", {}).get("total_entries", len(orgs)), + "page": result.get("pagination", {}).get("page", 1), + "per_page": result.get("pagination", {}).get("per_page", limit), + "results": [ + { + "id": o.get("id"), + "name": o.get("name"), + "domain": o.get("primary_domain"), + "website_url": o.get("website_url"), + "linkedin_url": o.get("linkedin_url"), + "industry": o.get("industry"), + "employee_count": o.get("estimated_num_employees"), + "employee_count_range": o.get("employee_count_range"), + "annual_revenue_printed": o.get("annual_revenue_printed"), + "city": o.get("city"), + "state": o.get("state"), + "country": o.get("country"), + "short_description": o.get("short_description"), + } + for o in orgs + ], + } + return result + + +def register_tools( + mcp: FastMCP, + credentials: CredentialStoreAdapter | None = None, +) -> None: + """Register Apollo.io data enrichment tools with the MCP server.""" + + def _get_api_key() -> str | None: + """Get Apollo API key from credential manager or environment.""" + if credentials is not None: + api_key = credentials.get("apollo") + # Defensive check: ensure we get a string, not a complex object + if api_key is not None and not isinstance(api_key, str): + raise TypeError( + f"Expected string from credentials.get('apollo'), got {type(api_key).__name__}" + ) + return api_key + return os.getenv("APOLLO_API_KEY") + + def _get_client() -> _ApolloClient | dict[str, str]: + """Get an Apollo client, or return an error dict if no credentials.""" + api_key = _get_api_key() + if not api_key: + return { + "error": "Apollo credentials not configured", + "help": ( + "Set APOLLO_API_KEY environment variable " + "or configure via credential store. " + "Get your API key at https://app.apollo.io/#/settings/integrations/api" + ), + } + return _ApolloClient(api_key) + + # --- Person Enrichment --- + + @mcp.tool() + def apollo_enrich_person( + email: str | None = None, + linkedin_url: str | None = None, + first_name: str | None = None, + last_name: str | None = None, + name: str | None = None, + domain: str | None = None, + reveal_personal_emails: bool = False, + reveal_phone_number: bool = False, + ) -> dict: + """ + Enrich a person's information by email, LinkedIn URL, or name and domain. + + Args: + email: Person's email address + linkedin_url: Person's LinkedIn profile URL + first_name: Person's first name (use with last_name and domain) + last_name: Person's last name (use with first_name and domain) + name: Person's full name (use with domain) + domain: Person's company domain (e.g., "acme.com") + reveal_personal_emails: Whether to reveal personal email addresses (default: False) + reveal_phone_number: Whether to reveal phone numbers (default: False) + + Returns: + Dict with person details including: + - Full name, title + - Email and email status + - Phone numbers (if revealed) + - Location (city, state, country) + - LinkedIn/Twitter URLs + - Company info (name, industry, size) + Or error dict if enrichment fails + + Example: + apollo_enrich_person(email="john@acme.com") + apollo_enrich_person(name="John Doe", domain="acme.com") + """ + client = _get_client() + if isinstance(client, dict): + return client + + # Validate that we have enough info to match + has_email_or_linkedin = bool(email or linkedin_url) + has_name_and_domain = bool((first_name and last_name and domain) or (name and domain)) + + if not has_email_or_linkedin and not has_name_and_domain: + return { + "error": ( + "Invalid search criteria. Provide either (email), (linkedin_url), " + "or (name/first_name+last_name AND domain)." + ) + } + try: + return client.enrich_person( + email=email, + linkedin_url=linkedin_url, + first_name=first_name, + last_name=last_name, + name=name, + domain=domain, + reveal_personal_emails=reveal_personal_emails, + reveal_phone_number=reveal_phone_number, + ) + except httpx.TimeoutException: + return {"error": "Request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + # --- Company Enrichment --- + + @mcp.tool() + def apollo_enrich_company(domain: str) -> dict: + """ + Enrich a company by domain. + + Args: + domain: Company domain (e.g., "acme.com") + + Returns: + Dict with company firmographics including: + - name, domain, website URL + - Industry, keywords + - Employee count and range + - Annual revenue, funding info + - Founded year, location + - Technologies used + Or error dict if enrichment fails + + Example: + apollo_enrich_company(domain="openai.com") + """ + client = _get_client() + if isinstance(client, dict): + return client + try: + return client.enrich_company(domain) + except httpx.TimeoutException: + return {"error": "Request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + # --- People Search --- + + @mcp.tool() + def apollo_search_people( + titles: list[str] | None = None, + seniorities: list[str] | None = None, + locations: list[str] | None = None, + company_sizes: list[str] | None = None, + industries: list[str] | None = None, + technologies: list[str] | None = None, + limit: int = 10, + ) -> dict: + """ + Search for contacts with filters. + + Args: + titles: Job titles to search for + (e.g., ["VP Sales", "Director of Marketing"]) + seniorities: Seniority levels + (e.g., ["vp", "director", "c_suite", "manager", "senior"]) + locations: Geographic locations + (e.g., ["San Francisco, CA", "New York, NY"]) + company_sizes: Company employee count ranges + (e.g., ["1-10", "11-50", "51-200", "201-500", "501-1000", "1001-5000"]) + industries: Industry tags + (e.g., ["technology", "finance", "healthcare"]) + technologies: Technologies used by company + (e.g., ["salesforce", "hubspot", "aws"]) + limit: Maximum results (1-100, default 10) + + Returns: + Dict with: + - total: Total matching results + - results: List of matching contacts with email and company info + Or error dict if search fails + + Example: + apollo_search_people( + titles=["VP Sales", "Head of Sales"], + seniorities=["vp", "director"], + company_sizes=["51-200", "201-500"], + limit=25 + ) + """ + client = _get_client() + if isinstance(client, dict): + return client + try: + return client.search_people( + titles=titles, + seniorities=seniorities, + locations=locations, + company_sizes=company_sizes, + industries=industries, + technologies=technologies, + limit=limit, + ) + except httpx.TimeoutException: + return {"error": "Request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + # --- Company Search --- + + @mcp.tool() + def apollo_search_companies( + industries: list[str] | None = None, + employee_counts: list[str] | None = None, + locations: list[str] | None = None, + technologies: list[str] | None = None, + limit: int = 10, + ) -> dict: + """ + Search for companies with filters. + + Args: + industries: Industry tags + (e.g., ["technology", "finance", "healthcare"]) + employee_counts: Employee count ranges + (e.g., ["1-10", "11-50", "51-200", "201-500", "501-1000"]) + locations: Geographic locations + (e.g., ["San Francisco, CA", "United States"]) + technologies: Technologies used + (e.g., ["salesforce", "hubspot", "aws", "kubernetes"]) + limit: Maximum results (1-100, default 10) + + Returns: + Dict with: + - total: Total matching results + - results: List of matching companies with firmographic data + Or error dict if search fails + + Example: + apollo_search_companies( + industries=["technology"], + employee_counts=["51-200", "201-500"], + technologies=["kubernetes"], + limit=20 + ) + """ + client = _get_client() + if isinstance(client, dict): + return client + try: + return client.search_companies( + industries=industries, + employee_counts=employee_counts, + locations=locations, + technologies=technologies, + limit=limit, + ) + except httpx.TimeoutException: + return {"error": "Request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} diff --git a/tools/tests/tools/test_apollo_tool.py b/tools/tests/tools/test_apollo_tool.py new file mode 100644 index 00000000..da6a20fe --- /dev/null +++ b/tools/tests/tools/test_apollo_tool.py @@ -0,0 +1,675 @@ +""" +Tests for Apollo.io data enrichment tool. + +Covers: +- _ApolloClient methods (enrich_person, enrich_company, search_people, search_companies) +- Error handling (401, 403, 404, 422, 429, 500, timeout) +- Credential retrieval (CredentialStoreAdapter vs env var) +- All 4 MCP tool functions +- "Not found" graceful handling +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from aden_tools.tools.apollo_tool.apollo_tool import ( + APOLLO_API_BASE, + _ApolloClient, + register_tools, +) + +# --- _ApolloClient tests --- + + +class TestApolloClient: + def setup_method(self): + self.client = _ApolloClient("test-api-key") + + def test_headers(self): + headers = self.client._headers + assert headers["Content-Type"] == "application/json" + assert headers["Accept"] == "application/json" + # API key is passed in X-Api-Key header + assert headers["X-Api-Key"] == "test-api-key" + + def test_handle_response_success(self): + response = MagicMock() + response.status_code = 200 + response.json.return_value = {"person": {"id": "123"}} + assert self.client._handle_response(response) == {"person": {"id": "123"}} + + @pytest.mark.parametrize( + "status_code,expected_substring", + [ + (401, "Invalid Apollo API key"), + (403, "Insufficient credits"), + (404, "not found"), + (422, "Invalid parameters"), + (429, "rate limit"), + ], + ) + def test_handle_response_errors(self, status_code, expected_substring): + response = MagicMock() + response.status_code = status_code + response.json.return_value = {"error": "Test error"} + response.text = "Test error" + result = self.client._handle_response(response) + assert "error" in result + assert expected_substring in result["error"] + + def test_handle_response_generic_error(self): + response = MagicMock() + response.status_code = 500 + response.json.return_value = {"error": "Internal Server Error"} + result = self.client._handle_response(response) + assert "error" in result + assert "500" in result["error"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_by_email(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "person": { + "id": "p123", + "first_name": "John", + "last_name": "Doe", + "name": "John Doe", + "title": "VP Sales", + "email": "john@acme.com", + "email_status": "verified", + "phone_numbers": [{"sanitized_number": "+1234567890"}], + "linkedin_url": "https://linkedin.com/in/johndoe", + "twitter_url": None, + "city": "San Francisco", + "state": "California", + "country": "United States", + "organization": { + "id": "o456", + "name": "Acme Inc", + "primary_domain": "acme.com", + "industry": "Technology", + "estimated_num_employees": 250, + }, + } + } + mock_post.return_value = mock_response + + result = self.client.enrich_person(email="john@acme.com") + + mock_post.assert_called_once_with( + f"{APOLLO_API_BASE}/people/match", + headers=self.client._headers, + params=None, + json={ + "email": "john@acme.com", + "reveal_personal_emails": False, + "reveal_phone_number": False, + }, + timeout=30.0, + ) + assert result["match_found"] is True + assert result["person"]["first_name"] == "John" + assert result["person"]["title"] == "VP Sales" + assert result["person"]["organization"]["name"] == "Acme Inc" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_by_linkedin(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "person": { + "id": "p456", + "first_name": "Jane", + "last_name": "Smith", + "name": "Jane Smith", + "title": "CTO", + "email": "jane@startup.io", + "linkedin_url": "https://linkedin.com/in/janesmith", + "organization": {}, + } + } + mock_post.return_value = mock_response + + result = self.client.enrich_person(linkedin_url="https://linkedin.com/in/janesmith") + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["linkedin_url"] == "https://linkedin.com/in/janesmith" + assert result["match_found"] is True + assert result["person"]["title"] == "CTO" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_by_name_and_domain(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"person": {"id": "p123"}} + mock_post.return_value = mock_response + + self.client.enrich_person(name="John Doe", domain="acme.com") + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["name"] == "John Doe" + assert call_json["domain"] == "acme.com" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_with_reveal_flags(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"person": {"id": "p123"}} + mock_post.return_value = mock_response + + self.client.enrich_person( + email="john@acme.com", + reveal_personal_emails=True, + reveal_phone_number=True, + ) + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["reveal_personal_emails"] is True + assert call_json["reveal_phone_number"] is True + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_with_optional_params(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"person": {"id": "p789"}} + mock_post.return_value = mock_response + + self.client.enrich_person( + email="john@acme.com", + first_name="John", + last_name="Doe", + domain="acme.com", + ) + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["email"] == "john@acme.com" + assert call_json["first_name"] == "John" + assert call_json["last_name"] == "Doe" + assert call_json["domain"] == "acme.com" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_not_found(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"person": None} + mock_post.return_value = mock_response + + result = self.client.enrich_person(email="nobody@nowhere.xyz") + + assert result["match_found"] is False + assert "No matching person found" in result["message"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_company(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "organization": { + "id": "o123", + "name": "OpenAI", + "primary_domain": "openai.com", + "website_url": "https://openai.com", + "linkedin_url": "https://linkedin.com/company/openai", + "industry": "Artificial Intelligence", + "keywords": ["ai", "machine learning", "gpt"], + "estimated_num_employees": 1500, + "employee_count_range": "1001-5000", + "annual_revenue": 1000000000, + "annual_revenue_printed": "$1B", + "total_funding": 11000000000, + "total_funding_printed": "$11B", + "latest_funding_round_date": "2023-01-23", + "latest_funding_stage": "Series D", + "founded_year": 2015, + "phone": "+1-415-123-4567", + "city": "San Francisco", + "state": "California", + "country": "United States", + "street_address": "123 Mission St", + "technologies": ["python", "kubernetes", "aws"], + "short_description": "AI research and deployment company", + } + } + mock_post.return_value = mock_response + + result = self.client.enrich_company("openai.com") + + mock_post.assert_called_once_with( + f"{APOLLO_API_BASE}/organizations/enrich", + headers=self.client._headers, + json={"domain": "openai.com"}, + timeout=30.0, + ) + assert result["match_found"] is True + assert result["organization"]["name"] == "OpenAI" + assert result["organization"]["industry"] == "Artificial Intelligence" + assert result["organization"]["employee_count"] == 1500 + assert "python" in result["organization"]["technologies"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_company_not_found(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"organization": None} + mock_post.return_value = mock_response + + result = self.client.enrich_company("notarealcompany12345.xyz") + + assert result["match_found"] is False + assert "No matching company found" in result["message"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_people(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "pagination": {"total_entries": 150, "page": 1, "per_page": 10}, + "people": [ + { + "id": "p1", + "first_name": "Alice", + "last_name": "Johnson", + "name": "Alice Johnson", + "title": "VP Sales", + "email": "alice@company.com", + "email_status": "verified", + "linkedin_url": "https://linkedin.com/in/alicejohnson", + "city": "New York", + "state": "New York", + "country": "United States", + "seniority": "vp", + "organization": { + "id": "o1", + "name": "Company Inc", + "primary_domain": "company.com", + }, + }, + { + "id": "p2", + "first_name": "Bob", + "last_name": "Smith", + "name": "Bob Smith", + "title": "Director of Sales", + "email": "bob@another.com", + "email_status": "verified", + "linkedin_url": "https://linkedin.com/in/bobsmith", + "city": "Chicago", + "state": "Illinois", + "country": "United States", + "seniority": "director", + "organization": None, + }, + ], + } + mock_post.return_value = mock_response + + result = self.client.search_people( + titles=["VP Sales", "Director of Sales"], + seniorities=["vp", "director"], + company_sizes=["51-200", "201-500"], + limit=10, + ) + + mock_post.assert_called_once() + call_json = mock_post.call_args.kwargs["json"] + assert call_json["person_titles"] == ["VP Sales", "Director of Sales"] + assert call_json["person_seniorities"] == ["vp", "director"] + assert call_json["organization_num_employees_ranges"] == ["51-200", "201-500"] + assert call_json["per_page"] == 10 + + assert result["total"] == 150 + assert len(result["results"]) == 2 + assert result["results"][0]["title"] == "VP Sales" + assert result["results"][0]["organization"]["name"] == "Company Inc" + # Bob has no organization + assert result["results"][1]["organization"]["name"] is None + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_people_limit_capped(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"pagination": {}, "people": []} + mock_post.return_value = mock_response + + self.client.search_people(limit=200) + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["per_page"] == 100 + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_companies(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "pagination": {"total_entries": 50, "page": 1, "per_page": 10}, + "organizations": [ + { + "id": "o1", + "name": "Tech Startup", + "primary_domain": "techstartup.io", + "website_url": "https://techstartup.io", + "linkedin_url": "https://linkedin.com/company/techstartup", + "industry": "Technology", + "estimated_num_employees": 75, + "employee_count_range": "51-200", + "annual_revenue_printed": "$10M", + "city": "Austin", + "state": "Texas", + "country": "United States", + "short_description": "A tech startup", + }, + ], + } + mock_post.return_value = mock_response + + result = self.client.search_companies( + industries=["technology"], + employee_counts=["51-200"], + technologies=["kubernetes"], + limit=10, + ) + + mock_post.assert_called_once() + call_json = mock_post.call_args.kwargs["json"] + assert call_json["organization_industry_tag_ids"] == ["technology"] + assert call_json["organization_num_employees_ranges"] == ["51-200"] + assert call_json["currently_using_any_of_technology_uids"] == ["kubernetes"] + + assert result["total"] == 50 + assert len(result["results"]) == 1 + assert result["results"][0]["name"] == "Tech Startup" + assert result["results"][0]["industry"] == "Technology" + + +# --- MCP tool registration and credential tests --- + + +class TestToolRegistration: + def test_register_tools_registers_all_tools(self): + mcp = MagicMock() + mcp.tool.return_value = lambda fn: fn + register_tools(mcp) + assert mcp.tool.call_count == 4 + + def test_no_credentials_returns_error(self): + mcp = MagicMock() + registered_fns = [] + mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn + + with patch.dict("os.environ", {}, clear=True): + register_tools(mcp, credentials=None) + + enrich_fn = next(fn for fn in registered_fns if fn.__name__ == "apollo_enrich_person") + result = enrich_fn(email="test@test.com") + assert "error" in result + assert "not configured" in result["error"] + + def test_credentials_from_credential_manager(self): + mcp = MagicMock() + registered_fns = [] + mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn + + cred_manager = MagicMock() + cred_manager.get.return_value = "test-api-key" + + register_tools(mcp, credentials=cred_manager) + + enrich_fn = next(fn for fn in registered_fns if fn.__name__ == "apollo_enrich_company") + + with patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") as mock_post: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"organization": {"id": "123", "name": "Test"}} + mock_post.return_value = mock_response + + result = enrich_fn(domain="test.com") + + cred_manager.get.assert_called_with("apollo") + assert result["match_found"] is True + + def test_credentials_from_env_var(self): + mcp = MagicMock() + registered_fns = [] + mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn + + register_tools(mcp, credentials=None) + + enrich_fn = next(fn for fn in registered_fns if fn.__name__ == "apollo_enrich_company") + + with ( + patch.dict("os.environ", {"APOLLO_API_KEY": "env-api-key"}), + patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") as mock_post, + ): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"organization": {"id": "123", "name": "Test"}} + mock_post.return_value = mock_response + + result = enrich_fn(domain="test.com") + + assert result["match_found"] is True + # Verify API key was used in X-Api-Key header + call_headers = mock_post.call_args.kwargs["headers"] + assert call_headers["X-Api-Key"] == "env-api-key" + + +# --- Individual tool function tests --- + + +class TestEnrichPersonTool: + def setup_method(self): + self.mcp = MagicMock() + self.fns = [] + self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn + cred = MagicMock() + cred.get.return_value = "test-key" + register_tools(self.mcp, credentials=cred) + + def _fn(self, name): + return next(f for f in self.fns if f.__name__ == name) + + def test_enrich_person_requires_email_or_linkedin(self): + result = self._fn("apollo_enrich_person")() + assert "error" in result + assert "Invalid search criteria" in result["error"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_success(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, + json=MagicMock( + return_value={ + "person": { + "id": "p1", + "first_name": "John", + "last_name": "Doe", + "title": "CEO", + "organization": {}, + } + } + ), + ) + result = self._fn("apollo_enrich_person")(email="john@acme.com") + assert result["match_found"] is True + assert result["person"]["title"] == "CEO" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_timeout(self, mock_post): + mock_post.side_effect = httpx.TimeoutException("timed out") + result = self._fn("apollo_enrich_person")(email="test@test.com") + assert "error" in result + assert "timed out" in result["error"] + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_person_network_error(self, mock_post): + mock_post.side_effect = httpx.RequestError("connection failed") + result = self._fn("apollo_enrich_person")(email="test@test.com") + assert "error" in result + assert "Network error" in result["error"] + + +class TestEnrichCompanyTool: + def setup_method(self): + self.mcp = MagicMock() + self.fns = [] + self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn + cred = MagicMock() + cred.get.return_value = "test-key" + register_tools(self.mcp, credentials=cred) + + def _fn(self, name): + return next(f for f in self.fns if f.__name__ == name) + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_company_success(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, + json=MagicMock( + return_value={ + "organization": { + "id": "o1", + "name": "Acme Inc", + "industry": "Technology", + "estimated_num_employees": 500, + } + } + ), + ) + result = self._fn("apollo_enrich_company")(domain="acme.com") + assert result["match_found"] is True + assert result["organization"]["name"] == "Acme Inc" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_enrich_company_not_found(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, json=MagicMock(return_value={"organization": None}) + ) + result = self._fn("apollo_enrich_company")(domain="notreal.xyz") + assert result["match_found"] is False + + +class TestSearchPeopleTool: + def setup_method(self): + self.mcp = MagicMock() + self.fns = [] + self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn + cred = MagicMock() + cred.get.return_value = "test-key" + register_tools(self.mcp, credentials=cred) + + def _fn(self, name): + return next(f for f in self.fns if f.__name__ == name) + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_people_success(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, + json=MagicMock( + return_value={ + "pagination": {"total_entries": 100}, + "people": [{"id": "p1", "name": "Alice", "title": "VP Sales"}], + } + ), + ) + result = self._fn("apollo_search_people")(titles=["VP Sales"]) + assert result["total"] == 100 + assert len(result["results"]) == 1 + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_people_with_all_filters(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, json=MagicMock(return_value={"pagination": {}, "people": []}) + ) + self._fn("apollo_search_people")( + titles=["CEO"], + seniorities=["c_suite"], + locations=["San Francisco"], + company_sizes=["51-200"], + industries=["technology"], + technologies=["salesforce"], + limit=25, + ) + call_json = mock_post.call_args.kwargs["json"] + assert call_json["person_titles"] == ["CEO"] + assert call_json["person_seniorities"] == ["c_suite"] + assert call_json["person_locations"] == ["San Francisco"] + assert call_json["organization_num_employees_ranges"] == ["51-200"] + + +class TestSearchCompaniesTool: + def setup_method(self): + self.mcp = MagicMock() + self.fns = [] + self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn + cred = MagicMock() + cred.get.return_value = "test-key" + register_tools(self.mcp, credentials=cred) + + def _fn(self, name): + return next(f for f in self.fns if f.__name__ == name) + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_companies_success(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, + json=MagicMock( + return_value={ + "pagination": {"total_entries": 50}, + "organizations": [{"id": "o1", "name": "Tech Corp", "industry": "Technology"}], + } + ), + ) + result = self._fn("apollo_search_companies")(industries=["technology"]) + assert result["total"] == 50 + assert len(result["results"]) == 1 + assert result["results"][0]["industry"] == "Technology" + + @patch("aden_tools.tools.apollo_tool.apollo_tool.httpx.post") + def test_search_companies_with_all_filters(self, mock_post): + mock_post.return_value = MagicMock( + status_code=200, json=MagicMock(return_value={"pagination": {}, "organizations": []}) + ) + self._fn("apollo_search_companies")( + industries=["finance"], + employee_counts=["201-500"], + locations=["New York"], + technologies=["aws"], + limit=15, + ) + call_json = mock_post.call_args.kwargs["json"] + assert call_json["organization_industry_tag_ids"] == ["finance"] + assert call_json["organization_num_employees_ranges"] == ["201-500"] + assert call_json["organization_locations"] == ["New York"] + assert call_json["currently_using_any_of_technology_uids"] == ["aws"] + assert call_json["per_page"] == 15 + + +# --- Credential spec tests --- + + +class TestCredentialSpec: + def test_apollo_credential_spec_exists(self): + from aden_tools.credentials import CREDENTIAL_SPECS + + assert "apollo" in CREDENTIAL_SPECS + + def test_apollo_spec_env_var(self): + from aden_tools.credentials import CREDENTIAL_SPECS + + spec = CREDENTIAL_SPECS["apollo"] + assert spec.env_var == "APOLLO_API_KEY" + + def test_apollo_spec_tools(self): + from aden_tools.credentials import CREDENTIAL_SPECS + + spec = CREDENTIAL_SPECS["apollo"] + assert "apollo_enrich_person" in spec.tools + assert "apollo_enrich_company" in spec.tools + assert "apollo_search_people" in spec.tools + assert "apollo_search_companies" in spec.tools + assert len(spec.tools) == 4