Merge pull request #21 from bryanadenhq/feat/tools

Add aden-tools: FastMCP server with core tools
This commit is contained in:
Timothy @aden
2026-01-19 18:52:41 -08:00
committed by GitHub
38 changed files with 1902 additions and 0 deletions
+9
View File
@@ -43,10 +43,19 @@ pnpm-debug.log*
# Testing
coverage/
.nyc_output/
.pytest_cache/
# TypeScript
*.tsbuildinfo
# Python
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.eggs/
*.egg
# Misc
*.local
.cache/
+186
View File
@@ -0,0 +1,186 @@
# Building Tools for Aden
This guide explains how to create new tools for the Aden agent framework using FastMCP.
## Quick Start Checklist
1. Create folder under `src/aden_tools/tools/<tool_name>/`
2. Implement a `register_tools(mcp: FastMCP)` function using the `@mcp.tool()` decorator
3. Add a `README.md` documenting your tool
4. Register in `src/aden_tools/tools/__init__.py`
5. Add tests in `tests/tools/`
## Tool Structure
Each tool lives in its own folder:
```
src/aden_tools/tools/my_tool/
├── __init__.py # Export register_tools function
├── my_tool.py # Tool implementation
└── README.md # Documentation
```
## Implementation Pattern
Tools use FastMCP's native decorator pattern:
```python
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register my tools with the MCP server."""
@mcp.tool()
def my_tool(
query: str,
limit: int = 10,
) -> dict:
"""
Search for items matching a query.
Use this when you need to find specific information.
Args:
query: The search query (1-500 chars)
limit: Maximum number of results (1-100)
Returns:
Dict with search results or error dict
"""
# Validate inputs
if not query or len(query) > 500:
return {"error": "Query must be 1-500 characters"}
if limit < 1 or limit > 100:
limit = max(1, min(100, limit))
try:
# Your implementation here
results = do_search(query, limit)
return {
"query": query,
"results": results,
"total": len(results),
}
except Exception as e:
return {"error": f"Search failed: {str(e)}"}
```
## Exporting the Tool
In `src/aden_tools/tools/my_tool/__init__.py`:
```python
from .my_tool import register_tools
__all__ = ["register_tools"]
```
In `src/aden_tools/tools/__init__.py`, add to `_TOOL_MODULES`:
```python
_TOOL_MODULES = [
# ... existing tools
"my_tool",
]
```
## Environment Variables
For tools requiring API keys or configuration, check environment variables at runtime:
```python
import os
def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def my_api_tool(query: str) -> dict:
"""Tool that requires an API key."""
api_key = os.getenv("MY_API_KEY")
if not api_key:
return {
"error": "MY_API_KEY environment variable not set",
"help": "Get an API key at https://example.com/api",
}
# Use the API key...
```
## Best Practices
### Error Handling
Return error dicts instead of raising exceptions:
```python
@mcp.tool()
def my_tool(**kwargs) -> dict:
try:
result = do_work()
return {"success": True, "data": result}
except SpecificError as e:
return {"error": f"Failed to process: {str(e)}"}
except Exception as e:
return {"error": f"Unexpected error: {str(e)}"}
```
### Return Values
- Return dicts for structured data
- Include relevant metadata (query, total count, etc.)
- Use `{"error": "message"}` for errors
### Documentation
The docstring becomes the tool description in MCP. Include:
- What the tool does
- When to use it
- Args with types and constraints
- What it returns
Every tool folder needs a `README.md` with:
- Description and use cases
- Usage examples
- Argument table
- Environment variables (if any)
- Error handling notes
## Testing
Place tests in `tests/tools/test_my_tool.py`:
```python
import pytest
from fastmcp import FastMCP
from aden_tools.tools.my_tool import register_tools
@pytest.fixture
def mcp():
"""Create a FastMCP instance with tools registered."""
server = FastMCP("test")
register_tools(server)
return server
def test_my_tool_basic(mcp):
"""Test basic tool functionality."""
tool_fn = mcp._tool_manager._tools["my_tool"].fn
result = tool_fn(query="test")
assert "results" in result
def test_my_tool_validation(mcp):
"""Test input validation."""
tool_fn = mcp._tool_manager._tools["my_tool"].fn
result = tool_fn(query="")
assert "error" in result
```
Mock external APIs to keep tests fast and deterministic.
## Naming Conventions
- **Folder name**: `snake_case` with `_tool` suffix (e.g., `file_read_tool`)
- **Function name**: `snake_case` (e.g., `file_read`)
- **Tool description**: Clear, actionable docstring
+29
View File
@@ -0,0 +1,29 @@
# Aden Tools MCP Server
# Exposes aden-tools via Model Context Protocol
FROM python:3.11-slim
WORKDIR /app
# Copy project files
COPY pyproject.toml ./
COPY README.md ./
COPY src ./src
COPY mcp_server.py ./
# Install package with all dependencies
RUN pip install --no-cache-dir -e .
# Create non-root user for security
RUN useradd -m -u 1001 appuser && chown -R appuser:appuser /app
USER appuser
# Expose MCP server port
EXPOSE 4001
# Health check - verify server is responding
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:4001/health').raise_for_status()" || exit 1
# Run MCP server with HTTP transport
CMD ["python", "mcp_server.py"]
+103
View File
@@ -0,0 +1,103 @@
# Aden Tools
Tool library for the Aden agent framework. Provides a collection of tools that AI agents can use to interact with external systems, process data, and perform actions via the Model Context Protocol (MCP).
## Installation
```bash
pip install -e aden-tools
```
For development:
```bash
pip install -e "aden-tools[dev]"
```
## Quick Start
### As an MCP Server
```python
from fastmcp import FastMCP
from aden_tools.tools import register_all_tools
mcp = FastMCP("aden-tools")
register_all_tools(mcp)
mcp.run()
```
Or run directly:
```bash
python mcp_server.py
```
## Available Tools
| Tool | Description |
|------|-------------|
| `example_tool` | Template tool demonstrating the pattern |
| `file_read` | Read contents of local files |
| `file_write` | Write content to local files |
| `web_search` | Search the web using Brave Search API |
| `web_scrape` | Scrape and extract content from webpages |
| `pdf_read` | Read and extract text from PDF files |
## Project Structure
```
aden-tools/
├── src/aden_tools/
│ ├── __init__.py # Main exports
│ ├── utils/ # Utility functions
│ └── tools/ # Tool implementations
│ ├── example_tool/
│ ├── file_read_tool/
│ ├── file_write_tool/
│ ├── web_search_tool/
│ ├── web_scrape_tool/
│ └── pdf_read_tool/
├── tests/ # Test suite
├── mcp_server.py # MCP server entry point
├── README.md
├── BUILDING_TOOLS.md # Tool development guide
└── pyproject.toml
```
## Creating Custom Tools
Tools use FastMCP's native decorator pattern:
```python
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def my_tool(query: str, limit: int = 10) -> dict:
"""
Search for items matching the query.
Args:
query: The search query
limit: Max results to return
Returns:
Dict with results or error
"""
try:
results = do_search(query, limit)
return {"results": results, "total": len(results)}
except Exception as e:
return {"error": str(e)}
```
See [BUILDING_TOOLS.md](BUILDING_TOOLS.md) for the full guide.
## Documentation
- [Building Tools Guide](BUILDING_TOOLS.md) - How to create new tools
- Individual tool READMEs in `src/aden_tools/tools/*/README.md`
## License
This project is licensed under the Apache License 2.0 - see the [LICENSE](../LICENSE) file for details.
+79
View File
@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Aden Tools MCP Server
Exposes all aden-tools via Model Context Protocol using FastMCP.
Usage:
# Run with HTTP transport (default, for Docker)
python mcp_server.py
# Run with custom port
python mcp_server.py --port 8001
# Run with STDIO transport (for local testing)
python mcp_server.py --stdio
Environment Variables:
MCP_PORT - Server port (default: 4001)
BRAVE_SEARCH_API_KEY - Required for web_search tool
"""
import argparse
import os
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import PlainTextResponse
mcp = FastMCP("aden-tools")
# Register all tools with the MCP server
from aden_tools.tools import register_all_tools
tools = register_all_tools(mcp)
print(f"[MCP] Registered {len(tools)} tools: {tools}")
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
"""Health check endpoint for container orchestration."""
return PlainTextResponse("OK")
@mcp.custom_route("/", methods=["GET"])
async def index(request: Request) -> PlainTextResponse:
"""Landing page for browser visits."""
return PlainTextResponse("Welcome to the Hive MCP Server")
def main() -> None:
"""Entry point for the MCP server."""
parser = argparse.ArgumentParser(description="Aden Tools MCP Server")
parser.add_argument(
"--port",
type=int,
default=int(os.getenv("MCP_PORT", "4001")),
help="HTTP server port (default: 4001)",
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="HTTP server host (default: 0.0.0.0)",
)
parser.add_argument(
"--stdio",
action="store_true",
help="Use STDIO transport instead of HTTP",
)
args = parser.parse_args()
if args.stdio:
print("[MCP] Starting with STDIO transport")
mcp.run(transport="stdio")
else:
print(f"[MCP] Starting HTTP server on {args.host}:{args.port}")
mcp.run(transport="http", host=args.host, port=args.port)
if __name__ == "__main__":
main()
+59
View File
@@ -0,0 +1,59 @@
[project]
name = "aden-tools"
version = "0.1.0"
description = "Tools library for the Aden agent framework"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }
authors = [
{ name = "Aden", email = "team@aden.ai" }
]
keywords = ["ai", "agents", "tools", "llm"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"pydantic>=2.0.0",
"httpx>=0.27.0",
"beautifulsoup4>=4.12.0",
"pypdf>=4.0.0",
"pandas>=2.0.0",
"jsonpath-ng>=1.6.0",
"fastmcp>=2.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
]
sandbox = [
"RestrictedPython>=7.0",
]
ocr = [
"pytesseract>=0.3.10",
"pillow>=10.0.0",
]
all = [
"RestrictedPython>=7.0",
"pytesseract>=0.3.10",
"pillow>=10.0.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/aden_tools"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
+30
View File
@@ -0,0 +1,30 @@
"""
Aden Tools - Tool library for the Aden agent framework.
Tools provide capabilities that AI agents can use to interact with
external systems, process data, and perform actions.
Usage:
from fastmcp import FastMCP
from aden_tools.tools import register_all_tools
mcp = FastMCP("my-server")
register_all_tools(mcp)
"""
__version__ = "0.1.0"
# Utilities
from .utils import get_env_var
# MCP registration
from .tools import register_all_tools
__all__ = [
# Version
"__version__",
# Utilities
"get_env_var",
# MCP registration
"register_all_tools",
]
@@ -0,0 +1,51 @@
"""
Aden Tools - Tool implementations for FastMCP.
Usage:
from fastmcp import FastMCP
from aden_tools.tools import register_all_tools
mcp = FastMCP("my-server")
register_all_tools(mcp)
"""
from typing import List
from fastmcp import FastMCP
# Import register_tools from each tool module
from .example_tool import register_tools as register_example
from .file_read_tool import register_tools as register_file_read
from .file_write_tool import register_tools as register_file_write
from .web_search_tool import register_tools as register_web_search
from .web_scrape_tool import register_tools as register_web_scrape
from .pdf_read_tool import register_tools as register_pdf_read
def register_all_tools(mcp: FastMCP) -> List[str]:
"""
Register all aden-tools with a FastMCP server.
Args:
mcp: FastMCP server instance
Returns:
List of registered tool names
"""
register_example(mcp)
register_file_read(mcp)
register_file_write(mcp)
register_web_search(mcp)
register_web_scrape(mcp)
register_pdf_read(mcp)
return [
"example_tool",
"file_read",
"file_write",
"web_search",
"web_scrape",
"pdf_read",
]
__all__ = ["register_all_tools"]
@@ -0,0 +1,26 @@
# Example Tool
A template tool demonstrating the Aden tools pattern.
## Description
This tool processes text messages with optional transformations. It serves as a reference implementation for creating new tools using the FastMCP decorator pattern.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `message` | str | Yes | - | The message to process (1-1000 chars) |
| `uppercase` | bool | No | `False` | Convert message to uppercase |
| `repeat` | int | No | `1` | Number of times to repeat (1-10) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error strings for validation issues:
- `Error: message must be 1-1000 characters` - Empty or too long message
- `Error: repeat must be 1-10` - Repeat value out of range
- `Error processing message: <error>` - Unexpected error
@@ -0,0 +1,4 @@
"""Example Tool package."""
from .example_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,51 @@
"""
Example Tool - A simple text processing tool for FastMCP.
Demonstrates native FastMCP tool registration pattern.
"""
from __future__ import annotations
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register example tools with the MCP server."""
@mcp.tool()
def example_tool(
message: str,
uppercase: bool = False,
repeat: int = 1,
) -> str:
"""
A simple example tool that processes text messages.
Use this tool when you need to transform or repeat text.
Args:
message: The message to process (1-1000 chars)
uppercase: If True, convert the message to uppercase
repeat: Number of times to repeat the message (1-10)
Returns:
The processed message string
"""
try:
# Validate inputs
if not message or len(message) > 1000:
return "Error: message must be 1-1000 characters"
if repeat < 1 or repeat > 10:
return "Error: repeat must be 1-10"
# Process the message
result = message
if uppercase:
result = result.upper()
# Repeat if requested
if repeat > 1:
result = " ".join([result] * repeat)
return result
except Exception as e:
return f"Error processing message: {str(e)}"
@@ -0,0 +1,28 @@
# File Read Tool
Read contents of local files with encoding support.
## Description
Use for reading configs, data files, source code, logs, or any text file. Returns file content along with path, name, size, and encoding metadata.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `file_path` | str | Yes | - | Path to the file to read (absolute or relative) |
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
| `max_size` | int | No | `10000000` | Maximum file size to read in bytes (default 10MB) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `File not found: <path>` - File does not exist
- `Not a file: <path>` - Path points to a directory
- `File too large: <size> bytes (max: <max_size>)` - File exceeds max_size limit
- `Failed to decode file with encoding '<encoding>'` - Wrong encoding specified
- `Permission denied: <path>` - No read access to file
@@ -0,0 +1,4 @@
"""File Read Tool - Read contents of local files."""
from .file_read_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,75 @@
"""
File Read Tool - Read contents of local files.
Supports reading text files with various encodings.
Returns file content along with metadata.
"""
from __future__ import annotations
from pathlib import Path
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register file read tools with the MCP server."""
@mcp.tool()
def file_read(
file_path: str,
encoding: str = "utf-8",
max_size: int = 10_000_000,
) -> dict:
"""
Read the contents of a local file.
Use for reading configs, data files, source code, logs, or any text file.
Returns file content along with path, name, size, and encoding.
Args:
file_path: Path to the file to read (absolute or relative)
encoding: File encoding (utf-8, latin-1, etc.)
max_size: Maximum file size to read in bytes (default 10MB)
Returns:
Dict with file content and metadata, or error dict
"""
try:
path = Path(file_path).resolve()
# Check if file exists
if not path.exists():
return {"error": f"File not found: {file_path}"}
# Check if it's a file (not directory)
if not path.is_file():
return {"error": f"Not a file: {file_path}"}
# Check file size
file_size = path.stat().st_size
if max_size > 0 and file_size > max_size:
return {
"error": f"File too large: {file_size} bytes (max: {max_size})",
"file_size": file_size,
}
# Read the file
content = path.read_text(encoding=encoding)
return {
"path": str(path),
"name": path.name,
"content": content,
"size": len(content),
"encoding": encoding,
}
except UnicodeDecodeError as e:
return {
"error": f"Failed to decode file with encoding '{encoding}': {str(e)}",
"suggestion": "Try a different encoding like 'latin-1' or 'cp1252'",
}
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
@@ -0,0 +1,29 @@
# File Write Tool
Write content to local files with encoding support.
## Description
Can create new files or overwrite/append to existing ones. Use for saving data, creating configs, writing reports, or exporting results. Optionally creates parent directories if they don't exist.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `file_path` | str | Yes | - | Path to the file to write (absolute or relative) |
| `content` | str | Yes | - | Content to write to the file |
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
| `mode` | str | No | `write` | Write mode - 'write' (overwrite) or 'append' |
| `create_dirs` | bool | No | `True` | Create parent directories if they don't exist |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `Parent directory does not exist: <path>` - Parent dir missing and create_dirs=False
- `Invalid mode: <mode>. Use 'write' or 'append'.` - Invalid mode specified
- `Permission denied: <path>` - No write access to file/directory
- `OS error writing file: <error>` - Filesystem error
@@ -0,0 +1,4 @@
"""File Write Tool - Create or update local files."""
from .file_write_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,83 @@
"""
File Write Tool - Create or update local files.
Supports writing text files with various encodings.
Can create directories if they don't exist.
"""
from __future__ import annotations
from pathlib import Path
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register file write tools with the MCP server."""
@mcp.tool()
def file_write(
file_path: str,
content: str,
encoding: str = "utf-8",
mode: str = "write",
create_dirs: bool = True,
) -> dict:
"""
Write content to a local file.
Can create new files or overwrite/append to existing ones.
Use for saving data, creating configs, writing reports, or exporting results.
Args:
file_path: Path to the file to write (absolute or relative)
content: Content to write to the file
encoding: File encoding (utf-8, latin-1, etc.)
mode: Write mode - 'write' (overwrite) or 'append'
create_dirs: Create parent directories if they don't exist
Returns:
Dict with write result or error dict
"""
try:
path = Path(file_path).resolve()
# Create parent directories if requested
if create_dirs:
path.parent.mkdir(parents=True, exist_ok=True)
elif not path.parent.exists():
return {"error": f"Parent directory does not exist: {path.parent}"}
# Determine write mode
if mode == "append":
write_mode = "a"
elif mode == "write":
write_mode = "w"
else:
return {"error": f"Invalid mode: {mode}. Use 'write' or 'append'."}
# Check if we're overwriting
existed = path.exists()
previous_size = path.stat().st_size if existed else 0
# Write the file
with open(path, write_mode, encoding=encoding) as f:
f.write(content)
new_size = path.stat().st_size
return {
"path": str(path),
"name": path.name,
"bytes_written": len(content.encode(encoding)),
"total_size": new_size,
"mode": mode,
"created": not existed,
"previous_size": previous_size if existed else None,
}
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except OSError as e:
return {"error": f"OS error writing file: {str(e)}"}
except Exception as e:
return {"error": f"Failed to write file: {str(e)}"}
@@ -0,0 +1,37 @@
# PDF Read Tool
Read and extract text content from PDF files.
## Description
Returns text content with page markers and optional metadata. Use for reading PDFs, reports, documents, or any PDF file.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `file_path` | str | Yes | - | Path to the PDF file to read (absolute or relative) |
| `pages` | str | No | `None` | Page range - 'all'/None for all, '5' for single, '1-10' for range, '1,3,5' for specific |
| `max_pages` | int | No | `100` | Maximum pages to process (1-1000, for memory safety) |
| `include_metadata` | bool | No | `True` | Include PDF metadata (author, title, creation date, etc.) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `PDF file not found: <path>` - File does not exist
- `Not a file: <path>` - Path points to a directory
- `Not a PDF file (expected .pdf): <path>` - Wrong file extension
- `Cannot read encrypted PDF. Password required.` - PDF is password-protected
- `Page <num> out of range. PDF has <total> pages.` - Invalid page number
- `Invalid page format: '<pages>'` - Malformed page range string
- `Permission denied: <path>` - No read access to file
## Notes
- Page numbers in the `pages` argument are 1-indexed (first page is 1, not 0)
- Text is extracted with page markers: `--- Page N ---`
- Metadata includes: title, author, subject, creator, producer, created, modified
@@ -0,0 +1,4 @@
"""PDF Read Tool - Parse and extract text from PDF files."""
from .pdf_read_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,157 @@
"""
PDF Read Tool - Parse and extract text from PDF files.
Uses pypdf to read PDF documents and extract text content
along with metadata.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, List
from fastmcp import FastMCP
from pypdf import PdfReader
def register_tools(mcp: FastMCP) -> None:
"""Register PDF read tools with the MCP server."""
def parse_page_range(
pages: str | None, total_pages: int, max_pages: int
) -> List[int] | dict:
"""
Parse page range string into list of 0-indexed page numbers.
Returns list of indices or error dict.
"""
if pages is None or pages.lower() == "all":
indices = list(range(min(total_pages, max_pages)))
return indices
try:
# Single page: "5"
if pages.isdigit():
page_num = int(pages)
if page_num < 1 or page_num > total_pages:
return {"error": f"Page {page_num} out of range. PDF has {total_pages} pages."}
return [page_num - 1]
# Range: "1-10"
if "-" in pages and "," not in pages:
start_str, end_str = pages.split("-", 1)
start, end = int(start_str), int(end_str)
if start > end:
return {"error": f"Invalid page range: {pages}. Start must be less than end."}
if start < 1:
return {"error": f"Page numbers start at 1, got {start}."}
if end > total_pages:
return {"error": f"Page {end} out of range. PDF has {total_pages} pages."}
indices = list(range(start - 1, min(end, start - 1 + max_pages)))
return indices
# Comma-separated: "1,3,5"
if "," in pages:
page_nums = [int(p.strip()) for p in pages.split(",")]
for p in page_nums:
if p < 1 or p > total_pages:
return {"error": f"Page {p} out of range. PDF has {total_pages} pages."}
indices = [p - 1 for p in page_nums[:max_pages]]
return indices
return {"error": f"Invalid page format: '{pages}'. Use 'all', '5', '1-10', or '1,3,5'."}
except ValueError as e:
return {"error": f"Invalid page format: '{pages}'. {str(e)}"}
@mcp.tool()
def pdf_read(
file_path: str,
pages: str | None = None,
max_pages: int = 100,
include_metadata: bool = True,
) -> dict:
"""
Read and extract text content from a PDF file.
Returns text content with page markers and optional metadata.
Use for reading PDFs, reports, documents, or any PDF file.
Args:
file_path: Path to the PDF file to read (absolute or relative)
pages: Page range to extract - 'all'/None for all, '5' for single, '1-10' for range, '1,3,5' for specific
max_pages: Maximum number of pages to process (1-1000, memory safety)
include_metadata: Include PDF metadata (author, title, creation date, etc.)
Returns:
Dict with extracted text and metadata, or error dict
"""
try:
path = Path(file_path).resolve()
# Validate file exists
if not path.exists():
return {"error": f"PDF file not found: {file_path}"}
if not path.is_file():
return {"error": f"Not a file: {file_path}"}
# Check extension
if path.suffix.lower() != ".pdf":
return {"error": f"Not a PDF file (expected .pdf): {file_path}"}
# Validate max_pages
if max_pages < 1:
max_pages = 1
elif max_pages > 1000:
max_pages = 1000
# Open and read PDF
reader = PdfReader(path)
# Check for encryption
if reader.is_encrypted:
return {"error": "Cannot read encrypted PDF. Password required."}
total_pages = len(reader.pages)
# Parse page range
page_indices = parse_page_range(pages, total_pages, max_pages)
if isinstance(page_indices, dict): # Error dict
return page_indices
# Extract text from pages
content_parts = []
for i in page_indices:
page_text = reader.pages[i].extract_text() or ""
content_parts.append(f"--- Page {i + 1} ---\n{page_text}")
content = "\n\n".join(content_parts)
result: dict[str, Any] = {
"path": str(path),
"name": path.name,
"total_pages": total_pages,
"pages_extracted": len(page_indices),
"content": content,
"char_count": len(content),
}
# Add metadata if requested
if include_metadata and reader.metadata:
meta = reader.metadata
result["metadata"] = {
"title": meta.get("/Title"),
"author": meta.get("/Author"),
"subject": meta.get("/Subject"),
"creator": meta.get("/Creator"),
"producer": meta.get("/Producer"),
"created": str(meta.get("/CreationDate")) if meta.get("/CreationDate") else None,
"modified": str(meta.get("/ModDate")) if meta.get("/ModDate") else None,
}
return result
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except Exception as e:
return {"error": f"Failed to read PDF: {str(e)}"}
@@ -0,0 +1,36 @@
# Web Scrape Tool
Scrape and extract text content from webpages.
## Description
Use when you need to read the content of a specific URL, extract data from a website, or read articles/documentation. Automatically removes noise elements (scripts, navigation, footers) and extracts the main content.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `url` | str | Yes | - | URL of the webpage to scrape |
| `selector` | str | No | `None` | CSS selector to target specific content (e.g., 'article', '.main-content') |
| `include_links` | bool | No | `False` | Include extracted links in the response |
| `max_length` | int | No | `50000` | Maximum length of extracted text (1000-500000) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `HTTP <status>: Failed to fetch URL` - Server returned error status
- `No elements found matching selector: <selector>` - CSS selector matched nothing
- `Request timed out` - Request exceeded 30s timeout
- `Network error: <error>` - Connection or DNS issues
- `Scraping failed: <error>` - HTML parsing or other error
## Notes
- URLs without protocol are automatically prefixed with `https://`
- Follows redirects automatically
- Removes script, style, nav, footer, header, aside, noscript, and iframe elements
- Auto-detects main content using article, main, or common content class selectors
@@ -0,0 +1,4 @@
"""Web Scrape Tool - Extract content from web pages."""
from .web_scrape_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,134 @@
"""
Web Scrape Tool - Extract content from web pages.
Uses httpx for requests and BeautifulSoup for HTML parsing.
Returns clean text content from web pages.
"""
from __future__ import annotations
from typing import Any, List
import httpx
from bs4 import BeautifulSoup
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register web scrape tools with the MCP server."""
@mcp.tool()
def web_scrape(
url: str,
selector: str | None = None,
include_links: bool = False,
max_length: int = 50000,
) -> dict:
"""
Scrape and extract text content from a webpage.
Use when you need to read the content of a specific URL,
extract data from a website, or read articles/documentation.
Args:
url: URL of the webpage to scrape
selector: CSS selector to target specific content (e.g., 'article', '.main-content')
include_links: Include extracted links in the response
max_length: Maximum length of extracted text (1000-500000)
Returns:
Dict with scraped content (url, title, description, content, length) or error dict
"""
try:
# Validate URL
if not url.startswith(("http://", "https://")):
url = "https://" + url
# Validate max_length
if max_length < 1000:
max_length = 1000
elif max_length > 500000:
max_length = 500000
# Make request
response = httpx.get(
url,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
},
follow_redirects=True,
timeout=30.0,
)
if response.status_code != 200:
return {"error": f"HTTP {response.status_code}: Failed to fetch URL"}
# Parse HTML
soup = BeautifulSoup(response.text, "html.parser")
# Remove noise elements
for tag in soup(["script", "style", "nav", "footer", "header", "aside", "noscript", "iframe"]):
tag.decompose()
# Get title and description
title = ""
title_tag = soup.find("title")
if title_tag:
title = title_tag.get_text(strip=True)
description = ""
meta_desc = soup.find("meta", attrs={"name": "description"})
if meta_desc:
description = meta_desc.get("content", "")
# Target content
if selector:
content_elem = soup.select_one(selector)
if not content_elem:
return {"error": f"No elements found matching selector: {selector}"}
text = content_elem.get_text(separator=" ", strip=True)
else:
# Auto-detect main content
main_content = (
soup.find("article")
or soup.find("main")
or soup.find(attrs={"role": "main"})
or soup.find(class_=["content", "post", "entry", "article-body"])
or soup.find("body")
)
text = main_content.get_text(separator=" ", strip=True) if main_content else ""
# Clean up whitespace
text = " ".join(text.split())
# Truncate if needed
if len(text) > max_length:
text = text[:max_length] + "..."
result: dict[str, Any] = {
"url": str(response.url),
"title": title,
"description": description,
"content": text,
"length": len(text),
}
# Extract links if requested
if include_links:
links: List[dict[str, str]] = []
for a in soup.find_all("a", href=True)[:50]:
href = a["href"]
link_text = a.get_text(strip=True)
if link_text and href:
links.append({"text": link_text, "href": href})
result["links"] = links
return result
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {str(e)}"}
except Exception as e:
return {"error": f"Scraping failed: {str(e)}"}
@@ -0,0 +1,31 @@
# Web Search Tool
Search the web using the Brave Search API.
## Description
Returns titles, URLs, and snippets for search results. Use when you need current information, research topics, or find websites.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `query` | str | Yes | - | The search query (1-500 chars) |
| `num_results` | int | No | `10` | Number of results to return (1-20) |
| `country` | str | No | `us` | Country code for localized results (us, uk, de, etc.) |
## Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `BRAVE_SEARCH_API_KEY` | Yes | API key from [Brave Search API](https://brave.com/search/api/) |
## Error Handling
Returns error dicts for common issues:
- `BRAVE_SEARCH_API_KEY environment variable not set` - Missing API key
- `Query must be 1-500 characters` - Empty or too long query
- `Invalid API key` - API key rejected (HTTP 401)
- `Rate limit exceeded. Try again later.` - Too many requests (HTTP 429)
- `Search request timed out` - Request exceeded 30s timeout
- `Network error: <error>` - Connection or DNS issues
@@ -0,0 +1,4 @@
"""Web Search Tool - Search the web using Brave Search API."""
from .web_search_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,100 @@
"""
Web Search Tool - Search the web using Brave Search API.
Requires BRAVE_SEARCH_API_KEY environment variable.
Returns search results with titles, URLs, and snippets.
"""
from __future__ import annotations
import os
import httpx
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register web search tools with the MCP server."""
@mcp.tool()
def web_search(
query: str,
num_results: int = 10,
country: str = "us",
) -> dict:
"""
Search the web for information using Brave Search API.
Returns titles, URLs, and snippets. Use when you need current
information, research, or to find websites.
Requires BRAVE_SEARCH_API_KEY environment variable.
Args:
query: The search query (1-500 chars)
num_results: Number of results to return (1-20)
country: Country code for localized results (us, uk, de, etc.)
Returns:
Dict with search results or error dict
"""
api_key = os.getenv("BRAVE_SEARCH_API_KEY")
if not api_key:
return {
"error": "BRAVE_SEARCH_API_KEY environment variable not set",
"help": "Get an API key at https://brave.com/search/api/",
}
# Validate inputs
if not query or len(query) > 500:
return {"error": "Query must be 1-500 characters"}
if num_results < 1 or num_results > 20:
num_results = max(1, min(20, num_results))
try:
# Make request to Brave Search API
response = httpx.get(
"https://api.search.brave.com/res/v1/web/search",
params={
"q": query,
"count": num_results,
"country": country,
},
headers={
"X-Subscription-Token": api_key,
"Accept": "application/json",
},
timeout=30.0,
)
if response.status_code == 401:
return {"error": "Invalid API key"}
elif response.status_code == 429:
return {"error": "Rate limit exceeded. Try again later."}
elif response.status_code != 200:
return {"error": f"API request failed: HTTP {response.status_code}"}
data = response.json()
# Extract results
results = []
web_results = data.get("web", {}).get("results", [])
for item in web_results[:num_results]:
results.append({
"title": item.get("title", ""),
"url": item.get("url", ""),
"snippet": item.get("description", ""),
})
return {
"query": query,
"results": results,
"total": len(results),
}
except httpx.TimeoutException:
return {"error": "Search request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {str(e)}"}
except Exception as e:
return {"error": f"Search failed: {str(e)}"}
@@ -0,0 +1,6 @@
"""
Utility functions for Aden Tools.
"""
from .env_helpers import get_env_var
__all__ = ["get_env_var"]
@@ -0,0 +1,35 @@
"""
Environment variable helpers for Aden Tools.
"""
from __future__ import annotations
import os
from typing import Optional
def get_env_var(
name: str,
default: Optional[str] = None,
required: bool = False,
) -> Optional[str]:
"""
Get an environment variable with optional default and required validation.
Args:
name: Name of the environment variable
default: Default value if not set
required: If True, raises ValueError when not set and no default
Returns:
The environment variable value or default
Raises:
ValueError: If required=True and variable is not set with no default
"""
value = os.environ.get(name, default)
if required and value is None:
raise ValueError(
f"Required environment variable '{name}' is not set. "
f"Please set it before using this tool."
)
return value
+1
View File
@@ -0,0 +1 @@
"""Aden Tools test suite."""
+43
View File
@@ -0,0 +1,43 @@
"""Shared fixtures for aden-tools tests."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
@pytest.fixture
def mcp() -> FastMCP:
"""Create a fresh FastMCP instance for testing."""
return FastMCP("test-server")
@pytest.fixture
def sample_text_file(tmp_path: Path) -> Path:
"""Create a simple text file for testing."""
txt_file = tmp_path / "test.txt"
txt_file.write_text("Hello, World!\nLine 2\nLine 3")
return txt_file
@pytest.fixture
def sample_csv(tmp_path: Path) -> Path:
"""Create a simple CSV file for testing."""
csv_file = tmp_path / "test.csv"
csv_file.write_text("name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,Chicago\n")
return csv_file
@pytest.fixture
def sample_json(tmp_path: Path) -> Path:
"""Create a simple JSON file for testing."""
json_file = tmp_path / "test.json"
json_file.write_text('{"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}')
return json_file
@pytest.fixture
def large_text_file(tmp_path: Path) -> Path:
"""Create a large text file for size limit testing."""
large_file = tmp_path / "large.txt"
large_file.write_text("x" * 20_000_000) # 20MB
return large_file
+50
View File
@@ -0,0 +1,50 @@
"""Tests for environment variable helpers."""
import pytest
from aden_tools.utils import get_env_var
class TestGetEnvVar:
"""Tests for get_env_var function."""
def test_returns_value_when_set(self, monkeypatch):
"""Returns the environment variable value when set."""
monkeypatch.setenv("TEST_VAR", "test_value")
result = get_env_var("TEST_VAR")
assert result == "test_value"
def test_returns_default_when_not_set(self, monkeypatch):
"""Returns default value when variable is not set."""
monkeypatch.delenv("UNSET_VAR", raising=False)
result = get_env_var("UNSET_VAR", default="default_value")
assert result == "default_value"
def test_returns_none_when_not_set_and_no_default(self, monkeypatch):
"""Returns None when variable is not set and no default provided."""
monkeypatch.delenv("UNSET_VAR", raising=False)
result = get_env_var("UNSET_VAR")
assert result is None
def test_raises_when_required_and_missing(self, monkeypatch):
"""Raises ValueError when required=True and variable is missing."""
monkeypatch.delenv("REQUIRED_VAR", raising=False)
with pytest.raises(ValueError) as exc_info:
get_env_var("REQUIRED_VAR", required=True)
assert "REQUIRED_VAR" in str(exc_info.value)
assert "not set" in str(exc_info.value)
def test_returns_value_when_required_and_set(self, monkeypatch):
"""Returns value when required=True and variable is set."""
monkeypatch.setenv("REQUIRED_VAR", "my_value")
result = get_env_var("REQUIRED_VAR", required=True)
assert result == "my_value"
+1
View File
@@ -0,0 +1 @@
"""Tool-specific tests."""
@@ -0,0 +1,96 @@
"""Tests for file_read tool (FastMCP)."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.tools.file_read_tool import register_tools
@pytest.fixture
def file_read_fn(mcp: FastMCP):
"""Register and return the file_read tool function."""
register_tools(mcp)
# Access the registered tool's function directly
return mcp._tool_manager._tools["file_read"].fn
class TestFileReadTool:
"""Tests for file_read tool."""
def test_read_existing_file(self, file_read_fn, sample_text_file: Path):
"""Reading an existing file returns content and metadata."""
result = file_read_fn(file_path=str(sample_text_file))
assert "error" not in result
assert result["content"] == "Hello, World!\nLine 2\nLine 3"
assert result["name"] == "test.txt"
assert result["encoding"] == "utf-8"
assert "size" in result
def test_read_file_not_found(self, file_read_fn, tmp_path: Path):
"""Reading a non-existent file returns an error dict."""
missing_file = tmp_path / "does_not_exist.txt"
result = file_read_fn(file_path=str(missing_file))
assert "error" in result
assert "not found" in result["error"].lower()
def test_read_directory_returns_error(self, file_read_fn, tmp_path: Path):
"""Reading a directory (not a file) returns an error."""
result = file_read_fn(file_path=str(tmp_path))
assert "error" in result
assert "not a file" in result["error"].lower()
def test_read_file_too_large(self, file_read_fn, tmp_path: Path):
"""Reading a file exceeding max_size returns an error."""
large_file = tmp_path / "large.txt"
large_file.write_text("x" * 1000)
result = file_read_fn(file_path=str(large_file), max_size=100)
assert "error" in result
assert "too large" in result["error"].lower()
assert "file_size" in result
def test_read_with_no_size_limit(self, file_read_fn, tmp_path: Path):
"""Reading with max_size=0 allows any file size."""
large_file = tmp_path / "large.txt"
content = "x" * 100_000
large_file.write_text(content)
# max_size=0 means no limit in the implementation
result = file_read_fn(file_path=str(large_file), max_size=0)
assert "error" not in result
assert result["content"] == content
def test_read_with_different_encoding(self, file_read_fn, tmp_path: Path):
"""Reading with a specific encoding works."""
latin_file = tmp_path / "latin.txt"
# Write bytes directly with latin-1 encoding
latin_file.write_bytes("café".encode("latin-1"))
result = file_read_fn(file_path=str(latin_file), encoding="latin-1")
assert "error" not in result
assert result["content"] == "café"
assert result["encoding"] == "latin-1"
def test_read_with_wrong_encoding_returns_error(self, file_read_fn, tmp_path: Path):
"""Reading with wrong encoding returns helpful error."""
# Create a file with bytes that aren't valid UTF-8
binary_file = tmp_path / "binary.txt"
binary_file.write_bytes(b"\xff\xfe")
result = file_read_fn(file_path=str(binary_file), encoding="utf-8")
assert "error" in result
assert "suggestion" in result
def test_returns_absolute_path(self, file_read_fn, sample_text_file: Path):
"""Result includes the absolute path."""
result = file_read_fn(file_path=str(sample_text_file))
assert result["path"] == str(sample_text_file.resolve())
@@ -0,0 +1,99 @@
"""Tests for file_write tool (FastMCP)."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.tools.file_write_tool import register_tools
@pytest.fixture
def file_write_fn(mcp: FastMCP):
"""Register and return the file_write tool function."""
register_tools(mcp)
return mcp._tool_manager._tools["file_write"].fn
class TestFileWriteTool:
"""Tests for file_write tool."""
def test_write_creates_new_file(self, file_write_fn, tmp_path: Path):
"""Writing to a new file creates it with content."""
new_file = tmp_path / "new.txt"
result = file_write_fn(file_path=str(new_file), content="Hello, World!")
assert "error" not in result
assert result["created"] is True
assert result["name"] == "new.txt"
assert new_file.read_text() == "Hello, World!"
def test_write_overwrites_existing(self, file_write_fn, tmp_path: Path):
"""Writing to existing file overwrites by default."""
existing = tmp_path / "existing.txt"
existing.write_text("old content")
result = file_write_fn(file_path=str(existing), content="new content")
assert "error" not in result
assert result["created"] is False
assert result["previous_size"] is not None
assert existing.read_text() == "new content"
def test_write_appends_to_existing(self, file_write_fn, tmp_path: Path):
"""Writing with mode='append' adds to existing content."""
existing = tmp_path / "existing.txt"
existing.write_text("line1\n")
result = file_write_fn(file_path=str(existing), content="line2\n", mode="append")
assert "error" not in result
assert result["mode"] == "append"
assert existing.read_text() == "line1\nline2\n"
def test_write_creates_parent_dirs(self, file_write_fn, tmp_path: Path):
"""Writing with create_dirs=True creates missing directories."""
deep_path = tmp_path / "nested" / "dirs" / "file.txt"
result = file_write_fn(file_path=str(deep_path), content="content", create_dirs=True)
assert "error" not in result
assert deep_path.exists()
assert deep_path.read_text() == "content"
def test_write_fails_without_parent_dir(self, file_write_fn, tmp_path: Path):
"""Writing with create_dirs=False fails if parent doesn't exist."""
missing_dir = tmp_path / "missing" / "file.txt"
result = file_write_fn(file_path=str(missing_dir), content="content", create_dirs=False)
assert "error" in result
assert "parent directory" in result["error"].lower()
def test_write_invalid_mode(self, file_write_fn, tmp_path: Path):
"""Writing with invalid mode returns error."""
result = file_write_fn(
file_path=str(tmp_path / "test.txt"),
content="content",
mode="invalid"
)
assert "error" in result
assert "invalid mode" in result["error"].lower()
def test_write_returns_bytes_written(self, file_write_fn, tmp_path: Path):
"""Result includes accurate bytes_written count."""
content = "Hello, World!"
result = file_write_fn(file_path=str(tmp_path / "test.txt"), content=content)
assert result["bytes_written"] == len(content.encode("utf-8"))
def test_write_with_encoding(self, file_write_fn, tmp_path: Path):
"""Writing with specific encoding works."""
file_path = tmp_path / "latin.txt"
result = file_write_fn(file_path=str(file_path), content="café", encoding="latin-1")
assert "error" not in result
# Verify it was written with latin-1 encoding
assert file_path.read_bytes() == "café".encode("latin-1")
@@ -0,0 +1,80 @@
"""Tests for pdf_read tool (FastMCP)."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.tools.pdf_read_tool import register_tools
@pytest.fixture
def pdf_read_fn(mcp: FastMCP):
"""Register and return the pdf_read tool function."""
register_tools(mcp)
return mcp._tool_manager._tools["pdf_read"].fn
class TestPdfReadTool:
"""Tests for pdf_read tool."""
def test_read_pdf_file_not_found(self, pdf_read_fn, tmp_path: Path):
"""Reading non-existent PDF returns error."""
result = pdf_read_fn(file_path=str(tmp_path / "missing.pdf"))
assert "error" in result
assert "not found" in result["error"].lower()
def test_read_pdf_invalid_extension(self, pdf_read_fn, tmp_path: Path):
"""Reading non-PDF file returns error."""
txt_file = tmp_path / "test.txt"
txt_file.write_text("not a pdf")
result = pdf_read_fn(file_path=str(txt_file))
assert "error" in result
assert "not a pdf" in result["error"].lower()
def test_read_pdf_directory(self, pdf_read_fn, tmp_path: Path):
"""Reading a directory returns error."""
result = pdf_read_fn(file_path=str(tmp_path))
assert "error" in result
assert "not a file" in result["error"].lower()
def test_max_pages_clamped_low(self, pdf_read_fn, tmp_path: Path):
"""max_pages below 1 is clamped to 1."""
pdf_file = tmp_path / "test.pdf"
pdf_file.write_bytes(b"%PDF-1.4") # Minimal PDF header (will fail to parse)
result = pdf_read_fn(file_path=str(pdf_file), max_pages=0)
# Will error due to invalid PDF, but max_pages should be accepted
assert isinstance(result, dict)
def test_max_pages_clamped_high(self, pdf_read_fn, tmp_path: Path):
"""max_pages above 1000 is clamped to 1000."""
pdf_file = tmp_path / "test.pdf"
pdf_file.write_bytes(b"%PDF-1.4")
result = pdf_read_fn(file_path=str(pdf_file), max_pages=2000)
# Will error due to invalid PDF, but max_pages should be accepted
assert isinstance(result, dict)
def test_pages_parameter_accepted(self, pdf_read_fn, tmp_path: Path):
"""Various pages parameter formats are accepted."""
pdf_file = tmp_path / "test.pdf"
pdf_file.write_bytes(b"%PDF-1.4")
# Test different page formats - all should be accepted
for pages in ["all", "1", "1-5", "1,3,5", None]:
result = pdf_read_fn(file_path=str(pdf_file), pages=pages)
assert isinstance(result, dict)
def test_include_metadata_parameter(self, pdf_read_fn, tmp_path: Path):
"""include_metadata parameter is accepted."""
pdf_file = tmp_path / "test.pdf"
pdf_file.write_bytes(b"%PDF-1.4")
result = pdf_read_fn(file_path=str(pdf_file), include_metadata=False)
assert isinstance(result, dict)
result = pdf_read_fn(file_path=str(pdf_file), include_metadata=True)
assert isinstance(result, dict)
@@ -0,0 +1,52 @@
"""Tests for web_scrape tool (FastMCP)."""
import pytest
from fastmcp import FastMCP
from aden_tools.tools.web_scrape_tool import register_tools
@pytest.fixture
def web_scrape_fn(mcp: FastMCP):
"""Register and return the web_scrape tool function."""
register_tools(mcp)
return mcp._tool_manager._tools["web_scrape"].fn
class TestWebScrapeTool:
"""Tests for web_scrape tool."""
def test_url_auto_prefixed_with_https(self, web_scrape_fn):
"""URLs without scheme get https:// prefix."""
# This will fail to connect, but we can verify the behavior
result = web_scrape_fn(url="example.com")
# Should either succeed or have a network error (not a validation error)
assert isinstance(result, dict)
def test_max_length_clamped_low(self, web_scrape_fn):
"""max_length below 1000 is clamped to 1000."""
# Test with a very low max_length - implementation clamps to 1000
result = web_scrape_fn(url="https://example.com", max_length=500)
# Should not error due to invalid max_length
assert isinstance(result, dict)
def test_max_length_clamped_high(self, web_scrape_fn):
"""max_length above 500000 is clamped to 500000."""
# Test with a very high max_length - implementation clamps to 500000
result = web_scrape_fn(url="https://example.com", max_length=600000)
# Should not error due to invalid max_length
assert isinstance(result, dict)
def test_valid_max_length_accepted(self, web_scrape_fn):
"""Valid max_length values are accepted."""
result = web_scrape_fn(url="https://example.com", max_length=10000)
assert isinstance(result, dict)
def test_include_links_option(self, web_scrape_fn):
"""include_links parameter is accepted."""
result = web_scrape_fn(url="https://example.com", include_links=True)
assert isinstance(result, dict)
def test_selector_option(self, web_scrape_fn):
"""selector parameter is accepted."""
result = web_scrape_fn(url="https://example.com", selector=".content")
assert isinstance(result, dict)
@@ -0,0 +1,57 @@
"""Tests for web_search tool (FastMCP)."""
import pytest
from fastmcp import FastMCP
from aden_tools.tools.web_search_tool import register_tools
@pytest.fixture
def web_search_fn(mcp: FastMCP):
"""Register and return the web_search tool function."""
register_tools(mcp)
return mcp._tool_manager._tools["web_search"].fn
class TestWebSearchTool:
"""Tests for web_search tool."""
def test_search_missing_api_key(self, web_search_fn, monkeypatch):
"""Search without API key returns helpful error."""
monkeypatch.delenv("BRAVE_SEARCH_API_KEY", raising=False)
result = web_search_fn(query="test query")
assert "error" in result
assert "BRAVE_SEARCH_API_KEY" in result["error"]
assert "help" in result
def test_empty_query_returns_error(self, web_search_fn, monkeypatch):
"""Empty query returns error."""
monkeypatch.setenv("BRAVE_SEARCH_API_KEY", "test-key")
result = web_search_fn(query="")
assert "error" in result
assert "1-500" in result["error"].lower() or "character" in result["error"].lower()
def test_long_query_returns_error(self, web_search_fn, monkeypatch):
"""Query exceeding 500 chars returns error."""
monkeypatch.setenv("BRAVE_SEARCH_API_KEY", "test-key")
result = web_search_fn(query="x" * 501)
assert "error" in result
def test_num_results_clamped_to_valid_range(self, web_search_fn, monkeypatch):
"""num_results outside 1-20 is clamped (not error)."""
monkeypatch.setenv("BRAVE_SEARCH_API_KEY", "test-key")
# Test that the function handles out-of-range values gracefully
# The implementation clamps values, so we just verify it doesn't crash
# (actual API call would fail with invalid key, but that's expected)
result = web_search_fn(query="test", num_results=0)
# Should either clamp or error - both are acceptable
assert isinstance(result, dict)
result = web_search_fn(query="test", num_results=100)
assert isinstance(result, dict)
+25
View File
@@ -131,6 +131,31 @@ services:
networks:
- honeycomb-network
# Aden Tools MCP Server - Python tools via Model Context Protocol
aden-tools-mcp:
build:
context: ./aden-tools
container_name: honeycomb-aden-tools-mcp
ports:
- "${ADEN_TOOLS_MCP_PORT:-4001}:4001"
environment:
- MCP_PORT=4001
# Pass through tool-specific env vars
- BRAVE_SEARCH_API_KEY=${BRAVE_SEARCH_API_KEY:-}
volumes:
- .:/workspace:rw # Mount project root for file access
working_dir: /workspace # Set working directory so relative paths work
command: ["python", "/app/mcp_server.py"] # Use absolute path since working_dir changed
healthcheck:
test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:4001/health').raise_for_status()"]
interval: 30s
timeout: 5s
retries: 5
start_period: 10s
restart: unless-stopped
networks:
- honeycomb-network
networks:
honeycomb-network:
driver: bridge