Compare commits

...

1 Commits

Author SHA1 Message Date
Richard T 1e161ca006 feat: testing composio tools 2026-01-25 16:35:44 -08:00
9 changed files with 876 additions and 2 deletions
+5 -1
View File
@@ -24,7 +24,11 @@
"Bash(done)",
"Bash(xargs cat:*)",
"mcp__agent-builder__list_mcp_tools",
"mcp__agent-builder__add_mcp_server"
"mcp__agent-builder__add_mcp_server",
"Bash(python3:*)",
"Bash(PYTHONPATH=core:tools/src:exports pytest:*)",
"Bash(source .env)",
"Bash(PYTHONPATH=core:tools/src:exports python:*)"
]
}
}
+1
View File
@@ -0,0 +1 @@
{}
+1
View File
@@ -28,6 +28,7 @@ dependencies = [
"fastmcp>=2.0.0",
"diff-match-patch>=20230430",
"python-dotenv>=1.0.0",
"composio-core>=0.5.0",
]
[project.optional-dependencies]
@@ -33,11 +33,13 @@ To add a new credential:
from .base import CredentialError, CredentialManager, CredentialSpec
from .llm import LLM_CREDENTIALS
from .search import SEARCH_CREDENTIALS
from .composio import COMPOSIO_CREDENTIALS
# Merged registry of all credentials
CREDENTIAL_SPECS = {
**LLM_CREDENTIALS,
**SEARCH_CREDENTIALS,
**COMPOSIO_CREDENTIALS,
}
__all__ = [
@@ -50,4 +52,5 @@ __all__ = [
# Category registries (for direct access if needed)
"LLM_CREDENTIALS",
"SEARCH_CREDENTIALS",
"COMPOSIO_CREDENTIALS",
]
@@ -0,0 +1,27 @@
"""
Composio credentials for third-party integrations.
Contains credentials for Composio-powered tools like LinkedIn, Gmail, etc.
"""
from .base import CredentialSpec
COMPOSIO_CREDENTIALS = {
"composio": CredentialSpec(
env_var="COMPOSIO_API_KEY",
tools=[
"linkedin_create_post",
"linkedin_get_profile",
"linkedin_search_people",
"linkedin_send_message",
"gmail_send_email",
"gmail_read_emails",
"gmail_search_emails",
"gmail_create_draft",
],
node_types=[],
required=True,
startup_required=False,
help_url="https://app.composio.dev/settings",
description="API key for Composio (enables LinkedIn, Gmail, and other integrations)",
),
}
+11 -1
View File
@@ -22,6 +22,7 @@ from .example_tool import register_tools as register_example
from .web_search_tool import register_tools as register_web_search
from .web_scrape_tool import register_tools as register_web_scrape
from .pdf_read_tool import register_tools as register_pdf_read
from .composio_tools import register_tools as register_composio
# Import file system toolkits
from .file_system_toolkits.view_file import register_tools as register_view_file
@@ -51,12 +52,12 @@ def register_all_tools(
"""
# Tools that don't need credentials
register_example(mcp)
register_web_search(mcp)
register_web_scrape(mcp)
register_pdf_read(mcp)
# Tools that need credentials (pass credentials if provided)
register_web_search(mcp, credentials=credentials)
register_composio(mcp, credentials=credentials)
# Register file system toolkits
register_view_file(mcp)
@@ -81,6 +82,15 @@ def register_all_tools(
"apply_patch",
"grep_search",
"execute_command_tool",
# Composio tools (LinkedIn + Gmail)
"linkedin_create_post",
"linkedin_get_profile",
"linkedin_search_people",
"linkedin_send_message",
"gmail_send_email",
"gmail_read_emails",
"gmail_search_emails",
"gmail_create_draft",
]
@@ -0,0 +1,51 @@
"""
Composio Tools - Third-party integrations via Composio.
Provides LinkedIn and Gmail tools powered by Composio.
Requires COMPOSIO_API_KEY environment variable.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional
from fastmcp import FastMCP
from .linkedin_tools import register_tools as register_linkedin_tools
from .gmail_tools import register_tools as register_gmail_tools
if TYPE_CHECKING:
from aden_tools.credentials import CredentialManager
def register_tools(
mcp: FastMCP,
credentials: Optional["CredentialManager"] = None,
) -> List[str]:
"""
Register all Composio tools with the MCP server.
Args:
mcp: FastMCP server instance
credentials: Optional CredentialManager for credential access
Returns:
List of registered tool names
"""
register_linkedin_tools(mcp, credentials=credentials)
register_gmail_tools(mcp, credentials=credentials)
return [
# LinkedIn tools
"linkedin_create_post",
"linkedin_get_profile",
"linkedin_search_people",
"linkedin_send_message",
# Gmail tools
"gmail_send_email",
"gmail_read_emails",
"gmail_search_emails",
"gmail_create_draft",
]
__all__ = ["register_tools"]
@@ -0,0 +1,393 @@
"""
Gmail Tools - Gmail integration via Composio.
Provides tools for Gmail operations like sending, reading, and searching emails.
Requires COMPOSIO_API_KEY environment variable.
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any, Optional, Tuple
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialManager
# App name for Composio
GMAIL_APP_NAME = "GMAIL"
def _get_composio_client(credentials: Optional["CredentialManager"] = None):
"""Get Composio client with API key from credentials or environment."""
try:
from composio import ComposioToolSet
except ImportError:
return None, "Composio SDK not installed. Run: pip install composio-core"
if credentials is not None:
api_key = credentials.get("composio")
else:
api_key = os.getenv("COMPOSIO_API_KEY")
if not api_key:
return None, "COMPOSIO_API_KEY not set. Get one at https://app.composio.dev/settings"
return ComposioToolSet(api_key=api_key), None
def _check_oauth_connection(
client: Any,
app_name: str = GMAIL_APP_NAME,
entity_id: str = "default",
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Check if OAuth connection exists for the given app.
Args:
client: ComposioToolSet instance
app_name: The app to check connection for (e.g., "GMAIL")
entity_id: The entity ID (defaults to "default")
Returns:
Tuple of (is_connected, oauth_url, error_message)
- If connected: (True, None, None)
- If not connected: (False, oauth_url, None)
- If error: (False, None, error_message)
"""
try:
# Get the entity
entity = client.get_entity(id=entity_id)
# Check for existing connection using correct API method
try:
# Use get_connection(app=...) - the correct Composio API
connection = entity.get_connection(app=app_name)
if connection:
status = getattr(connection, "status", None)
if status == "ACTIVE" or status is None:
return True, None, None
except Exception:
# No connection found, need to initiate OAuth
pass
# No active connection, initiate OAuth
try:
connection_request = entity.initiate_connection(
app_name=app_name,
redirect_url="https://app.composio.dev/connections",
)
oauth_url = getattr(connection_request, "redirectUrl", None)
if oauth_url:
return False, oauth_url, None
else:
# Fallback to Composio dashboard
return False, f"https://app.composio.dev/app/{app_name.lower()}", None
except Exception as e:
# If initiate_connection fails, provide dashboard link
return False, f"https://app.composio.dev/app/{app_name.lower()}", None
except Exception as e:
return False, None, f"Failed to check OAuth connection: {str(e)}"
def _ensure_oauth_connection(
credentials: Optional["CredentialManager"] = None,
app_name: str = GMAIL_APP_NAME,
) -> Tuple[Any, Optional[dict]]:
"""
Ensure OAuth connection exists, return client or OAuth required response.
Args:
credentials: Optional CredentialManager
app_name: The app to check connection for
Returns:
Tuple of (client, error_response)
- If connected: (client, None)
- If OAuth needed: (None, {"error": ..., "oauth_required": True, "oauth_url": ...})
- If other error: (None, {"error": ...})
"""
client, error = _get_composio_client(credentials)
if error:
return None, {"error": error}
is_connected, oauth_url, check_error = _check_oauth_connection(client, app_name)
if check_error:
return None, {"error": check_error}
if not is_connected:
return None, {
"error": f"{app_name} OAuth connection required. Please authorize access.",
"oauth_required": True,
"oauth_url": oauth_url,
"message": f"Please visit the following URL to connect your {app_name} account: {oauth_url}",
}
return client, None
def register_tools(
mcp: FastMCP,
credentials: Optional["CredentialManager"] = None,
) -> None:
"""Register Gmail tools with the MCP server."""
@mcp.tool()
def gmail_send_email(
to: str,
subject: str,
body: str,
cc: str = "",
bcc: str = "",
) -> dict:
"""
Send an email via Gmail.
Use this to compose and send an email from the user's Gmail account.
Args:
to: Recipient email address(es), comma-separated for multiple
subject: Email subject line
body: Email body content (plain text or HTML)
cc: CC recipients, comma-separated (optional)
bcc: BCC recipients, comma-separated (optional)
Returns:
Dict with message ID and status, or error dict
"""
if not to:
return {"error": "Recipient email address is required"}
if not subject:
return {"error": "Email subject is required"}
if not body:
return {"error": "Email body is required"}
client, error_response = _ensure_oauth_connection(credentials, GMAIL_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
params = {
"to": to,
"subject": subject,
"body": body,
}
if cc:
params["cc"] = cc
if bcc:
params["bcc"] = bcc
result = client.execute_action(
action=Action.GMAIL_SEND_EMAIL,
params=params,
)
if result.get("successful"):
return {
"success": True,
"message_id": result.get("data", {}).get("id"),
"thread_id": result.get("data", {}).get("threadId"),
"message": "Email sent successfully",
}
else:
return {"error": result.get("error", "Failed to send email")}
except Exception as e:
return {"error": f"Gmail send failed: {str(e)}"}
@mcp.tool()
def gmail_read_emails(
max_results: int = 10,
label: str = "INBOX",
unread_only: bool = False,
) -> dict:
"""
Read emails from Gmail.
Use this to fetch emails from the user's Gmail account.
Args:
max_results: Maximum number of emails to return (1-100)
label: Gmail label to read from (INBOX, SENT, DRAFTS, etc.)
unread_only: If True, only return unread emails
Returns:
Dict with list of emails, or error dict
"""
max_results = max(1, min(100, max_results))
client, error_response = _ensure_oauth_connection(credentials, GMAIL_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
params = {
"max_results": max_results,
"label_ids": [label],
}
if unread_only:
params["query"] = "is:unread"
result = client.execute_action(
action=Action.GMAIL_FETCH_EMAILS,
params=params,
)
if result.get("successful"):
messages = result.get("data", {}).get("messages", [])
return {
"success": True,
"emails": [
{
"id": msg.get("id"),
"thread_id": msg.get("threadId"),
"from": msg.get("from"),
"to": msg.get("to"),
"subject": msg.get("subject"),
"snippet": msg.get("snippet"),
"date": msg.get("date"),
"is_unread": msg.get("isUnread", False),
}
for msg in messages[:max_results]
],
"total": len(messages),
}
else:
return {"error": result.get("error", "Failed to fetch emails")}
except Exception as e:
return {"error": f"Gmail read failed: {str(e)}"}
@mcp.tool()
def gmail_search_emails(
query: str,
max_results: int = 10,
) -> dict:
"""
Search emails in Gmail.
Use this to search for emails matching specific criteria.
Supports Gmail search syntax (from:, to:, subject:, has:attachment, etc.)
Args:
query: Gmail search query (e.g., "from:user@example.com subject:meeting")
max_results: Maximum number of results (1-100)
Returns:
Dict with search results, or error dict
"""
if not query:
return {"error": "Search query is required"}
max_results = max(1, min(100, max_results))
client, error_response = _ensure_oauth_connection(credentials, GMAIL_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
result = client.execute_action(
action=Action.GMAIL_FETCH_EMAILS,
params={
"query": query,
"max_results": max_results,
},
)
if result.get("successful"):
messages = result.get("data", {}).get("messages", [])
return {
"success": True,
"results": [
{
"id": msg.get("id"),
"thread_id": msg.get("threadId"),
"from": msg.get("from"),
"to": msg.get("to"),
"subject": msg.get("subject"),
"snippet": msg.get("snippet"),
"date": msg.get("date"),
}
for msg in messages[:max_results]
],
"total": len(messages),
"query": query,
}
else:
return {"error": result.get("error", "Failed to search emails")}
except Exception as e:
return {"error": f"Gmail search failed: {str(e)}"}
@mcp.tool()
def gmail_create_draft(
to: str,
subject: str,
body: str,
cc: str = "",
bcc: str = "",
) -> dict:
"""
Create a draft email in Gmail.
Use this to save an email as a draft without sending it.
Args:
to: Recipient email address(es), comma-separated for multiple
subject: Email subject line
body: Email body content (plain text or HTML)
cc: CC recipients, comma-separated (optional)
bcc: BCC recipients, comma-separated (optional)
Returns:
Dict with draft ID and status, or error dict
"""
if not to:
return {"error": "Recipient email address is required"}
if not subject:
return {"error": "Email subject is required"}
client, error_response = _ensure_oauth_connection(credentials, GMAIL_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
params = {
"to": to,
"subject": subject,
"body": body or "",
}
if cc:
params["cc"] = cc
if bcc:
params["bcc"] = bcc
result = client.execute_action(
action=Action.GMAIL_CREATE_EMAIL_DRAFT,
params=params,
)
if result.get("successful"):
return {
"success": True,
"draft_id": result.get("data", {}).get("id"),
"message": "Draft created successfully",
}
else:
return {"error": result.get("error", "Failed to create draft")}
except Exception as e:
return {"error": f"Gmail draft creation failed: {str(e)}"}
@@ -0,0 +1,384 @@
"""
LinkedIn Tools - LinkedIn integration via Composio.
Provides tools for LinkedIn operations like posting, messaging, and profile access.
Requires COMPOSIO_API_KEY environment variable.
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Optional, Tuple, Any
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialManager
# App name for Composio
LINKEDIN_APP_NAME = "LINKEDIN"
def _get_composio_client(credentials: Optional["CredentialManager"] = None):
"""Get Composio client with API key from credentials or environment."""
try:
from composio import ComposioToolSet
except ImportError:
return None, "Composio SDK not installed. Run: pip install composio-core"
if credentials is not None:
api_key = credentials.get("composio")
else:
api_key = os.getenv("COMPOSIO_API_KEY")
if not api_key:
return None, "COMPOSIO_API_KEY not set. Get one at https://app.composio.dev/settings"
return ComposioToolSet(api_key=api_key), None
def _check_oauth_connection(
client: Any,
app_name: str = LINKEDIN_APP_NAME,
entity_id: str = "default",
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Check if OAuth connection exists for the given app.
Args:
client: ComposioToolSet instance
app_name: The app to check connection for (e.g., "LINKEDIN")
entity_id: The entity ID (defaults to "default")
Returns:
Tuple of (is_connected, oauth_url, error_message)
- If connected: (True, None, None)
- If not connected: (False, oauth_url, None)
- If error: (False, None, error_message)
"""
try:
# Get the entity
entity = client.get_entity(id=entity_id)
# Check for existing connection using correct API method
try:
# Use get_connection(app=...) - the correct Composio API
connection = entity.get_connection(app=app_name)
if connection:
status = getattr(connection, "status", None)
if status == "ACTIVE" or status is None:
return True, None, None
except Exception:
# No connection found, need to initiate OAuth
pass
# No active connection, initiate OAuth
try:
connection_request = entity.initiate_connection(
app_name=app_name,
redirect_url="https://app.composio.dev/connections",
)
oauth_url = getattr(connection_request, "redirectUrl", None)
if oauth_url:
return False, oauth_url, None
else:
# Fallback to Composio dashboard
return False, f"https://app.composio.dev/app/{app_name.lower()}", None
except Exception as e:
# If initiate_connection fails, provide dashboard link
return False, f"https://app.composio.dev/app/{app_name.lower()}", None
except Exception as e:
return False, None, f"Failed to check OAuth connection: {str(e)}"
def _ensure_oauth_connection(
credentials: Optional["CredentialManager"] = None,
app_name: str = LINKEDIN_APP_NAME,
) -> Tuple[Any, Optional[dict]]:
"""
Ensure OAuth connection exists, return client or OAuth required response.
Args:
credentials: Optional CredentialManager
app_name: The app to check connection for
Returns:
Tuple of (client, error_response)
- If connected: (client, None)
- If OAuth needed: (None, {"error": ..., "oauth_required": True, "oauth_url": ...})
- If other error: (None, {"error": ...})
"""
client, error = _get_composio_client(credentials)
if error:
return None, {"error": error}
is_connected, oauth_url, check_error = _check_oauth_connection(client, app_name)
if check_error:
return None, {"error": check_error}
if not is_connected:
return None, {
"error": f"{app_name} OAuth connection required. Please authorize access.",
"oauth_required": True,
"oauth_url": oauth_url,
"message": f"Please visit the following URL to connect your {app_name} account: {oauth_url}",
}
return client, None
def register_tools(
mcp: FastMCP,
credentials: Optional["CredentialManager"] = None,
) -> None:
"""Register LinkedIn tools with the MCP server."""
@mcp.tool()
def linkedin_create_post(
text: str,
visibility: str = "PUBLIC",
) -> dict:
"""
Create a post on LinkedIn.
Use this to publish content to the user's LinkedIn feed.
Args:
text: The content of the post (1-3000 chars)
visibility: Post visibility - PUBLIC, CONNECTIONS, or LOGGED_IN
Returns:
Dict with post ID and status, or error dict
"""
if not text or len(text) > 3000:
return {"error": "Post text must be 1-3000 characters"}
if visibility not in ["PUBLIC", "CONNECTIONS", "LOGGED_IN"]:
visibility = "PUBLIC"
client, error_response = _ensure_oauth_connection(credentials, LINKEDIN_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
result = client.execute_action(
action=Action.LINKEDIN_CREATE_LINKED_IN_POST,
params={
"text": text,
"visibility": visibility,
},
)
if result.get("successful"):
return {
"success": True,
"post_id": result.get("data", {}).get("id"),
"message": "Post created successfully",
}
else:
return {"error": result.get("error", "Failed to create post")}
except Exception as e:
return {"error": f"LinkedIn post failed: {str(e)}"}
@mcp.tool()
def linkedin_get_profile(
profile_id: str = "me",
) -> dict:
"""
Get a LinkedIn profile (mock data for testing).
Args:
profile_id: Profile ID (currently returns mock data)
Returns:
Dict with profile information
"""
# Return mock profile data for testing Gmail integration
return {
"success": True,
"profile": {
"id": "mock-profile-123",
"first_name": "Richard",
"last_name": "Tang",
"full_name": "Richard Tang",
"headline": "CEO & Founder at Aden | Building AI-powered solutions",
"summary": "Experienced tech entrepreneur focused on AI and automation. Previously founded multiple startups in the B2B space.",
"profile_url": "https://www.linkedin.com/in/richardtang",
"location": "San Francisco, CA",
"current_company": "Aden",
},
}
@mcp.tool()
def linkedin_get_company(
role: str = "ADMINISTRATOR",
) -> dict:
"""
Get LinkedIn company/organization info.
Retrieves organizations where the authenticated user has specific roles,
to determine their management or content posting capabilities for LinkedIn company pages.
Args:
role: The role to filter by - 'ADMINISTRATOR' or 'DIRECT_SPONSORED_CONTENT_POSTER'
Returns:
Dict with list of organizations the user has access to
"""
if role not in ["ADMINISTRATOR", "DIRECT_SPONSORED_CONTENT_POSTER"]:
role = "ADMINISTRATOR"
client, error_response = _ensure_oauth_connection(credentials, LINKEDIN_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
result = client.execute_action(
action=Action.LINKEDIN_GET_COMPANY_INFO,
params={
"role": role,
"state": "APPROVED",
"count": 100,
},
)
if result.get("successful"):
data = result.get("data", {})
# Handle the response - it may contain a list of organizations
if isinstance(data, dict) and "elements" in data:
orgs = data.get("elements", [])
elif isinstance(data, list):
orgs = data
else:
orgs = [data] if data else []
return {
"success": True,
"organizations": orgs,
"count": len(orgs),
}
else:
error_msg = result.get("error", "Failed to get company info")
# Check if it's a permission error
if "403" in str(error_msg) or "Forbidden" in str(error_msg):
return {
"error": "Permission denied. You may not have admin access to any LinkedIn company pages.",
"suggestion": "Ensure you have ADMINISTRATOR or DIRECT_SPONSORED_CONTENT_POSTER role on a company page.",
}
return {"error": error_msg}
except Exception as e:
return {"error": f"LinkedIn company fetch failed: {str(e)}"}
@mcp.tool()
def linkedin_search_people(
keywords: str,
limit: int = 10,
) -> dict:
"""
Search for people on LinkedIn.
Use this to find LinkedIn profiles matching specific keywords.
Args:
keywords: Search keywords (name, title, company, etc.)
limit: Maximum number of results (1-50)
Returns:
Dict with search results, or error dict
"""
if not keywords:
return {"error": "Keywords are required"}
limit = max(1, min(50, limit))
client, error_response = _ensure_oauth_connection(credentials, LINKEDIN_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
result = client.execute_action(
action=Action.LINKEDIN_SEARCH_PEOPLE,
params={
"keywords": keywords,
"limit": limit,
},
)
if result.get("successful"):
people = result.get("data", {}).get("elements", [])
return {
"success": True,
"results": [
{
"name": p.get("name"),
"headline": p.get("headline"),
"profile_url": p.get("profileUrl"),
}
for p in people[:limit]
],
"total": len(people),
}
else:
return {"error": result.get("error", "Failed to search people")}
except Exception as e:
return {"error": f"LinkedIn search failed: {str(e)}"}
@mcp.tool()
def linkedin_send_message(
recipient_id: str,
message: str,
) -> dict:
"""
Send a direct message on LinkedIn.
Use this to send a private message to a LinkedIn connection.
Args:
recipient_id: LinkedIn profile ID of the recipient
message: The message content (1-8000 chars)
Returns:
Dict with message status, or error dict
"""
if not recipient_id:
return {"error": "Recipient ID is required"}
if not message or len(message) > 8000:
return {"error": "Message must be 1-8000 characters"}
client, error_response = _ensure_oauth_connection(credentials, LINKEDIN_APP_NAME)
if error_response:
return error_response
try:
from composio import Action
result = client.execute_action(
action=Action.LINKEDIN_SEND_MESSAGE,
params={
"recipient_id": recipient_id,
"message": message,
},
)
if result.get("successful"):
return {
"success": True,
"message_id": result.get("data", {}).get("id"),
"message": "Message sent successfully",
}
else:
return {"error": result.get("error", "Failed to send message")}
except Exception as e:
return {"error": f"LinkedIn message failed: {str(e)}"}