merge: incorporate Twitter/X community PR #3807

# Conflicts:
#	tools/src/aden_tools/credentials/__init__.py
#	tools/src/aden_tools/tools/__init__.py
#	tools/tests/test_credentials.py
This commit is contained in:
Timothy
2026-03-03 13:16:45 -08:00
7 changed files with 1275 additions and 0 deletions
+106
View File
@@ -0,0 +1,106 @@
"""
X (Twitter) tool credentials.
Contains credentials for X API v2 integration.
Bearer token for read-only operations, OAuth 1.0a keys for write operations.
"""
from .base import CredentialSpec
_X_TOOLS = [
"x_post_tweet",
"x_reply_tweet",
"x_delete_tweet",
"x_search_tweets",
"x_get_mentions",
"x_send_dm",
]
X_CREDENTIALS = {
"x_bearer_token": CredentialSpec(
env_var="X_BEARER_TOKEN",
tools=_X_TOOLS,
required=True,
startup_required=False,
help_url="https://developer.x.com/en/portal/dashboard",
description="X (Twitter) API v2 Bearer Token for read-only operations",
direct_api_key_supported=True,
api_key_instructions="""To get an X API Bearer Token:
1. Go to https://developer.x.com/en/portal/dashboard
2. Create a Project & App (or select existing)
3. Go to Keys & Tokens tab
4. Copy the Bearer Token
5. Set it as X_BEARER_TOKEN environment variable""",
health_check_endpoint="https://api.x.com/2/users/me",
health_check_method="GET",
credential_id="x_bearer_token",
credential_key="api_key",
credential_group="x",
),
"x_api_key": CredentialSpec(
env_var="X_API_KEY",
tools=_X_TOOLS,
required=False,
startup_required=False,
help_url="https://developer.x.com/en/portal/dashboard",
description="X (Twitter) API Consumer Key for OAuth 1.0a write operations",
direct_api_key_supported=True,
api_key_instructions="""To get your X API Consumer Key:
1. Go to https://developer.x.com/en/portal/dashboard
2. Select your app > Keys and Tokens
3. Under Consumer Keys, copy the API Key""",
credential_id="x_api_key",
credential_key="api_key",
credential_group="x",
),
"x_api_secret": CredentialSpec(
env_var="X_API_SECRET",
tools=_X_TOOLS,
required=False,
startup_required=False,
help_url="https://developer.x.com/en/portal/dashboard",
description="X (Twitter) API Consumer Secret for OAuth 1.0a write operations",
direct_api_key_supported=True,
api_key_instructions="""To get your X API Consumer Secret:
1. Go to https://developer.x.com/en/portal/dashboard
2. Select your app > Keys and Tokens
3. Under Consumer Keys, copy the API Secret""",
credential_id="x_api_secret",
credential_key="api_key",
credential_group="x",
),
"x_access_token": CredentialSpec(
env_var="X_ACCESS_TOKEN",
tools=_X_TOOLS,
required=False,
startup_required=False,
help_url="https://developer.x.com/en/portal/dashboard",
description="X (Twitter) User Access Token for OAuth 1.0a write operations",
direct_api_key_supported=True,
api_key_instructions="""To get your X Access Token:
1. Go to https://developer.x.com/en/portal/dashboard
2. Select your app > Keys and Tokens
3. Under Authentication Tokens, generate Access Token and Secret
4. Copy the Access Token""",
credential_id="x_access_token",
credential_key="api_key",
credential_group="x",
),
"x_access_token_secret": CredentialSpec(
env_var="X_ACCESS_TOKEN_SECRET",
tools=_X_TOOLS,
required=False,
startup_required=False,
help_url="https://developer.x.com/en/portal/dashboard",
description="X (Twitter) User Access Token Secret for OAuth 1.0a write operations",
direct_api_key_supported=True,
api_key_instructions="""To get your X Access Token Secret:
1. Go to https://developer.x.com/en/portal/dashboard
2. Select your app > Keys and Tokens
3. Under Authentication Tokens, generate Access Token and Secret
4. Copy the Access Token Secret""",
credential_id="x_access_token_secret",
credential_key="api_key",
credential_group="x",
),
}
@@ -0,0 +1,84 @@
# X (Twitter) Tool
Hive integration for the X (Twitter) API v2.
Enables agents to post tweets, reply to users, search recent tweets, and monitor mentions — allowing social automation workflows directly inside Hive.
## Features
- Post tweets
- Reply to tweets
- Delete tweets
- Search recent tweets
- Fetch user mentions
## Tools
| Tool | Description |
|--------|-------------|
| x_post_tweet | Post a new tweet |
| x_reply_tweet | Reply to an existing tweet |
| x_delete_tweet | Delete a tweet |
| x_search_tweets | Search recent tweets by query |
| x_get_mentions | Fetch mentions for a user |
## Authentication
This integration uses an **X API v2 Bearer Token**.
### Option 1 — Environment variable
export X_BEARER_TOKEN=your_token_here
### Option 2 — Hive credential store (recommended)
Configure credential id:
x
Hive will automatically inject credentials into all `x_*` tools.
## How to get a Bearer Token
1. Go to https://developer.x.com/
2. Create a Project & App
3. Enable API v2 access
4. Open **Keys & Tokens**
5. Copy the **Bearer Token**
## Example Usage
### Post a tweet
x_post_tweet("Hello from Hive 🚀")
### Reply to a tweet
x_reply_tweet(tweet_id="123456789", text="Thanks for the mention!")
### Search tweets
x_search_tweets(query="AI agents", max_results=5)
### Get mentions
x_get_mentions(user_id="2244994945")
## Notes
- Uses lightweight httpx client (no external SDK)
- Follows HubSpot tool architecture for consistency
- Compatible with Hive CredentialStoreAdapter
- Handles rate limits and common HTTP errors gracefully
- Max results capped at 100 per request (API limit)
## Development
Run tests:
pytest
Start MCP server:
python mcp_server.py
@@ -0,0 +1,11 @@
"""
X (Twitter) Tool - Post tweets, reply, search, and read mentions via X API v2.
Supports:
- Bearer tokens (X_BEARER_TOKEN)
- OAuth2 tokens via credential store
"""
from .x_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,9 @@
from aden_tools.credentials.integrations import INTEGRATION_CREDENTIALS
def test_x_credential_spec_exists():
assert "x" in INTEGRATION_CREDENTIALS
spec = INTEGRATION_CREDENTIALS["x"]
assert spec.env_var == "X_BEARER_TOKEN"
assert "x_post_tweet" in spec.tools
@@ -0,0 +1,23 @@
import unittest
from unittest.mock import patch
import httpx
from aden_tools.tools.x_tool.x_tool import (
_XClient,
)
class TestXClient(unittest.TestCase):
@patch("httpx.request")
def test_post(self, mock_req):
mock_req.return_value = httpx.Response(200, json={"data": {"id": "1", "text": "hi"}})
client = _XClient("fake")
res = client.request("POST", "/tweets", json={"text": "hi"})
self.assertIn("data", res)
if __name__ == "__main__":
unittest.main()
+425
View File
@@ -0,0 +1,425 @@
"""
X (Twitter) Tool - Post tweets, reply, search, read mentions, and send DMs via X API v2.
Authentication:
- Bearer token (X_BEARER_TOKEN): read-only operations (search, mentions).
- OAuth 1.0a User Context: write operations (post, reply, delete, DM).
Requires X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET.
API Reference: https://developer.x.com/en/docs/twitter-api
"""
from __future__ import annotations
import hashlib
import hmac
import os
import time
import urllib.parse
import uuid
from typing import TYPE_CHECKING, Any
import httpx
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialStoreAdapter
X_API_BASE = "https://api.x.com/2"
class _XClient:
"""Internal client wrapping X API v2 calls with Bearer token auth."""
def __init__(self, bearer_token: str):
self._bearer_token = bearer_token
@property
def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._bearer_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
if response.status_code == 401:
return {"error": "Invalid or expired X access token"}
if response.status_code == 403:
return {
"error": "Insufficient permissions — this operation may require OAuth 1.0a "
"user context authentication (not just a Bearer token).",
"help": "Set X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, and "
"X_ACCESS_TOKEN_SECRET for write operations.",
}
if response.status_code == 404:
return {"error": "Resource not found"}
if response.status_code == 429:
return {"error": "Rate limit exceeded. Try again later."}
if response.status_code >= 400:
try:
detail = response.json()
except Exception:
detail = response.text
return {"error": f"X API error (HTTP {response.status_code}): {detail}"}
return response.json()
def get(self, endpoint: str, params: dict | None = None) -> dict[str, Any]:
"""Make a GET request with Bearer auth."""
response = httpx.get(
f"{X_API_BASE}{endpoint}",
headers=self._headers,
params=params,
timeout=30.0,
)
return self._handle_response(response)
class _XOAuthClient:
"""Internal client wrapping X API v2 calls with OAuth 1.0a user context auth."""
def __init__(
self,
api_key: str,
api_secret: str,
access_token: str,
access_token_secret: str,
):
self._api_key = api_key
self._api_secret = api_secret
self._access_token = access_token
self._access_token_secret = access_token_secret
def _generate_oauth_signature(
self,
method: str,
url: str,
oauth_params: dict[str, str],
body_params: dict[str, str] | None = None,
) -> str:
"""Generate OAuth 1.0a HMAC-SHA1 signature."""
all_params = {**oauth_params}
if body_params:
all_params.update(body_params)
# Sort and encode params
sorted_params = sorted(all_params.items())
param_string = "&".join(
f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(str(v), safe='')}"
for k, v in sorted_params
)
# Create signature base string
base_string = (
f"{method.upper()}&"
f"{urllib.parse.quote(url, safe='')}&"
f"{urllib.parse.quote(param_string, safe='')}"
)
# Create signing key
signing_key = (
f"{urllib.parse.quote(self._api_secret, safe='')}&"
f"{urllib.parse.quote(self._access_token_secret, safe='')}"
)
# HMAC-SHA1
import base64
signature = base64.b64encode(
hmac.new(
signing_key.encode("utf-8"),
base_string.encode("utf-8"),
hashlib.sha1,
).digest()
).decode("utf-8")
return signature
def _build_auth_header(self, method: str, url: str) -> str:
"""Build the OAuth 1.0a Authorization header."""
oauth_params = {
"oauth_consumer_key": self._api_key,
"oauth_nonce": uuid.uuid4().hex,
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": str(int(time.time())),
"oauth_token": self._access_token,
"oauth_version": "1.0",
}
signature = self._generate_oauth_signature(method, url, oauth_params)
oauth_params["oauth_signature"] = signature
header_parts = [
f'{urllib.parse.quote(k, safe="")}="{urllib.parse.quote(v, safe="")}"'
for k, v in sorted(oauth_params.items())
]
return "OAuth " + ", ".join(header_parts)
def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
if response.status_code == 401:
return {
"error": "OAuth 1.0a authentication failed — check your API keys and tokens",
"help": "Verify X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, "
"and X_ACCESS_TOKEN_SECRET are correct.",
}
if response.status_code == 403:
try:
detail = response.json()
except Exception:
detail = response.text
return {
"error": f"Insufficient permissions (HTTP 403): {detail}",
"help": "Ensure your X app has Read+Write+Direct Message permissions "
"and tokens were regenerated AFTER enabling them.",
}
if response.status_code == 404:
return {"error": "Resource not found"}
if response.status_code == 429:
return {"error": "Rate limit exceeded. Try again later."}
if response.status_code >= 400:
try:
detail = response.json()
except Exception:
detail = response.text
return {"error": f"X API error (HTTP {response.status_code}): {detail}"}
return response.json()
def post(self, endpoint: str, json_body: dict | None = None) -> dict[str, Any]:
"""Make a POST request with OAuth 1.0a auth."""
url = f"{X_API_BASE}{endpoint}"
auth_header = self._build_auth_header("POST", url)
headers = {
"Authorization": auth_header,
"Content-Type": "application/json",
"Accept": "application/json",
}
response = httpx.post(url, headers=headers, json=json_body, timeout=30.0)
return self._handle_response(response)
def delete(self, endpoint: str) -> dict[str, Any]:
"""Make a DELETE request with OAuth 1.0a auth."""
url = f"{X_API_BASE}{endpoint}"
auth_header = self._build_auth_header("DELETE", url)
headers = {
"Authorization": auth_header,
"Accept": "application/json",
}
response = httpx.delete(url, headers=headers, timeout=30.0)
return self._handle_response(response)
def register_tools(
mcp: FastMCP,
credentials: CredentialStoreAdapter | None = None,
) -> None:
"""Register X (Twitter) tools with the MCP server."""
def _get_credential(env_var: str, cred_name: str) -> str | None:
"""Get a credential from the credential manager or environment."""
if credentials is not None:
val = credentials.get(cred_name)
if val is not None and not isinstance(val, str):
raise TypeError(
f"Expected string for credential '{cred_name}', got {type(val).__name__}"
)
return val
return os.getenv(env_var)
def _get_bearer_client() -> _XClient | dict[str, str]:
"""Get a Bearer-token client for read-only operations."""
token = _get_credential("X_BEARER_TOKEN", "x_bearer_token")
if not token:
return {
"error": "X Bearer token not configured",
"help": "Set X_BEARER_TOKEN environment variable. "
"Get it from https://developer.x.com/ > Keys & Tokens.",
}
return _XClient(token)
def _get_oauth_client() -> _XOAuthClient | dict[str, str]:
"""Get an OAuth 1.0a client for write operations."""
api_key = _get_credential("X_API_KEY", "x_api_key")
api_secret = _get_credential("X_API_SECRET", "x_api_secret")
access_token = _get_credential("X_ACCESS_TOKEN", "x_access_token")
access_secret = _get_credential("X_ACCESS_TOKEN_SECRET", "x_access_token_secret")
if not all([api_key, api_secret, access_token, access_secret]):
missing = []
if not api_key:
missing.append("X_API_KEY")
if not api_secret:
missing.append("X_API_SECRET")
if not access_token:
missing.append("X_ACCESS_TOKEN")
if not access_secret:
missing.append("X_ACCESS_TOKEN_SECRET")
return {
"error": f"X OAuth credentials not configured: {', '.join(missing)}",
"help": "Write operations (post, reply, delete, DM) require OAuth 1.0a. "
"Set all 4 env vars from https://developer.x.com/ > Keys & Tokens.",
}
return _XOAuthClient(api_key, api_secret, access_token, access_secret)
# ── Read-only tools (Bearer token) ──────────────────────────────
@mcp.tool()
def x_search_tweets(query: str, max_results: int = 10) -> dict:
"""Search recent tweets by keyword or query.
Uses Bearer token authentication (read-only).
Args:
query: Search query string (supports X search operators).
max_results: Number of results to return (1-100, default 10).
Returns:
Dict with matching tweets or error details.
"""
client = _get_bearer_client()
if isinstance(client, dict):
return client
params = {"query": query, "max_results": min(max(max_results, 1), 100)}
try:
return client.get("/tweets/search/recent", params=params)
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def x_get_mentions(user_id: str, max_results: int = 10) -> dict:
"""Fetch recent mentions for a user.
Uses Bearer token authentication (read-only).
Args:
user_id: The X user ID to fetch mentions for.
max_results: Number of results to return (1-100, default 10).
Returns:
Dict with mention tweets or error details.
"""
client = _get_bearer_client()
if isinstance(client, dict):
return client
params = {"max_results": min(max(max_results, 1), 100)}
try:
return client.get(f"/users/{user_id}/mentions", params=params)
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
# ── Write tools (OAuth 1.0a required) ───────────────────────────
@mcp.tool()
def x_post_tweet(text: str) -> dict:
"""Post a new tweet.
Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET,
X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET).
Args:
text: Tweet text content (max 280 characters).
Returns:
Dict with created tweet data or error details.
"""
if not text or not text.strip():
return {"error": "Tweet text cannot be empty"}
if len(text) > 280:
return {"error": f"Tweet text exceeds 280 characters ({len(text)} chars)"}
client = _get_oauth_client()
if isinstance(client, dict):
return client
try:
return client.post("/tweets", json_body={"text": text})
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def x_reply_tweet(tweet_id: str, text: str) -> dict:
"""Reply to an existing tweet.
Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET,
X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET).
Args:
tweet_id: The ID of the tweet to reply to.
text: Reply text content (max 280 characters).
Returns:
Dict with created reply data or error details.
"""
if not text or not text.strip():
return {"error": "Reply text cannot be empty"}
if len(text) > 280:
return {"error": f"Reply text exceeds 280 characters ({len(text)} chars)"}
client = _get_oauth_client()
if isinstance(client, dict):
return client
body = {"text": text, "reply": {"in_reply_to_tweet_id": tweet_id}}
try:
return client.post("/tweets", json_body=body)
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def x_delete_tweet(tweet_id: str) -> dict:
"""Delete a tweet.
Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET,
X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET).
Args:
tweet_id: The ID of the tweet to delete.
Returns:
Dict with deletion confirmation or error details.
"""
client = _get_oauth_client()
if isinstance(client, dict):
return client
try:
return client.delete(f"/tweets/{tweet_id}")
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def x_send_dm(participant_id: str, text: str) -> dict:
"""Send a direct message to a user on X.
Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET,
X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET). Your X app must have
Direct Message permissions enabled.
Args:
participant_id: The X user ID of the DM recipient.
text: Message text content.
Returns:
Dict with DM event data or error details.
"""
if not text or not text.strip():
return {"error": "DM text cannot be empty"}
client = _get_oauth_client()
if isinstance(client, dict):
return client
body = {"text": text}
try:
return client.post(f"/dm_conversations/with/{participant_id}/messages", json_body=body)
except httpx.TimeoutException:
return {"error": "X API request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
+617
View File
@@ -0,0 +1,617 @@
"""Tests for the X (Twitter) tool."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import httpx
import pytest
from fastmcp import FastMCP
from aden_tools.tools.x_tool.x_tool import (
X_API_BASE,
_XClient,
_XOAuthClient,
register_tools,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def mcp_with_x(monkeypatch):
"""MCP server with all X credentials set."""
monkeypatch.setenv("X_BEARER_TOKEN", "test-bearer-token")
monkeypatch.setenv("X_API_KEY", "test-api-key")
monkeypatch.setenv("X_API_SECRET", "test-api-secret")
monkeypatch.setenv("X_ACCESS_TOKEN", "test-access-token")
monkeypatch.setenv("X_ACCESS_TOKEN_SECRET", "test-access-secret")
mcp = FastMCP("test-x")
register_tools(mcp)
return mcp
@pytest.fixture
def mcp_bearer_only(monkeypatch):
"""MCP server with only Bearer token set (no OAuth)."""
monkeypatch.setenv("X_BEARER_TOKEN", "test-bearer-token")
monkeypatch.delenv("X_API_KEY", raising=False)
monkeypatch.delenv("X_API_SECRET", raising=False)
monkeypatch.delenv("X_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("X_ACCESS_TOKEN_SECRET", raising=False)
mcp = FastMCP("test-x-bearer-only")
register_tools(mcp)
return mcp
@pytest.fixture
def mcp_no_creds(monkeypatch):
"""MCP server with no X credentials set."""
monkeypatch.delenv("X_BEARER_TOKEN", raising=False)
monkeypatch.delenv("X_API_KEY", raising=False)
monkeypatch.delenv("X_API_SECRET", raising=False)
monkeypatch.delenv("X_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("X_ACCESS_TOKEN_SECRET", raising=False)
mcp = FastMCP("test-x-no-creds")
register_tools(mcp)
return mcp
def _get_tool_fn(mcp, tool_name):
"""Get a tool function from the MCP server."""
return mcp._tool_manager._tools[tool_name].fn
def _make_response(status_code=200, json_data=None):
"""Create a mock httpx response."""
mock_resp = MagicMock()
mock_resp.status_code = status_code
mock_resp.json.return_value = json_data or {}
mock_resp.text = "{}"
return mock_resp
# ---------------------------------------------------------------------------
# TestXClient (Bearer token client)
# ---------------------------------------------------------------------------
class TestXClient:
"""Test the Bearer token client."""
def setup_method(self):
self.client = _XClient("test-bearer")
def test_headers(self):
headers = self.client._headers
assert headers["Authorization"] == "Bearer test-bearer"
assert headers["Content-Type"] == "application/json"
@pytest.mark.parametrize(
"status_code,expected_substring",
[
(401, "invalid or expired"),
(403, "insufficient permissions"),
(404, "not found"),
(429, "rate limit"),
],
)
def test_handle_response_errors(self, status_code, expected_substring):
response = _make_response(status_code)
result = self.client._handle_response(response)
assert "error" in result
assert expected_substring in result["error"].lower()
def test_handle_response_success(self):
response = _make_response(200, {"data": [{"id": "1"}]})
result = self.client._handle_response(response)
assert result == {"data": [{"id": "1"}]}
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_get_request(self, mock_get):
mock_get.return_value = _make_response(200, {"data": []})
result = self.client.get("/tweets/search/recent", params={"query": "test"})
mock_get.assert_called_once_with(
f"{X_API_BASE}/tweets/search/recent",
headers=self.client._headers,
params={"query": "test"},
timeout=30.0,
)
assert result == {"data": []}
def test_handle_generic_4xx_error(self):
response = _make_response(400, {"detail": "Bad request"})
result = self.client._handle_response(response)
assert "error" in result
assert "400" in result["error"]
# ---------------------------------------------------------------------------
# TestXOAuthClient
# ---------------------------------------------------------------------------
class TestXOAuthClient:
"""Test the OAuth 1.0a client."""
def setup_method(self):
self.client = _XOAuthClient(
api_key="test-key",
api_secret="test-secret",
access_token="test-access",
access_token_secret="test-access-secret",
)
def test_oauth_signature_generation(self):
oauth_params = {
"oauth_consumer_key": "test-key",
"oauth_nonce": "testnonce",
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": "1234567890",
"oauth_token": "test-access",
"oauth_version": "1.0",
}
sig = self.client._generate_oauth_signature(
"POST", "https://api.x.com/2/tweets", oauth_params
)
# Should be a non-empty base64 string
assert sig
assert len(sig) > 10
def test_build_auth_header(self):
header = self.client._build_auth_header("POST", "https://api.x.com/2/tweets")
assert header.startswith("OAuth ")
assert "oauth_consumer_key" in header
assert "oauth_signature" in header
assert "oauth_token" in header
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_post_request(self, mock_post):
mock_post.return_value = _make_response(200, {"data": {"id": "123", "text": "hi"}})
result = self.client.post("/tweets", json_body={"text": "hi"})
assert mock_post.called
assert result == {"data": {"id": "123", "text": "hi"}}
@patch("aden_tools.tools.x_tool.x_tool.httpx.delete")
def test_delete_request(self, mock_delete):
mock_delete.return_value = _make_response(200, {"data": {"deleted": True}})
result = self.client.delete("/tweets/123")
assert mock_delete.called
assert result == {"data": {"deleted": True}}
@pytest.mark.parametrize(
"status_code,expected_substring",
[
(401, "oauth 1.0a authentication failed"),
(403, "insufficient permissions"),
(404, "not found"),
(429, "rate limit"),
],
)
def test_handle_response_errors(self, status_code, expected_substring):
response = _make_response(status_code)
result = self.client._handle_response(response)
assert "error" in result
assert expected_substring in result["error"].lower()
# ---------------------------------------------------------------------------
# TestCredentials
# ---------------------------------------------------------------------------
class TestCredentials:
"""Test credential validation for all tools."""
@pytest.mark.parametrize(
"tool_name",
["x_search_tweets", "x_get_mentions"],
)
def test_bearer_tools_missing_creds(self, mcp_no_creds, tool_name):
fn = _get_tool_fn(mcp_no_creds, tool_name)
if tool_name == "x_search_tweets":
result = fn(query="test")
else:
result = fn(user_id="123")
assert "error" in result
assert "bearer" in result["error"].lower()
assert "help" in result
@pytest.mark.parametrize(
"tool_name",
["x_post_tweet", "x_reply_tweet", "x_delete_tweet", "x_send_dm"],
)
def test_oauth_tools_missing_creds(self, mcp_bearer_only, tool_name):
"""Write tools should fail when OAuth creds are missing (even with bearer)."""
fn = _get_tool_fn(mcp_bearer_only, tool_name)
if tool_name == "x_post_tweet":
result = fn(text="test")
elif tool_name == "x_reply_tweet":
result = fn(tweet_id="1", text="test")
elif tool_name == "x_delete_tweet":
result = fn(tweet_id="1")
elif tool_name == "x_send_dm":
result = fn(participant_id="1", text="test")
assert "error" in result
assert "oauth" in result["error"].lower()
assert "help" in result
# ---------------------------------------------------------------------------
# TestSearchTweets
# ---------------------------------------------------------------------------
class TestSearchTweets:
"""Test x_search_tweets tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_search_success(self, mock_get, mcp_with_x):
mock_get.return_value = _make_response(
200,
{
"data": [{"id": "1", "text": "AI is cool"}],
"meta": {"result_count": 1},
},
)
fn = _get_tool_fn(mcp_with_x, "x_search_tweets")
result = fn(query="AI agents", max_results=5)
assert "data" in result
call_params = mock_get.call_args.kwargs["params"]
assert call_params["query"] == "AI agents"
assert call_params["max_results"] == 5
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_search_max_results_capped(self, mock_get, mcp_with_x):
mock_get.return_value = _make_response(200, {"data": []})
fn = _get_tool_fn(mcp_with_x, "x_search_tweets")
fn(query="test", max_results=999)
call_params = mock_get.call_args.kwargs["params"]
assert call_params["max_results"] == 100
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_search_timeout(self, mock_get, mcp_with_x):
mock_get.side_effect = httpx.TimeoutException("timeout")
fn = _get_tool_fn(mcp_with_x, "x_search_tweets")
result = fn(query="test")
assert "error" in result
assert "timed out" in result["error"].lower()
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_search_network_error(self, mock_get, mcp_with_x):
mock_get.side_effect = httpx.ConnectError("Connection refused")
fn = _get_tool_fn(mcp_with_x, "x_search_tweets")
result = fn(query="test")
assert "error" in result
assert "network error" in result["error"].lower()
# ---------------------------------------------------------------------------
# TestGetMentions
# ---------------------------------------------------------------------------
class TestGetMentions:
"""Test x_get_mentions tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_mentions_success(self, mock_get, mcp_with_x):
mock_get.return_value = _make_response(
200,
{
"data": [{"id": "1", "text": "@user hello"}],
},
)
fn = _get_tool_fn(mcp_with_x, "x_get_mentions")
result = fn(user_id="12345", max_results=5)
assert "data" in result
assert mock_get.call_args.kwargs["params"]["max_results"] == 5
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_mentions_min_clamped(self, mock_get, mcp_with_x):
mock_get.return_value = _make_response(200, {"data": []})
fn = _get_tool_fn(mcp_with_x, "x_get_mentions")
fn(user_id="12345", max_results=-5)
assert mock_get.call_args.kwargs["params"]["max_results"] == 1
# ---------------------------------------------------------------------------
# TestPostTweet
# ---------------------------------------------------------------------------
class TestPostTweet:
"""Test x_post_tweet tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_post_success(self, mock_post, mcp_with_x):
mock_post.return_value = _make_response(
200,
{
"data": {"id": "111", "text": "Hello world"},
},
)
fn = _get_tool_fn(mcp_with_x, "x_post_tweet")
result = fn(text="Hello world")
assert "data" in result
assert result["data"]["id"] == "111"
def test_post_empty_text(self, mcp_with_x):
fn = _get_tool_fn(mcp_with_x, "x_post_tweet")
result = fn(text="")
assert "error" in result
assert "empty" in result["error"].lower()
def test_post_too_long(self, mcp_with_x):
fn = _get_tool_fn(mcp_with_x, "x_post_tweet")
result = fn(text="a" * 281)
assert "error" in result
assert "280" in result["error"]
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_post_timeout(self, mock_post, mcp_with_x):
mock_post.side_effect = httpx.TimeoutException("timeout")
fn = _get_tool_fn(mcp_with_x, "x_post_tweet")
result = fn(text="Hello")
assert "error" in result
assert "timed out" in result["error"].lower()
# ---------------------------------------------------------------------------
# TestReplyTweet
# ---------------------------------------------------------------------------
class TestReplyTweet:
"""Test x_reply_tweet tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_reply_success(self, mock_post, mcp_with_x):
mock_post.return_value = _make_response(
200,
{
"data": {"id": "222", "text": "Great point!"},
},
)
fn = _get_tool_fn(mcp_with_x, "x_reply_tweet")
result = fn(tweet_id="111", text="Great point!")
assert "data" in result
def test_reply_empty_text(self, mcp_with_x):
fn = _get_tool_fn(mcp_with_x, "x_reply_tweet")
result = fn(tweet_id="111", text="")
assert "error" in result
def test_reply_too_long(self, mcp_with_x):
fn = _get_tool_fn(mcp_with_x, "x_reply_tweet")
result = fn(tweet_id="111", text="b" * 281)
assert "error" in result
assert "280" in result["error"]
# ---------------------------------------------------------------------------
# TestDeleteTweet
# ---------------------------------------------------------------------------
class TestDeleteTweet:
"""Test x_delete_tweet tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.delete")
def test_delete_success(self, mock_delete, mcp_with_x):
mock_delete.return_value = _make_response(
200,
{
"data": {"deleted": True},
},
)
fn = _get_tool_fn(mcp_with_x, "x_delete_tweet")
result = fn(tweet_id="111")
assert result["data"]["deleted"] is True
@patch("aden_tools.tools.x_tool.x_tool.httpx.delete")
def test_delete_not_found(self, mock_delete, mcp_with_x):
mock_delete.return_value = _make_response(404)
fn = _get_tool_fn(mcp_with_x, "x_delete_tweet")
result = fn(tweet_id="999")
assert "error" in result
assert "not found" in result["error"].lower()
# ---------------------------------------------------------------------------
# TestSendDM
# ---------------------------------------------------------------------------
class TestSendDM:
"""Test x_send_dm tool."""
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_dm_success(self, mock_post, mcp_with_x):
mock_post.return_value = _make_response(
200,
{
"data": {"dm_event_id": "999", "text": "Hey there!"},
},
)
fn = _get_tool_fn(mcp_with_x, "x_send_dm")
result = fn(participant_id="12345", text="Hey there!")
assert "data" in result
assert result["data"]["dm_event_id"] == "999"
# Verify correct v2 1:1 endpoint usage
call_args = mock_post.call_args
assert call_args[0][0] == f"{X_API_BASE}/dm_conversations/with/12345/messages"
assert call_args[1]["json"] == {"text": "Hey there!"}
def test_dm_empty_text(self, mcp_with_x):
fn = _get_tool_fn(mcp_with_x, "x_send_dm")
result = fn(participant_id="12345", text="")
assert "error" in result
assert "empty" in result["error"].lower()
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_dm_network_error(self, mock_post, mcp_with_x):
mock_post.side_effect = httpx.ConnectError("Connection refused")
fn = _get_tool_fn(mcp_with_x, "x_send_dm")
result = fn(participant_id="12345", text="Hello")
assert "error" in result
assert "network error" in result["error"].lower()
def test_dm_missing_oauth_creds(self, mcp_bearer_only):
fn = _get_tool_fn(mcp_bearer_only, "x_send_dm")
result = fn(participant_id="12345", text="Hello")
assert "error" in result
assert "oauth" in result["error"].lower()
# ---------------------------------------------------------------------------
# TestAPIErrorHandling
# ---------------------------------------------------------------------------
class TestAPIErrorHandling:
"""Test HTTP error code handling across tools."""
@pytest.mark.parametrize(
"status_code,expected_substring",
[
(401, "invalid or expired"),
(403, "insufficient permissions"),
(404, "not found"),
(429, "rate limit"),
],
)
@patch("aden_tools.tools.x_tool.x_tool.httpx.get")
def test_bearer_error_codes(self, mock_get, status_code, expected_substring, mcp_with_x):
mock_get.return_value = _make_response(status_code)
fn = _get_tool_fn(mcp_with_x, "x_search_tweets")
result = fn(query="test")
assert "error" in result
assert expected_substring in result["error"].lower()
@pytest.mark.parametrize(
"status_code,expected_substring",
[
(401, "oauth 1.0a authentication failed"),
(403, "insufficient permissions"),
(404, "not found"),
(429, "rate limit"),
],
)
@patch("aden_tools.tools.x_tool.x_tool.httpx.post")
def test_oauth_error_codes(self, mock_post, status_code, expected_substring, mcp_with_x):
mock_post.return_value = _make_response(status_code)
fn = _get_tool_fn(mcp_with_x, "x_post_tweet")
result = fn(text="test")
assert "error" in result
assert expected_substring in result["error"].lower()
# ---------------------------------------------------------------------------
# TestToolRegistration
# ---------------------------------------------------------------------------
class TestToolRegistration:
"""Test that all tools are properly registered."""
def test_all_tools_registered(self, mcp_with_x):
tools = mcp_with_x._tool_manager._tools
expected = [
"x_search_tweets",
"x_get_mentions",
"x_post_tweet",
"x_reply_tweet",
"x_delete_tweet",
"x_send_dm",
]
for name in expected:
assert name in tools, f"Tool '{name}' not registered"
def test_tool_count(self, mcp_with_x):
tools = mcp_with_x._tool_manager._tools
x_tools = [name for name in tools if name.startswith("x_")]
assert len(x_tools) == 6
# ---------------------------------------------------------------------------
# TestCredentialSpecs
# ---------------------------------------------------------------------------
class TestCredentialSpecs:
"""Test credential spec definitions."""
def test_x_credential_specs_exist(self):
from aden_tools.credentials.x import X_CREDENTIALS
expected_keys = [
"x_bearer_token",
"x_api_key",
"x_api_secret",
"x_access_token",
"x_access_token_secret",
]
for key in expected_keys:
assert key in X_CREDENTIALS, f"Missing credential spec: {key}"
def test_bearer_token_spec(self):
from aden_tools.credentials.x import X_CREDENTIALS
spec = X_CREDENTIALS["x_bearer_token"]
assert spec.env_var == "X_BEARER_TOKEN"
assert "x_search_tweets" in spec.tools
assert "x_post_tweet" in spec.tools
assert "x_send_dm" in spec.tools
assert spec.credential_group == "x"
def test_oauth_specs_are_optional(self):
from aden_tools.credentials.x import X_CREDENTIALS
for key in ["x_api_key", "x_api_secret", "x_access_token", "x_access_token_secret"]:
assert X_CREDENTIALS[key].required is False
def test_specs_in_merged_registry(self):
from aden_tools.credentials import CREDENTIAL_SPECS
assert "x_bearer_token" in CREDENTIAL_SPECS
assert "x_api_key" in CREDENTIAL_SPECS