Merge pull request #27 from RichardTang-Aden/feat-incorporate-file-system-tools

Feat incorporate file system tools
This commit is contained in:
Timothy @aden
2026-01-20 16:23:54 -08:00
committed by GitHub
34 changed files with 2133 additions and 9 deletions
+1
View File
@@ -5,6 +5,7 @@ node_modules/
# Build outputs
dist/
build/
workdir/
.next/
out/
+10 -6
View File
@@ -181,12 +181,16 @@ Choose other frameworks when you need:
```
hive/
├── honeycomb/ # Frontend (React + TypeScript + Vite)
├── hive/ # Backend (Node.js + TypeScript + Express)
├── docs/ # Documentation
├── scripts/ # Build and utility scripts
├── config.yaml.example # Configuration template
── docker-compose.yml # Container orchestration
├── honeycomb/ # Frontend Dashboard
├── hive/ # Backend API Server
├── aden-tools/ # MCP Tools Package - 19 tools for agent capabilities
├── docs/ # Documentation and guides
├── scripts/ # Build and utility scripts
── config.yaml.example # Configuration template
├── docker-compose.yml # Container orchestration
├── DEVELOPER.md # Developer guide
├── CONTRIBUTING.md # Contribution guidelines
└── ROADMAP.md # Product roadmap
```
## Development
+2 -2
View File
@@ -146,13 +146,13 @@ Every tool folder needs a `README.md` with:
## Testing
Place tests in `tests/tools/test_my_tool.py`:
Place tests in `tests/tools/test_{{tool_name}}.py`:
```python
import pytest
from fastmcp import FastMCP
from aden_tools.tools.my_tool import register_tools
from aden_tools.tools.{{tool_name}} import register_tools
@pytest.fixture
+10 -1
View File
@@ -15,9 +15,18 @@ COPY mcp_server.py ./
RUN pip install --no-cache-dir -e .
# Create non-root user for security
RUN useradd -m -u 1001 appuser && chown -R appuser:appuser /app
RUN useradd -m -u 1001 appuser
# Create workspaces directory for file system tools persistence
# This directory will be mounted as a volume
RUN mkdir -p /app/workdir/workspaces && \
chown -R appuser:appuser /app
USER appuser
# Declare volume for workspace persistence across container runs
VOLUME ["/app/workdir/workspaces"]
# Expose MCP server port
EXPOSE 4001
+1
View File
@@ -27,6 +27,7 @@ dependencies = [
"pandas>=2.0.0",
"jsonpath-ng>=1.6.0",
"fastmcp>=2.0.0",
"diff-match-patch>=20230430",
]
[project.optional-dependencies]
@@ -20,6 +20,16 @@ from .web_search_tool import register_tools as register_web_search
from .web_scrape_tool import register_tools as register_web_scrape
from .pdf_read_tool import register_tools as register_pdf_read
# Import file system toolkits
from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import register_tools as register_replace_file_content
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.grep_search import register_tools as register_grep_search
from .file_system_toolkits.execute_command_tool import register_tools as register_execute_command
def register_all_tools(mcp: FastMCP) -> List[str]:
"""
@@ -38,6 +48,16 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
register_web_scrape(mcp)
register_pdf_read(mcp)
# Register file system toolkits
register_view_file(mcp)
register_write_to_file(mcp)
register_list_dir(mcp)
register_replace_file_content(mcp)
register_apply_diff(mcp)
register_apply_patch(mcp)
register_grep_search(mcp)
register_execute_command(mcp)
return [
"example_tool",
"file_read",
@@ -45,6 +65,14 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
"web_search",
"web_scrape",
"pdf_read",
"view_file",
"write_to_file",
"list_dir",
"replace_file_content",
"apply_diff",
"apply_patch",
"grep_search",
"execute_command_tool",
]
@@ -0,0 +1,109 @@
# Apply Diff Tool
Applies a unified diff patch to a file within the secure session sandbox.
## Description
The `apply_diff` tool applies structured diff patches to files, enabling precise modifications using the diff-match-patch algorithm. It can apply multiple patches in a single operation and reports success status for each patch.
## Use Cases
- Applying code review suggestions
- Implementing automated refactoring
- Synchronizing file changes from version control
- Making precise, contextual file modifications
## Usage
```python
apply_diff(
path="src/main.py",
diff_text="@@ -1,3 +1,3 @@\n import os\n-import sys\n+import json\n from typing import List",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `diff_text` | str | Yes | - | The diff patch text to apply |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success (all patches applied):**
```python
{
"success": True,
"path": "src/main.py",
"patches_applied": 3,
"all_successful": True
}
```
**Partial success (some patches failed):**
```python
{
"success": False,
"path": "src/main.py",
"patches_applied": 2,
"patches_failed": 1,
"error": "Failed to apply 1 of 3 patches"
}
```
**Error:**
```python
{
"error": "File not found at src/main.py"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns partial success if some patches fail to apply
- Returns an error dict if the diff text is malformed
- Uses diff-match-patch library for intelligent fuzzy matching
## Examples
### Applying a single-line change
```python
diff = "@@ -10,1 +10,1 @@\n- old_code()\n+ new_code()"
result = apply_diff(
path="module.py",
diff_text=diff,
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "module.py", "patches_applied": 1, "all_successful": True}
```
### Handling patch failures
```python
result = apply_diff(
path="outdated.py",
diff_text="@@ -1,1 +1,1 @@\n-nonexistent line\n+new line",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": False, "path": "outdated.py", "patches_applied": 0, "patches_failed": 1, ...}
```
## Notes
- Uses the diff-match-patch library for patch application
- Supports fuzzy matching for more robust patching
- Patches are applied atomically (all or nothing for file write)
- The file is only modified if at least one patch succeeds
@@ -0,0 +1,3 @@
from .apply_diff import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,58 @@
import os
import diff_match_patch as dmp_module
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register diff application tools with the MCP server."""
@mcp.tool()
def apply_diff(path: str, diff_text: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Apply a diff to a file within the session sandbox.
Use this when you need to apply structured diff patches to modify file content.
Args:
path: The path to the file (relative to session root)
diff_text: The diff patch text to apply
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with application status and patch results, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_fromText(diff_text)
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
new_content, results = dmp.patch_apply(patches, content)
if all(results):
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"patches_applied": len(patches),
"all_successful": True
}
else:
failed_count = sum(1 for r in results if not r)
return {
"success": False,
"path": path,
"patches_applied": len([r for r in results if r]),
"patches_failed": failed_count,
"error": f"Failed to apply {failed_count} of {len(patches)} patches"
}
except Exception as e:
return {"error": f"Failed to apply diff: {str(e)}"}
@@ -0,0 +1,97 @@
# Apply Patch Tool
Applies a patch (unified diff) to a file within the secure session sandbox.
## Description
The `apply_patch` tool is an alias for `apply_diff` that applies structured diff patches to files. It provides the same functionality with alternative naming for user preference.
## Use Cases
- Applying code review suggestions
- Implementing automated refactoring
- Synchronizing file changes from version control
- Making precise, contextual file modifications
## Usage
```python
apply_patch(
path="src/main.py",
patch_text="@@ -1,3 +1,3 @@\n import os\n-import sys\n+import json\n from typing import List",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `patch_text` | str | Yes | - | The patch text to apply |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success (all patches applied):**
```python
{
"success": True,
"path": "src/main.py",
"patches_applied": 3,
"all_successful": True
}
```
**Partial success (some patches failed):**
```python
{
"success": False,
"path": "src/main.py",
"patches_applied": 2,
"patches_failed": 1,
"error": "Failed to apply 1 of 3 patches"
}
```
**Error:**
```python
{
"error": "File not found at src/main.py"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns partial success if some patches fail to apply
- Returns an error dict if the patch text is malformed
- Uses diff-match-patch library for intelligent fuzzy matching
## Examples
### Applying a patch
```python
patch = "@@ -10,1 +10,1 @@\n- old_code()\n+ new_code()"
result = apply_patch(
path="module.py",
patch_text=patch,
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "module.py", "patches_applied": 1, "all_successful": True}
```
## Notes
- This is an alias for the `apply_diff` tool with identical functionality
- Uses the diff-match-patch library for patch application
- Supports fuzzy matching for more robust patching
- The implementation is duplicated for atomic isolation (not a simple function call)
@@ -0,0 +1,3 @@
from .apply_patch import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,60 @@
import os
import diff_match_patch as dmp_module
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register patch application tools with the MCP server."""
@mcp.tool()
def apply_patch(path: str, patch_text: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Apply a patch to a file within the session sandbox.
Use this when you need to apply patch-formatted changes to a file.
This is an alias for apply_diff with the same functionality.
Args:
path: The path to the file (relative to session root)
patch_text: The patch text to apply
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with application status and patch results, or error dict
"""
# Logic duplicated from apply_diff for atomic isolation
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_fromText(patch_text)
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
new_content, results = dmp.patch_apply(patches, content)
if all(results):
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"patches_applied": len(patches),
"all_successful": True
}
else:
failed_count = sum(1 for r in results if not r)
return {
"success": False,
"path": path,
"patches_applied": len([r for r in results if r]),
"patches_failed": failed_count,
"error": f"Failed to apply {failed_count} of {len(patches)} patches"
}
except Exception as e:
return {"error": f"Failed to apply patch: {str(e)}"}
@@ -0,0 +1,152 @@
# Execute Command Tool
Executes shell commands within the secure session sandbox.
## Description
The `execute_command_tool` allows you to run arbitrary shell commands in a sandboxed environment. Commands are executed with a 60-second timeout and capture both stdout and stderr output.
## Use Cases
- Running build commands (npm build, make, etc.)
- Executing tests
- Running linters or formatters
- Performing git operations
- Installing dependencies
## Usage
```python
execute_command_tool(
command="npm install",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
cwd="project"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `command` | str | Yes | - | The shell command to execute |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `cwd` | str | No | "." | The working directory for the command (relative to session root) |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"command": "npm install",
"return_code": 0,
"stdout": "added 42 packages in 3s",
"stderr": "",
"cwd": "project"
}
```
**Command failure (non-zero exit):**
```python
{
"success": True, # Command executed successfully, but exited with error code
"command": "npm test",
"return_code": 1,
"stdout": "",
"stderr": "Error: Tests failed",
"cwd": "."
}
```
**Timeout:**
```python
{
"error": "Command timed out after 60 seconds"
}
```
**Error:**
```python
{
"error": "Failed to execute command: [error message]"
}
```
## Error Handling
- Returns an error dict if the command times out (60 second limit)
- Returns an error dict if the command cannot be executed
- Returns success with non-zero return_code if command runs but fails
- Commands are executed in a sandboxed session environment
- Working directory defaults to session root if not specified
## Security Considerations
- Commands are executed within the session sandbox only
- File access is restricted to the session directory
- Network access depends on sandbox configuration
- Commands run with the permissions of the session user
- Use with caution as shell injection is possible
## Examples
### Running a build command
```python
result = execute_command_tool(
command="npm run build",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="frontend"
)
# Returns: {"success": True, "return_code": 0, "stdout": "Build complete", ...}
```
### Running tests with output
```python
result = execute_command_tool(
command="pytest -v",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 0, "stdout": "test output...", "stderr": ""}
```
### Handling command failures
```python
result = execute_command_tool(
command="nonexistent-command",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 127, "stderr": "command not found", ...}
```
### Running git commands
```python
result = execute_command_tool(
command="git status",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="repo"
)
# Returns: {"success": True, "return_code": 0, "stdout": "On branch main...", ...}
```
## Notes
- 60-second timeout for all commands
- Commands are executed using shell=True (supports pipes, redirects, etc.)
- Both stdout and stderr are captured separately
- Return code 0 typically indicates success
- Working directory is created if it doesn't exist
- Command output is returned as text (UTF-8 encoding)
@@ -0,0 +1,3 @@
from .execute_command_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,58 @@
import os
import subprocess
from typing import Optional
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path, WORKSPACES_DIR
def register_tools(mcp: FastMCP) -> None:
"""Register command execution tools with the MCP server."""
@mcp.tool()
def execute_command_tool(command: str, workspace_id: str, agent_id: str, session_id: str, cwd: Optional[str] = None) -> dict:
"""
Execute a shell command within the session sandbox.
Use this when you need to run shell commands safely within the sandboxed environment.
Commands are executed with a 60-second timeout.
Args:
command: The shell command to execute
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
cwd: The working directory for the command (relative to session root, optional)
Returns:
Dict with command output and execution details, or error dict
"""
try:
# Default cwd is the session root
session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
os.makedirs(session_root, exist_ok=True)
if cwd:
secure_cwd = get_secure_path(cwd, workspace_id, agent_id, session_id)
else:
secure_cwd = session_root
result = subprocess.run(
command,
shell=True,
cwd=secure_cwd,
capture_output=True,
text=True,
timeout=60
)
return {
"success": True,
"command": command,
"return_code": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"cwd": cwd or "."
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out after 60 seconds"}
except Exception as e:
return {"error": f"Failed to execute command: {str(e)}"}
@@ -0,0 +1,140 @@
# Grep Search Tool
Searches for regex patterns in files or directories within the secure session sandbox.
## Description
The `grep_search` tool provides powerful pattern matching capabilities across files and directories. It uses Python's regex engine to find matches and returns detailed results including file paths, line numbers, and matched content.
## Use Cases
- Finding function or variable definitions
- Searching for TODO comments or specific patterns
- Analyzing code for security issues or patterns
- Locating configuration values across multiple files
## Usage
```python
grep_search(
path="src",
pattern="def \\w+\\(",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
recursive=True
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to search in (file or directory, relative to session root) |
| `pattern` | str | Yes | - | The regex pattern to search for |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `recursive` | bool | No | False | Whether to search recursively in subdirectories |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"pattern": "def \\w+\\(",
"path": "src",
"recursive": True,
"matches": [
{
"file": "src/main.py",
"line_number": 10,
"line_content": "def process_data(args):"
},
{
"file": "src/utils.py",
"line_number": 5,
"line_content": "def helper_function():"
}
],
"total_matches": 2
}
```
**No matches:**
```python
{
"success": True,
"pattern": "nonexistent",
"path": "src",
"recursive": False,
"matches": [],
"total_matches": 0
}
```
**Error:**
```python
{
"error": "Failed to perform grep search: [error message]"
}
```
## Error Handling
- Returns an error dict if the path doesn't exist
- Skips files that cannot be decoded (binary files, encoding errors)
- Skips files with permission errors
- Returns empty matches list if no matches found
- Handles invalid regex patterns with error message
## Examples
### Searching for function definitions
```python
result = grep_search(
path="src",
pattern="^def ",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
recursive=True
)
# Returns: {"success": True, "pattern": "^def ", "matches": [...], "total_matches": 15}
```
### Searching a single file
```python
result = grep_search(
path="config.py",
pattern="API_KEY",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "pattern": "API_KEY", "matches": [{...}], "total_matches": 1}
```
### Case-insensitive search using regex flags
```python
result = grep_search(
path="docs",
pattern="(?i)todo",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
recursive=True
)
# Finds "TODO", "todo", "Todo", etc.
```
## Notes
- Uses Python's `re` module for regex matching
- Binary files and files with encoding errors are automatically skipped
- Line numbers start at 1
- Returned file paths are relative to the session root
- For non-recursive directory searches, only files in the immediate directory are searched
@@ -0,0 +1,3 @@
from .grep_search import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,70 @@
import os
import re
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path, WORKSPACES_DIR
def register_tools(mcp: FastMCP) -> None:
"""Register grep search tools with the MCP server."""
@mcp.tool()
def grep_search(path: str, pattern: str, workspace_id: str, agent_id: str, session_id: str, recursive: bool = False) -> dict:
"""
Search for a pattern in a file or directory within the session sandbox.
Use this when you need to find specific content or patterns in files using regex.
Set recursive=True to search through all subdirectories.
Args:
path: The path to search in (file or directory, relative to session root)
pattern: The regex pattern to search for
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
recursive: Whether to search recursively in directories (default: False)
Returns:
Dict with search results and match details, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
# Use session dir root for relative path calculations
session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
matches = []
regex = re.compile(pattern)
if os.path.isfile(secure_path):
files = [secure_path]
elif recursive:
files = []
for root, _, filenames in os.walk(secure_path):
for filename in filenames:
files.append(os.path.join(root, filename))
else:
files = [os.path.join(secure_path, f) for f in os.listdir(secure_path) if os.path.isfile(os.path.join(secure_path, f))]
for file_path in files:
# Calculate relative path for display
display_path = os.path.relpath(file_path, session_root)
try:
with open(file_path, "r", encoding="utf-8") as f:
for i, line in enumerate(f, 1):
if regex.search(line):
matches.append({
"file": display_path,
"line_number": i,
"line_content": line.strip()
})
except (UnicodeDecodeError, PermissionError):
continue
return {
"success": True,
"pattern": pattern,
"path": path,
"recursive": recursive,
"matches": matches,
"total_matches": len(matches)
}
except Exception as e:
return {"error": f"Failed to perform grep search: {str(e)}"}
@@ -0,0 +1,88 @@
# List Dir Tool
Lists the contents of a directory within the secure session sandbox.
## Description
The `list_dir` tool allows you to explore directory contents, viewing all files and subdirectories with their metadata. It provides a structured view of the filesystem hierarchy.
## Use Cases
- Exploring project structure
- Finding specific files
- Checking for file existence
- Understanding directory organization
## Usage
```python
list_dir(
path="src",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The directory path (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "src",
"entries": [
{"name": "main.py", "type": "file", "size_bytes": 1024},
{"name": "utils", "type": "directory", "size_bytes": null}
],
"total_count": 2
}
```
**Error:**
```python
{
"error": "Directory not found at src"
}
```
## Error Handling
- Returns an error dict if the directory doesn't exist
- Returns an error dict if the path points to a file instead of a directory
- Returns an error dict if the directory cannot be read (permission issues, etc.)
## Examples
### Listing directory contents
```python
result = list_dir(
path=".",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": ".", "entries": [...], "total_count": 5}
```
### Checking an empty directory
```python
result = list_dir(
path="empty_folder",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "empty_folder", "entries": [], "total_count": 0}
```
@@ -0,0 +1,3 @@
from .list_dir import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,49 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register directory listing tools with the MCP server."""
@mcp.tool()
def list_dir(path: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
List the contents of a directory within the session sandbox.
Use this when you need to explore directory contents and see what files
and subdirectories exist.
Args:
path: The directory path (relative to session root)
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with directory contents and metadata, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"Directory not found at {path}"}
items = os.listdir(secure_path)
entries = []
for item in items:
full_path = os.path.join(secure_path, item)
is_dir = os.path.isdir(full_path)
entry = {
"name": item,
"type": "directory" if is_dir else "file",
"size_bytes": os.path.getsize(full_path) if not is_dir else None
}
entries.append(entry)
return {
"success": True,
"path": path,
"entries": entries,
"total_count": len(entries)
}
except Exception as e:
return {"error": f"Failed to list directory: {str(e)}"}
@@ -0,0 +1,102 @@
# Replace File Content Tool
Replaces specific string occurrences in a file within the secure session sandbox.
## Description
The `replace_file_content` tool performs find-and-replace operations on file content. It replaces all occurrences of a target string with a replacement string, providing details about the number of replacements made.
## Use Cases
- Updating configuration values
- Refactoring code (renaming variables, functions)
- Batch text replacements
- Updating version numbers or URLs
## Usage
```python
replace_file_content(
path="config/settings.json",
target='"debug": false',
replacement='"debug": true',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `target` | str | Yes | - | The string to search for and replace |
| `replacement` | str | Yes | - | The string to replace it with |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"occurrences_replaced": 3,
"target_length": 15,
"replacement_length": 14
}
```
**Error:**
```python
{
"error": "Target string not found in config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the target string is not found in the file
- Returns an error dict if the file cannot be read or written
- All occurrences of the target string are replaced
## Examples
### Replacing a configuration value
```python
result = replace_file_content(
path="app.config",
target="localhost",
replacement="production.example.com",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "app.config", "occurrences_replaced": 2, "target_length": 9, "replacement_length": 23}
```
### Handling missing target string
```python
result = replace_file_content(
path="README.md",
target="nonexistent text",
replacement="new text",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "Target string not found in README.md"}
```
## Notes
- This operation replaces **all** occurrences of the target string
- The replacement is case-sensitive
- For regex-based replacements, consider using a different tool
- The file is overwritten with the new content
@@ -0,0 +1,3 @@
from .replace_file_content import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,51 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file content replacement tools with the MCP server."""
@mcp.tool()
def replace_file_content(path: str, target: str, replacement: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Replace content in a file within the session sandbox.
Use this when you need to perform find-and-replace operations on file content.
All occurrences of the target string will be replaced.
Args:
path: The path to the file (relative to session root)
target: The string to search for and replace
replacement: The string to replace it with
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with replacement count and status, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
if target not in content:
return {"error": f"Target string not found in {path}"}
occurrences = content.count(target)
new_content = content.replace(target, replacement)
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"occurrences_replaced": occurrences,
"target_length": len(target),
"replacement_length": len(replacement)
}
except Exception as e:
return {"error": f"Failed to replace content: {str(e)}"}
@@ -0,0 +1,27 @@
import os
WORKSPACES_DIR = os.path.abspath(os.path.join(os.getcwd(), "workdir/workspaces"))
def get_secure_path(path: str, workspace_id: str, agent_id: str, session_id: str) -> str:
"""Resolve and verify a path within a 3-layer sandbox (workspace/agent/session)."""
if not workspace_id or not agent_id or not session_id:
raise ValueError("workspace_id, agent_id, and session_id are all required")
# Ensure session directory exists: runtime/workspace_id/agent_id/session_id
session_dir = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
os.makedirs(session_dir, exist_ok=True)
# Resolve absolute path
if os.path.isabs(path):
# Treat absolute paths as relative to the session root if they start with /
rel_path = path.lstrip(os.sep)
final_path = os.path.abspath(os.path.join(session_dir, rel_path))
else:
final_path = os.path.abspath(os.path.join(session_dir, path))
# Verify path is within session_dir
common_prefix = os.path.commonpath([final_path, session_dir])
if common_prefix != session_dir:
raise ValueError(f"Access denied: Path '{path}' is outside the session sandbox.")
return final_path
@@ -0,0 +1,86 @@
# View File Tool
Reads the content of a file within the secure session sandbox.
## Description
The `view_file` tool allows you to read and retrieve the complete content of files within a sandboxed session environment. It provides metadata about the file along with its content.
## Use Cases
- Reading configuration files
- Viewing source code
- Inspecting log files
- Retrieving data files for processing
## Usage
```python
view_file(
path="config/settings.json",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"content": "{\"debug\": true}",
"size_bytes": 16,
"lines": 1
}
```
**Error:**
```python
{
"error": "File not found at config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the file cannot be read (permission issues, encoding errors, etc.)
- Handles binary files gracefully by returning appropriate error messages
## Examples
### Reading a text file
```python
result = view_file(
path="README.md",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "README.md", "content": "# My Project\n...", "size_bytes": 1024, "lines": 42}
```
### Handling missing files
```python
result = view_file(
path="nonexistent.txt",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "File not found at nonexistent.txt"}
```
@@ -0,0 +1,3 @@
from .view_file import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,40 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file view tools with the MCP server."""
@mcp.tool()
def view_file(path: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Read the content of a file within the session sandbox.
Use this when you need to view the contents of an existing file.
Args:
path: The path to the file (relative to session root)
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with file content and metadata, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
return {
"success": True,
"path": path,
"content": content,
"size_bytes": len(content.encode("utf-8")),
"lines": len(content.splitlines())
}
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
@@ -0,0 +1,92 @@
# Write to File Tool
Writes content to a file within the secure session sandbox. Supports both overwriting and appending modes.
## Description
The `write_to_file` tool allows you to create new files or modify existing files within a sandboxed session environment. It automatically creates parent directories if they don't exist and provides flexible write modes.
## Use Cases
- Creating new configuration files
- Writing generated code or data
- Appending logs or output to existing files
- Saving processed results to disk
## Usage
```python
write_to_file(
path="config/settings.json",
content='{"debug": true}',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
append=False
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `content` | str | Yes | - | The content to write to the file |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `append` | bool | No | False | Whether to append to the file instead of overwriting |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"mode": "written", # or "appended"
"bytes_written": 18
}
```
**Error:**
```python
{
"error": "Failed to write to file: [error message]"
}
```
## Error Handling
- Returns an error dict if the file cannot be written (permission issues, invalid path, etc.)
- Automatically creates parent directories if they don't exist
- Handles encoding errors gracefully
## Examples
### Creating a new file
```python
result = write_to_file(
path="data/output.txt",
content="Hello, world!",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "data/output.txt", "mode": "written", "bytes_written": 13}
```
### Appending to a file
```python
result = write_to_file(
path="logs/activity.log",
content="\n[INFO] Task completed",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
append=True
)
# Returns: {"success": True, "path": "logs/activity.log", "mode": "appended", "bytes_written": 24}
```
@@ -0,0 +1,3 @@
from .write_to_file import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,40 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file write tools with the MCP server."""
@mcp.tool()
def write_to_file(path: str, content: str, workspace_id: str, agent_id: str, session_id: str, append: bool = False) -> dict:
"""
Write content to a file within the session sandbox.
Use this when you need to create a new file or overwrite an existing file.
Set append=True to add content to the end of an existing file.
Args:
path: The path to the file (relative to session root)
content: The content to write to the file
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
append: Whether to append to the file instead of overwriting (default: False)
Returns:
Dict with success status and path, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
os.makedirs(os.path.dirname(secure_path), exist_ok=True)
mode = "a" if append else "w"
with open(secure_path, mode, encoding="utf-8") as f:
f.write(content)
return {
"success": True,
"path": path,
"mode": "appended" if append else "written",
"bytes_written": len(content.encode("utf-8"))
}
except Exception as e:
return {"error": f"Failed to write to file: {str(e)}"}
@@ -0,0 +1,731 @@
"""Tests for file_system_toolkits tools (FastMCP)."""
import os
import pytest
from pathlib import Path
from unittest.mock import Mock, patch
from fastmcp import FastMCP
@pytest.fixture
def mcp():
"""Create a FastMCP instance."""
return FastMCP("test-server")
@pytest.fixture
def mock_workspace():
"""Mock workspace, agent, and session IDs."""
return {
"workspace_id": "test-workspace",
"agent_id": "test-agent",
"session_id": "test-session"
}
@pytest.fixture
def mock_secure_path(tmp_path):
"""Mock get_secure_path to return temp directory paths."""
def _get_secure_path(path, workspace_id, agent_id, session_id):
return os.path.join(tmp_path, path)
with patch("aden_tools.tools.file_system_toolkits.view_file.view_file.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.write_to_file.write_to_file.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.list_dir.list_dir.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.replace_file_content.replace_file_content.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.apply_diff.apply_diff.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.apply_patch.apply_patch.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.grep_search.grep_search.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.grep_search.grep_search.WORKSPACES_DIR", str(tmp_path)):
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR", str(tmp_path)):
yield
class TestViewFileTool:
"""Tests for view_file tool."""
@pytest.fixture
def view_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.view_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["view_file"].fn
def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an existing file returns content and metadata."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello, World!")
result = view_file_fn(path="test.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == "Hello, World!"
assert result["size_bytes"] == len("Hello, World!".encode("utf-8"))
assert result["lines"] == 1
def test_view_nonexistent_file(self, view_file_fn, mock_workspace, mock_secure_path):
"""Viewing a non-existent file returns an error."""
result = view_file_fn(path="nonexistent.txt", **mock_workspace)
assert "error" in result
assert "not found" in result["error"].lower()
def test_view_multiline_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing a multiline file returns correct line count."""
test_file = tmp_path / "multiline.txt"
content = "Line 1\nLine 2\nLine 3\nLine 4\n"
test_file.write_text(content)
result = view_file_fn(path="multiline.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == content
assert result["lines"] == 4
def test_view_empty_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an empty file returns empty content."""
test_file = tmp_path / "empty.txt"
test_file.write_text("")
result = view_file_fn(path="empty.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == ""
assert result["size_bytes"] == 0
assert result["lines"] == 0
def test_view_file_with_unicode(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing a file with unicode characters works correctly."""
test_file = tmp_path / "unicode.txt"
content = "Hello 世界! 🌍 émoji"
test_file.write_text(content, encoding="utf-8")
result = view_file_fn(path="unicode.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == content
assert result["size_bytes"] == len(content.encode("utf-8"))
def test_view_nested_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing a file in a nested directory works correctly."""
nested = tmp_path / "nested" / "dir"
nested.mkdir(parents=True)
test_file = nested / "file.txt"
test_file.write_text("nested content")
result = view_file_fn(path="nested/dir/file.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == "nested content"
class TestWriteToFileTool:
"""Tests for write_to_file tool."""
@pytest.fixture
def write_to_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.write_to_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["write_to_file"].fn
def test_write_new_file(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing to a new file creates it successfully."""
result = write_to_file_fn(
path="new_file.txt",
content="Test content",
**mock_workspace
)
assert result["success"] is True
assert result["mode"] == "written"
assert result["bytes_written"] > 0
# Verify file was created
created_file = tmp_path / "new_file.txt"
assert created_file.exists()
assert created_file.read_text() == "Test content"
def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing with append=True appends to existing file."""
test_file = tmp_path / "append_test.txt"
test_file.write_text("Line 1\n")
result = write_to_file_fn(
path="append_test.txt",
content="Line 2\n",
append=True,
**mock_workspace
)
assert result["success"] is True
assert result["mode"] == "appended"
assert test_file.read_text() == "Line 1\nLine 2\n"
def test_write_overwrite_existing(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing to existing file overwrites it by default."""
test_file = tmp_path / "overwrite.txt"
test_file.write_text("Original content")
result = write_to_file_fn(
path="overwrite.txt",
content="New content",
**mock_workspace
)
assert result["success"] is True
assert result["mode"] == "written"
assert test_file.read_text() == "New content"
def test_write_creates_parent_directories(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing creates parent directories if they don't exist."""
result = write_to_file_fn(
path="nested/dir/file.txt",
content="Test",
**mock_workspace
)
assert result["success"] is True
created_file = tmp_path / "nested" / "dir" / "file.txt"
assert created_file.exists()
assert created_file.read_text() == "Test"
def test_write_empty_content(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing empty content creates empty file."""
result = write_to_file_fn(
path="empty.txt",
content="",
**mock_workspace
)
assert result["success"] is True
assert result["bytes_written"] == 0
created_file = tmp_path / "empty.txt"
assert created_file.exists()
assert created_file.read_text() == ""
class TestListDirTool:
"""Tests for list_dir tool."""
@pytest.fixture
def list_dir_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.list_dir import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["list_dir"].fn
def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns all entries."""
# Create test files and directories
(tmp_path / "file1.txt").write_text("content")
(tmp_path / "file2.txt").write_text("content")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
assert result["success"] is True
assert result["total_count"] == 3
assert len(result["entries"]) == 3
# Check that entries have correct structure
for entry in result["entries"]:
assert "name" in entry
assert "type" in entry
assert entry["type"] in ["file", "directory"]
def test_list_empty_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing an empty directory returns empty list."""
empty_dir = tmp_path / "empty"
empty_dir.mkdir()
result = list_dir_fn(path="empty", **mock_workspace)
assert result["success"] is True
assert result["total_count"] == 0
assert result["entries"] == []
def test_list_nonexistent_directory(self, list_dir_fn, mock_workspace, mock_secure_path):
"""Listing a non-existent directory returns error."""
result = list_dir_fn(path="nonexistent_dir", **mock_workspace)
assert "error" in result
assert "not found" in result["error"].lower()
def test_list_directory_with_file_sizes(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns file sizes for files."""
(tmp_path / "small.txt").write_text("hi")
(tmp_path / "larger.txt").write_text("hello world")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
assert result["success"] is True
# Find entries by name
entries_by_name = {e["name"]: e for e in result["entries"]}
# Files should have size_bytes
assert entries_by_name["small.txt"]["type"] == "file"
assert entries_by_name["small.txt"]["size_bytes"] == 2
assert entries_by_name["larger.txt"]["type"] == "file"
assert entries_by_name["larger.txt"]["size_bytes"] == 11
# Directories should have None for size_bytes
assert entries_by_name["subdir"]["type"] == "directory"
assert entries_by_name["subdir"]["size_bytes"] is None
class TestReplaceFileContentTool:
"""Tests for replace_file_content tool."""
@pytest.fixture
def replace_file_content_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.replace_file_content import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["replace_file_content"].fn
def test_replace_content(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing content in a file works correctly."""
test_file = tmp_path / "replace_test.txt"
test_file.write_text("Hello World! Hello again!")
result = replace_file_content_fn(
path="replace_test.txt",
target="Hello",
replacement="Hi",
**mock_workspace
)
assert result["success"] is True
assert result["occurrences_replaced"] == 2
assert test_file.read_text() == "Hi World! Hi again!"
def test_replace_target_not_found(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing non-existent target returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello World")
result = replace_file_content_fn(
path="test.txt",
target="nonexistent",
replacement="new",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()
def test_replace_file_not_found(self, replace_file_content_fn, mock_workspace, mock_secure_path):
"""Replacing content in non-existent file returns error."""
result = replace_file_content_fn(
path="nonexistent.txt",
target="foo",
replacement="bar",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()
def test_replace_single_occurrence(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing content with single occurrence works correctly."""
test_file = tmp_path / "single.txt"
test_file.write_text("Hello World")
result = replace_file_content_fn(
path="single.txt",
target="Hello",
replacement="Hi",
**mock_workspace
)
assert result["success"] is True
assert result["occurrences_replaced"] == 1
assert test_file.read_text() == "Hi World"
def test_replace_multiline_content(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing content across multiple lines works correctly."""
test_file = tmp_path / "multiline.txt"
test_file.write_text("Line 1\nTODO: fix this\nLine 3\nTODO: add tests\n")
result = replace_file_content_fn(
path="multiline.txt",
target="TODO:",
replacement="DONE:",
**mock_workspace
)
assert result["success"] is True
assert result["occurrences_replaced"] == 2
assert test_file.read_text() == "Line 1\nDONE: fix this\nLine 3\nDONE: add tests\n"
class TestGrepSearchTool:
"""Tests for grep_search tool."""
@pytest.fixture
def grep_search_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.grep_search import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["grep_search"].fn
def test_grep_search_single_file(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching a single file returns matches."""
test_file = tmp_path / "search_test.txt"
test_file.write_text("Line 1\nLine 2 with pattern\nLine 3")
result = grep_search_fn(
path="search_test.txt",
pattern="pattern",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 1
assert len(result["matches"]) == 1
assert result["matches"][0]["line_number"] == 2
assert "pattern" in result["matches"][0]["line_content"]
def test_grep_search_no_matches(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching with no matches returns empty list."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello World")
result = grep_search_fn(
path="test.txt",
pattern="nonexistent",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 0
assert result["matches"] == []
def test_grep_search_directory_non_recursive(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching directory non-recursively only searches immediate files."""
# Create files in root
(tmp_path / "file1.txt").write_text("pattern here")
(tmp_path / "file2.txt").write_text("no match here")
# Create nested directory with file
nested = tmp_path / "nested"
nested.mkdir()
(nested / "nested_file.txt").write_text("pattern in nested")
result = grep_search_fn(
path=".",
pattern="pattern",
recursive=False,
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 1 # Only finds pattern in root, not in nested
assert result["recursive"] is False
def test_grep_search_directory_recursive(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching directory recursively finds matches in subdirectories."""
# Create files in root
(tmp_path / "file1.txt").write_text("pattern here")
# Create nested directory with file
nested = tmp_path / "nested"
nested.mkdir()
(nested / "nested_file.txt").write_text("pattern in nested")
result = grep_search_fn(
path=".",
pattern="pattern",
recursive=True,
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 2 # Finds pattern in both files
assert result["recursive"] is True
def test_grep_search_regex_pattern(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching with regex pattern finds complex matches."""
test_file = tmp_path / "regex_test.txt"
test_file.write_text("foo123bar\nfoo456bar\nbaz789baz\n")
result = grep_search_fn(
path="regex_test.txt",
pattern=r"foo\d+bar",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 2
assert result["matches"][0]["line_number"] == 1
assert result["matches"][1]["line_number"] == 2
def test_grep_search_multiple_matches_per_line(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching returns one match per line even with multiple occurrences."""
test_file = tmp_path / "multi_match.txt"
test_file.write_text("hello hello hello\nworld\nhello again")
result = grep_search_fn(
path="multi_match.txt",
pattern="hello",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 2 # Line 1 and Line 3
class TestExecuteCommandTool:
"""Tests for execute_command_tool."""
@pytest.fixture
def execute_command_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["execute_command_tool"].fn
def test_execute_simple_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a simple command returns output."""
result = execute_command_fn(
command="echo 'Hello World'",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 0
assert "Hello World" in result["stdout"]
def test_execute_failing_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a failing command returns non-zero exit code."""
result = execute_command_fn(
command="exit 1",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 1
def test_execute_command_with_stderr(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a command that writes to stderr captures it."""
result = execute_command_fn(
command="echo 'error message' >&2",
**mock_workspace
)
assert result["success"] is True
assert "error message" in result.get("stderr", "")
def test_execute_command_list_files(self, execute_command_fn, mock_workspace, mock_secure_path, tmp_path):
"""Executing ls command lists files."""
# Create a test file
(tmp_path / "testfile.txt").write_text("content")
result = execute_command_fn(
command=f"ls {tmp_path}",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 0
assert "testfile.txt" in result["stdout"]
def test_execute_command_with_pipe(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a command with pipe works correctly."""
result = execute_command_fn(
command="echo 'hello world' | tr 'a-z' 'A-Z'",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 0
assert "HELLO WORLD" in result["stdout"]
class TestApplyDiffTool:
"""Tests for apply_diff tool."""
@pytest.fixture
def apply_diff_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.apply_diff import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["apply_diff"].fn
def test_apply_diff_file_not_found(self, apply_diff_fn, mock_workspace, mock_secure_path):
"""Applying diff to non-existent file returns error."""
result = apply_diff_fn(
path="nonexistent.txt",
diff_text="some diff",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()
def test_apply_diff_successful(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying a valid diff successfully modifies the file."""
test_file = tmp_path / "diff_test.txt"
test_file.write_text("Hello World")
# Create a simple diff using diff_match_patch format
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_make("Hello World", "Hello Universe")
diff_text = dmp.patch_toText(patches)
result = apply_diff_fn(
path="diff_test.txt",
diff_text=diff_text,
**mock_workspace
)
assert result["success"] is True
assert result["all_successful"] is True
assert result["patches_applied"] > 0
assert test_file.read_text() == "Hello Universe"
def test_apply_diff_multiline(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying diff to multiline content works correctly."""
test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n"
test_file.write_text(original)
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()
modified = "Line 1\nModified Line 2\nLine 3\n"
patches = dmp.patch_make(original, modified)
diff_text = dmp.patch_toText(patches)
result = apply_diff_fn(
path="multiline.txt",
diff_text=diff_text,
**mock_workspace
)
assert result["success"] is True
assert result["all_successful"] is True
assert test_file.read_text() == modified
def test_apply_diff_invalid_patch(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying an invalid diff handles gracefully."""
test_file = tmp_path / "test.txt"
original_content = "Original content"
test_file.write_text(original_content)
# Invalid diff text
result = apply_diff_fn(
path="test.txt",
diff_text="invalid diff format",
**mock_workspace
)
# Should either error or show no patches applied
if "error" not in result:
assert result.get("patches_applied", 0) == 0
# File should remain unchanged
assert test_file.read_text() == original_content
class TestApplyPatchTool:
"""Tests for apply_patch tool."""
@pytest.fixture
def apply_patch_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.apply_patch import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["apply_patch"].fn
def test_apply_patch_file_not_found(self, apply_patch_fn, mock_workspace, mock_secure_path):
"""Applying patch to non-existent file returns error."""
result = apply_patch_fn(
path="nonexistent.txt",
patch_text="some patch",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()
def test_apply_patch_successful(self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying a valid patch successfully modifies the file."""
test_file = tmp_path / "patch_test.txt"
test_file.write_text("Hello World")
# Create a simple patch using diff_match_patch format
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_make("Hello World", "Hello Python")
patch_text = dmp.patch_toText(patches)
result = apply_patch_fn(
path="patch_test.txt",
patch_text=patch_text,
**mock_workspace
)
assert result["success"] is True
assert result["all_successful"] is True
assert result["patches_applied"] > 0
assert test_file.read_text() == "Hello Python"
def test_apply_patch_multiline(self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying patch to multiline content works correctly."""
test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n"
test_file.write_text(original)
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()
modified = "Line 1\nModified Line 2\nLine 3\n"
patches = dmp.patch_make(original, modified)
patch_text = dmp.patch_toText(patches)
result = apply_patch_fn(
path="multiline.txt",
patch_text=patch_text,
**mock_workspace
)
assert result["success"] is True
assert result["all_successful"] is True
assert test_file.read_text() == modified
def test_apply_patch_invalid_patch(self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying an invalid patch handles gracefully."""
test_file = tmp_path / "test.txt"
original_content = "Original content"
test_file.write_text(original_content)
# Invalid patch text
result = apply_patch_fn(
path="test.txt",
patch_text="invalid patch format",
**mock_workspace
)
# Should either error or show no patches applied
if "error" not in result:
assert result.get("patches_applied", 0) == 0
# File should remain unchanged
assert test_file.read_text() == original_content
def test_apply_patch_multiple_changes(self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying patch with multiple changes works correctly."""
test_file = tmp_path / "complex.txt"
original = "Function foo() {\n return 42;\n}\n"
test_file.write_text(original)
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()
modified = "Function bar() {\n return 100;\n}\n"
patches = dmp.patch_make(original, modified)
patch_text = dmp.patch_toText(patches)
result = apply_patch_fn(
path="complex.txt",
patch_text=patch_text,
**mock_workspace
)
assert result["success"] is True
assert result["all_successful"] is True
assert test_file.read_text() == modified
+5
View File
@@ -4,6 +4,11 @@
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "/home/timothy/oss/hive/core"
},
"aden-tools": {
"command": "python",
"args": ["-m", "aden_tools.mcp_server", "--stdio"],
"cwd": "/home/timothy/oss/hive/aden-tools"
}
}
}
+2
View File
@@ -144,6 +144,7 @@ services:
- BRAVE_SEARCH_API_KEY=${BRAVE_SEARCH_API_KEY:-}
volumes:
- .:/workspace:rw # Mount project root for file access
- aden_tools_workspaces:/app/workdir/workspaces # Persist file system tool workspaces
working_dir: /workspace # Set working directory so relative paths work
command: ["python", "/app/mcp_server.py"] # Use absolute path since working_dir changed
healthcheck:
@@ -164,3 +165,4 @@ volumes:
timescaledb_data:
mongodb_data:
redis_data:
aden_tools_workspaces: