feat: file sys tools

This commit is contained in:
Richard T
2026-01-19 20:08:34 -08:00
parent e362262a24
commit ed82e75799
26 changed files with 1633 additions and 0 deletions
@@ -20,6 +20,16 @@ from .web_search_tool import register_tools as register_web_search
from .web_scrape_tool import register_tools as register_web_scrape
from .pdf_read_tool import register_tools as register_pdf_read
# Import file system toolkits
from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import register_tools as register_replace_file_content
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.grep_search import register_tools as register_grep_search
from .file_system_toolkits.execute_command_tool import register_tools as register_execute_command
def register_all_tools(mcp: FastMCP) -> List[str]:
"""
@@ -38,6 +48,16 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
register_web_scrape(mcp)
register_pdf_read(mcp)
# Register file system toolkits
register_view_file(mcp)
register_write_to_file(mcp)
register_list_dir(mcp)
register_replace_file_content(mcp)
register_apply_diff(mcp)
register_apply_patch(mcp)
register_grep_search(mcp)
register_execute_command(mcp)
return [
"example_tool",
"file_read",
@@ -45,6 +65,14 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
"web_search",
"web_scrape",
"pdf_read",
"view_file",
"write_to_file",
"list_dir",
"replace_file_content",
"apply_diff",
"apply_patch",
"grep_search",
"execute_command_tool",
]
@@ -0,0 +1,109 @@
# Apply Diff Tool
Applies a unified diff patch to a file within the secure session sandbox.
## Description
The `apply_diff` tool applies structured diff patches to files, enabling precise modifications using the diff-match-patch algorithm. It can apply multiple patches in a single operation and reports success status for each patch.
## Use Cases
- Applying code review suggestions
- Implementing automated refactoring
- Synchronizing file changes from version control
- Making precise, contextual file modifications
## Usage
```python
apply_diff(
path="src/main.py",
diff_text="@@ -1,3 +1,3 @@\n import os\n-import sys\n+import json\n from typing import List",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `diff_text` | str | Yes | - | The diff patch text to apply |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success (all patches applied):**
```python
{
"success": True,
"path": "src/main.py",
"patches_applied": 3,
"all_successful": True
}
```
**Partial success (some patches failed):**
```python
{
"success": False,
"path": "src/main.py",
"patches_applied": 2,
"patches_failed": 1,
"error": "Failed to apply 1 of 3 patches"
}
```
**Error:**
```python
{
"error": "File not found at src/main.py"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns partial success if some patches fail to apply
- Returns an error dict if the diff text is malformed
- Uses diff-match-patch library for intelligent fuzzy matching
## Examples
### Applying a single-line change
```python
diff = "@@ -10,1 +10,1 @@\n- old_code()\n+ new_code()"
result = apply_diff(
path="module.py",
diff_text=diff,
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "module.py", "patches_applied": 1, "all_successful": True}
```
### Handling patch failures
```python
result = apply_diff(
path="outdated.py",
diff_text="@@ -1,1 +1,1 @@\n-nonexistent line\n+new line",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": False, "path": "outdated.py", "patches_applied": 0, "patches_failed": 1, ...}
```
## Notes
- Uses the diff-match-patch library for patch application
- Supports fuzzy matching for more robust patching
- Patches are applied atomically (all or nothing for file write)
- The file is only modified if at least one patch succeeds
@@ -0,0 +1,3 @@
from .apply_diff import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,58 @@
import os
import diff_match_patch as dmp_module
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register diff application tools with the MCP server."""
@mcp.tool()
def apply_diff(path: str, diff_text: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Apply a diff to a file within the session sandbox.
Use this when you need to apply structured diff patches to modify file content.
Args:
path: The path to the file (relative to session root)
diff_text: The diff patch text to apply
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with application status and patch results, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_fromText(diff_text)
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
new_content, results = dmp.patch_apply(patches, content)
if all(results):
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"patches_applied": len(patches),
"all_successful": True
}
else:
failed_count = sum(1 for r in results if not r)
return {
"success": False,
"path": path,
"patches_applied": len([r for r in results if r]),
"patches_failed": failed_count,
"error": f"Failed to apply {failed_count} of {len(patches)} patches"
}
except Exception as e:
return {"error": f"Failed to apply diff: {str(e)}"}
@@ -0,0 +1,97 @@
# Apply Patch Tool
Applies a patch (unified diff) to a file within the secure session sandbox.
## Description
The `apply_patch` tool is an alias for `apply_diff` that applies structured diff patches to files. It provides the same functionality with alternative naming for user preference.
## Use Cases
- Applying code review suggestions
- Implementing automated refactoring
- Synchronizing file changes from version control
- Making precise, contextual file modifications
## Usage
```python
apply_patch(
path="src/main.py",
patch_text="@@ -1,3 +1,3 @@\n import os\n-import sys\n+import json\n from typing import List",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `patch_text` | str | Yes | - | The patch text to apply |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success (all patches applied):**
```python
{
"success": True,
"path": "src/main.py",
"patches_applied": 3,
"all_successful": True
}
```
**Partial success (some patches failed):**
```python
{
"success": False,
"path": "src/main.py",
"patches_applied": 2,
"patches_failed": 1,
"error": "Failed to apply 1 of 3 patches"
}
```
**Error:**
```python
{
"error": "File not found at src/main.py"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns partial success if some patches fail to apply
- Returns an error dict if the patch text is malformed
- Uses diff-match-patch library for intelligent fuzzy matching
## Examples
### Applying a patch
```python
patch = "@@ -10,1 +10,1 @@\n- old_code()\n+ new_code()"
result = apply_patch(
path="module.py",
patch_text=patch,
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "module.py", "patches_applied": 1, "all_successful": True}
```
## Notes
- This is an alias for the `apply_diff` tool with identical functionality
- Uses the diff-match-patch library for patch application
- Supports fuzzy matching for more robust patching
- The implementation is duplicated for atomic isolation (not a simple function call)
@@ -0,0 +1,3 @@
from .apply_patch import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,60 @@
import os
import diff_match_patch as dmp_module
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register patch application tools with the MCP server."""
@mcp.tool()
def apply_patch(path: str, patch_text: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Apply a patch to a file within the session sandbox.
Use this when you need to apply patch-formatted changes to a file.
This is an alias for apply_diff with the same functionality.
Args:
path: The path to the file (relative to session root)
patch_text: The patch text to apply
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with application status and patch results, or error dict
"""
# Logic duplicated from apply_diff for atomic isolation
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_fromText(patch_text)
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
new_content, results = dmp.patch_apply(patches, content)
if all(results):
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"patches_applied": len(patches),
"all_successful": True
}
else:
failed_count = sum(1 for r in results if not r)
return {
"success": False,
"path": path,
"patches_applied": len([r for r in results if r]),
"patches_failed": failed_count,
"error": f"Failed to apply {failed_count} of {len(patches)} patches"
}
except Exception as e:
return {"error": f"Failed to apply patch: {str(e)}"}
@@ -0,0 +1,152 @@
# Execute Command Tool
Executes shell commands within the secure session sandbox.
## Description
The `execute_command_tool` allows you to run arbitrary shell commands in a sandboxed environment. Commands are executed with a 60-second timeout and capture both stdout and stderr output.
## Use Cases
- Running build commands (npm build, make, etc.)
- Executing tests
- Running linters or formatters
- Performing git operations
- Installing dependencies
## Usage
```python
execute_command_tool(
command="npm install",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
cwd="project"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `command` | str | Yes | - | The shell command to execute |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `cwd` | str | No | "." | The working directory for the command (relative to session root) |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"command": "npm install",
"return_code": 0,
"stdout": "added 42 packages in 3s",
"stderr": "",
"cwd": "project"
}
```
**Command failure (non-zero exit):**
```python
{
"success": True, # Command executed successfully, but exited with error code
"command": "npm test",
"return_code": 1,
"stdout": "",
"stderr": "Error: Tests failed",
"cwd": "."
}
```
**Timeout:**
```python
{
"error": "Command timed out after 60 seconds"
}
```
**Error:**
```python
{
"error": "Failed to execute command: [error message]"
}
```
## Error Handling
- Returns an error dict if the command times out (60 second limit)
- Returns an error dict if the command cannot be executed
- Returns success with non-zero return_code if command runs but fails
- Commands are executed in a sandboxed session environment
- Working directory defaults to session root if not specified
## Security Considerations
- Commands are executed within the session sandbox only
- File access is restricted to the session directory
- Network access depends on sandbox configuration
- Commands run with the permissions of the session user
- Use with caution as shell injection is possible
## Examples
### Running a build command
```python
result = execute_command_tool(
command="npm run build",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="frontend"
)
# Returns: {"success": True, "return_code": 0, "stdout": "Build complete", ...}
```
### Running tests with output
```python
result = execute_command_tool(
command="pytest -v",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 0, "stdout": "test output...", "stderr": ""}
```
### Handling command failures
```python
result = execute_command_tool(
command="nonexistent-command",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 127, "stderr": "command not found", ...}
```
### Running git commands
```python
result = execute_command_tool(
command="git status",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="repo"
)
# Returns: {"success": True, "return_code": 0, "stdout": "On branch main...", ...}
```
## Notes
- 60-second timeout for all commands
- Commands are executed using shell=True (supports pipes, redirects, etc.)
- Both stdout and stderr are captured separately
- Return code 0 typically indicates success
- Working directory is created if it doesn't exist
- Command output is returned as text (UTF-8 encoding)
@@ -0,0 +1,3 @@
from .execute_command_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,58 @@
import os
import subprocess
from typing import Optional
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path, WORKSPACES_DIR
def register_tools(mcp: FastMCP) -> None:
"""Register command execution tools with the MCP server."""
@mcp.tool()
def execute_command_tool(command: str, workspace_id: str, agent_id: str, session_id: str, cwd: Optional[str] = None) -> dict:
"""
Execute a shell command within the session sandbox.
Use this when you need to run shell commands safely within the sandboxed environment.
Commands are executed with a 60-second timeout.
Args:
command: The shell command to execute
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
cwd: The working directory for the command (relative to session root, optional)
Returns:
Dict with command output and execution details, or error dict
"""
try:
# Default cwd is the session root
session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
os.makedirs(session_root, exist_ok=True)
if cwd:
secure_cwd = get_secure_path(cwd, workspace_id, agent_id, session_id)
else:
secure_cwd = session_root
result = subprocess.run(
command,
shell=True,
cwd=secure_cwd,
capture_output=True,
text=True,
timeout=60
)
return {
"success": True,
"command": command,
"return_code": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"cwd": cwd or "."
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out after 60 seconds"}
except Exception as e:
return {"error": f"Failed to execute command: {str(e)}"}
@@ -0,0 +1,140 @@
# Grep Search Tool
Searches for regex patterns in files or directories within the secure session sandbox.
## Description
The `grep_search` tool provides powerful pattern matching capabilities across files and directories. It uses Python's regex engine to find matches and returns detailed results including file paths, line numbers, and matched content.
## Use Cases
- Finding function or variable definitions
- Searching for TODO comments or specific patterns
- Analyzing code for security issues or patterns
- Locating configuration values across multiple files
## Usage
```python
grep_search(
path="src",
pattern="def \\w+\\(",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
recursive=True
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to search in (file or directory, relative to session root) |
| `pattern` | str | Yes | - | The regex pattern to search for |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `recursive` | bool | No | False | Whether to search recursively in subdirectories |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"pattern": "def \\w+\\(",
"path": "src",
"recursive": True,
"matches": [
{
"file": "src/main.py",
"line_number": 10,
"line_content": "def process_data(args):"
},
{
"file": "src/utils.py",
"line_number": 5,
"line_content": "def helper_function():"
}
],
"total_matches": 2
}
```
**No matches:**
```python
{
"success": True,
"pattern": "nonexistent",
"path": "src",
"recursive": False,
"matches": [],
"total_matches": 0
}
```
**Error:**
```python
{
"error": "Failed to perform grep search: [error message]"
}
```
## Error Handling
- Returns an error dict if the path doesn't exist
- Skips files that cannot be decoded (binary files, encoding errors)
- Skips files with permission errors
- Returns empty matches list if no matches found
- Handles invalid regex patterns with error message
## Examples
### Searching for function definitions
```python
result = grep_search(
path="src",
pattern="^def ",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
recursive=True
)
# Returns: {"success": True, "pattern": "^def ", "matches": [...], "total_matches": 15}
```
### Searching a single file
```python
result = grep_search(
path="config.py",
pattern="API_KEY",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "pattern": "API_KEY", "matches": [{...}], "total_matches": 1}
```
### Case-insensitive search using regex flags
```python
result = grep_search(
path="docs",
pattern="(?i)todo",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
recursive=True
)
# Finds "TODO", "todo", "Todo", etc.
```
## Notes
- Uses Python's `re` module for regex matching
- Binary files and files with encoding errors are automatically skipped
- Line numbers start at 1
- Returned file paths are relative to the session root
- For non-recursive directory searches, only files in the immediate directory are searched
@@ -0,0 +1,3 @@
from .grep_search import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,70 @@
import os
import re
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path, WORKSPACES_DIR
def register_tools(mcp: FastMCP) -> None:
"""Register grep search tools with the MCP server."""
@mcp.tool()
def grep_search(path: str, pattern: str, workspace_id: str, agent_id: str, session_id: str, recursive: bool = False) -> dict:
"""
Search for a pattern in a file or directory within the session sandbox.
Use this when you need to find specific content or patterns in files using regex.
Set recursive=True to search through all subdirectories.
Args:
path: The path to search in (file or directory, relative to session root)
pattern: The regex pattern to search for
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
recursive: Whether to search recursively in directories (default: False)
Returns:
Dict with search results and match details, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
# Use session dir root for relative path calculations
session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
matches = []
regex = re.compile(pattern)
if os.path.isfile(secure_path):
files = [secure_path]
elif recursive:
files = []
for root, _, filenames in os.walk(secure_path):
for filename in filenames:
files.append(os.path.join(root, filename))
else:
files = [os.path.join(secure_path, f) for f in os.listdir(secure_path) if os.path.isfile(os.path.join(secure_path, f))]
for file_path in files:
# Calculate relative path for display
display_path = os.path.relpath(file_path, session_root)
try:
with open(file_path, "r", encoding="utf-8") as f:
for i, line in enumerate(f, 1):
if regex.search(line):
matches.append({
"file": display_path,
"line_number": i,
"line_content": line.strip()
})
except (UnicodeDecodeError, PermissionError):
continue
return {
"success": True,
"pattern": pattern,
"path": path,
"recursive": recursive,
"matches": matches,
"total_matches": len(matches)
}
except Exception as e:
return {"error": f"Failed to perform grep search: {str(e)}"}
@@ -0,0 +1,88 @@
# List Dir Tool
Lists the contents of a directory within the secure session sandbox.
## Description
The `list_dir` tool allows you to explore directory contents, viewing all files and subdirectories with their metadata. It provides a structured view of the filesystem hierarchy.
## Use Cases
- Exploring project structure
- Finding specific files
- Checking for file existence
- Understanding directory organization
## Usage
```python
list_dir(
path="src",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The directory path (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "src",
"entries": [
{"name": "main.py", "type": "file", "size_bytes": 1024},
{"name": "utils", "type": "directory", "size_bytes": null}
],
"total_count": 2
}
```
**Error:**
```python
{
"error": "Directory not found at src"
}
```
## Error Handling
- Returns an error dict if the directory doesn't exist
- Returns an error dict if the path points to a file instead of a directory
- Returns an error dict if the directory cannot be read (permission issues, etc.)
## Examples
### Listing directory contents
```python
result = list_dir(
path=".",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": ".", "entries": [...], "total_count": 5}
```
### Checking an empty directory
```python
result = list_dir(
path="empty_folder",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "empty_folder", "entries": [], "total_count": 0}
```
@@ -0,0 +1,3 @@
from .list_dir import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,49 @@
import os
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register directory listing tools with the MCP server."""
@mcp.tool()
def list_dir(path: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
List the contents of a directory within the session sandbox.
Use this when you need to explore directory contents and see what files
and subdirectories exist.
Args:
path: The directory path (relative to session root)
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with directory contents and metadata, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"Directory not found at {path}"}
items = os.listdir(secure_path)
entries = []
for item in items:
full_path = os.path.join(secure_path, item)
is_dir = os.path.isdir(full_path)
entry = {
"name": item,
"type": "directory" if is_dir else "file",
"size_bytes": os.path.getsize(full_path) if not is_dir else None
}
entries.append(entry)
return {
"success": True,
"path": path,
"entries": entries,
"total_count": len(entries)
}
except Exception as e:
return {"error": f"Failed to list directory: {str(e)}"}
@@ -0,0 +1,102 @@
# Replace File Content Tool
Replaces specific string occurrences in a file within the secure session sandbox.
## Description
The `replace_file_content` tool performs find-and-replace operations on file content. It replaces all occurrences of a target string with a replacement string, providing details about the number of replacements made.
## Use Cases
- Updating configuration values
- Refactoring code (renaming variables, functions)
- Batch text replacements
- Updating version numbers or URLs
## Usage
```python
replace_file_content(
path="config/settings.json",
target='"debug": false',
replacement='"debug": true',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `target` | str | Yes | - | The string to search for and replace |
| `replacement` | str | Yes | - | The string to replace it with |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"occurrences_replaced": 3,
"target_length": 15,
"replacement_length": 14
}
```
**Error:**
```python
{
"error": "Target string not found in config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the target string is not found in the file
- Returns an error dict if the file cannot be read or written
- All occurrences of the target string are replaced
## Examples
### Replacing a configuration value
```python
result = replace_file_content(
path="app.config",
target="localhost",
replacement="production.example.com",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "app.config", "occurrences_replaced": 2, "target_length": 9, "replacement_length": 23}
```
### Handling missing target string
```python
result = replace_file_content(
path="README.md",
target="nonexistent text",
replacement="new text",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "Target string not found in README.md"}
```
## Notes
- This operation replaces **all** occurrences of the target string
- The replacement is case-sensitive
- For regex-based replacements, consider using a different tool
- The file is overwritten with the new content
@@ -0,0 +1,3 @@
from .replace_file_content import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,51 @@
import os
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file content replacement tools with the MCP server."""
@mcp.tool()
def replace_file_content(path: str, target: str, replacement: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Replace content in a file within the session sandbox.
Use this when you need to perform find-and-replace operations on file content.
All occurrences of the target string will be replaced.
Args:
path: The path to the file (relative to session root)
target: The string to search for and replace
replacement: The string to replace it with
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with replacement count and status, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
if target not in content:
return {"error": f"Target string not found in {path}"}
occurrences = content.count(target)
new_content = content.replace(target, replacement)
with open(secure_path, "w", encoding="utf-8") as f:
f.write(new_content)
return {
"success": True,
"path": path,
"occurrences_replaced": occurrences,
"target_length": len(target),
"replacement_length": len(replacement)
}
except Exception as e:
return {"error": f"Failed to replace content: {str(e)}"}
@@ -0,0 +1,86 @@
# View File Tool
Reads the content of a file within the secure session sandbox.
## Description
The `view_file` tool allows you to read and retrieve the complete content of files within a sandboxed session environment. It provides metadata about the file along with its content.
## Use Cases
- Reading configuration files
- Viewing source code
- Inspecting log files
- Retrieving data files for processing
## Usage
```python
view_file(
path="config/settings.json",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"content": "{\"debug\": true}",
"size_bytes": 16,
"lines": 1
}
```
**Error:**
```python
{
"error": "File not found at config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the file cannot be read (permission issues, encoding errors, etc.)
- Handles binary files gracefully by returning appropriate error messages
## Examples
### Reading a text file
```python
result = view_file(
path="README.md",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "README.md", "content": "# My Project\n...", "size_bytes": 1024, "lines": 42}
```
### Handling missing files
```python
result = view_file(
path="nonexistent.txt",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "File not found at nonexistent.txt"}
```
@@ -0,0 +1,3 @@
from .view_file import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,40 @@
import os
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file view tools with the MCP server."""
@mcp.tool()
def view_file(path: str, workspace_id: str, agent_id: str, session_id: str) -> dict:
"""
Read the content of a file within the session sandbox.
Use this when you need to view the contents of an existing file.
Args:
path: The path to the file (relative to session root)
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
Returns:
Dict with file content and metadata, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
with open(secure_path, "r", encoding="utf-8") as f:
content = f.read()
return {
"success": True,
"path": path,
"content": content,
"size_bytes": len(content.encode("utf-8")),
"lines": len(content.splitlines())
}
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
@@ -0,0 +1,92 @@
# Write to File Tool
Writes content to a file within the secure session sandbox. Supports both overwriting and appending modes.
## Description
The `write_to_file` tool allows you to create new files or modify existing files within a sandboxed session environment. It automatically creates parent directories if they don't exist and provides flexible write modes.
## Use Cases
- Creating new configuration files
- Writing generated code or data
- Appending logs or output to existing files
- Saving processed results to disk
## Usage
```python
write_to_file(
path="config/settings.json",
content='{"debug": true}',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
append=False
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `content` | str | Yes | - | The content to write to the file |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `append` | bool | No | False | Whether to append to the file instead of overwriting |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"mode": "written", # or "appended"
"bytes_written": 18
}
```
**Error:**
```python
{
"error": "Failed to write to file: [error message]"
}
```
## Error Handling
- Returns an error dict if the file cannot be written (permission issues, invalid path, etc.)
- Automatically creates parent directories if they don't exist
- Handles encoding errors gracefully
## Examples
### Creating a new file
```python
result = write_to_file(
path="data/output.txt",
content="Hello, world!",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "data/output.txt", "mode": "written", "bytes_written": 13}
```
### Appending to a file
```python
result = write_to_file(
path="logs/activity.log",
content="\n[INFO] Task completed",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
append=True
)
# Returns: {"success": True, "path": "logs/activity.log", "mode": "appended", "bytes_written": 24}
```
@@ -0,0 +1,3 @@
from .write_to_file import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,40 @@
import os
from mcp.server.fastmcp import FastMCP
from file_server.security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file write tools with the MCP server."""
@mcp.tool()
def write_to_file(path: str, content: str, workspace_id: str, agent_id: str, session_id: str, append: bool = False) -> dict:
"""
Write content to a file within the session sandbox.
Use this when you need to create a new file or overwrite an existing file.
Set append=True to add content to the end of an existing file.
Args:
path: The path to the file (relative to session root)
content: The content to write to the file
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
append: Whether to append to the file instead of overwriting (default: False)
Returns:
Dict with success status and path, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
os.makedirs(os.path.dirname(secure_path), exist_ok=True)
mode = "a" if append else "w"
with open(secure_path, mode, encoding="utf-8") as f:
f.write(content)
return {
"success": True,
"path": path,
"mode": "appended" if append else "written",
"bytes_written": len(content.encode("utf-8"))
}
except Exception as e:
return {"error": f"Failed to write to file: {str(e)}"}
@@ -0,0 +1,289 @@
"""Tests for file_system_toolkits tools (FastMCP)."""
import os
import pytest
from pathlib import Path
from unittest.mock import Mock, patch
from fastmcp import FastMCP
@pytest.fixture
def mcp():
"""Create a FastMCP instance."""
return FastMCP("test-server")
@pytest.fixture
def mock_workspace():
"""Mock workspace, agent, and session IDs."""
return {
"workspace_id": "test-workspace",
"agent_id": "test-agent",
"session_id": "test-session"
}
@pytest.fixture
def mock_secure_path(tmp_path):
"""Mock get_secure_path to return temp directory paths."""
def _get_secure_path(path, workspace_id, agent_id, session_id):
return os.path.join(tmp_path, path)
with patch("aden_tools.tools.file_system_toolkits.view_file.view_file.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.write_to_file.write_to_file.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.list_dir.list_dir.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.replace_file_content.replace_file_content.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.apply_diff.apply_diff.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.apply_patch.apply_patch.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.grep_search.grep_search.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.grep_search.grep_search.WORKSPACES_DIR", str(tmp_path)):
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.get_secure_path", side_effect=_get_secure_path):
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR", str(tmp_path)):
yield
class TestViewFileTool:
"""Tests for view_file tool."""
@pytest.fixture
def view_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.view_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["view_file"].fn
def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an existing file returns content and metadata."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello, World!")
result = view_file_fn(path="test.txt", **mock_workspace)
assert result["success"] is True
assert result["content"] == "Hello, World!"
assert result["size_bytes"] == len("Hello, World!".encode("utf-8"))
assert result["lines"] == 1
def test_view_nonexistent_file(self, view_file_fn, mock_workspace, mock_secure_path):
"""Viewing a non-existent file returns an error."""
result = view_file_fn(path="nonexistent.txt", **mock_workspace)
assert "error" in result
assert "not found" in result["error"].lower()
class TestWriteToFileTool:
"""Tests for write_to_file tool."""
@pytest.fixture
def write_to_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.write_to_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["write_to_file"].fn
def test_write_new_file(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing to a new file creates it successfully."""
result = write_to_file_fn(
path="new_file.txt",
content="Test content",
**mock_workspace
)
assert result["success"] is True
assert result["mode"] == "written"
assert result["bytes_written"] > 0
# Verify file was created
created_file = tmp_path / "new_file.txt"
assert created_file.exists()
assert created_file.read_text() == "Test content"
def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing with append=True appends to existing file."""
test_file = tmp_path / "append_test.txt"
test_file.write_text("Line 1\n")
result = write_to_file_fn(
path="append_test.txt",
content="Line 2\n",
append=True,
**mock_workspace
)
assert result["success"] is True
assert result["mode"] == "appended"
assert test_file.read_text() == "Line 1\nLine 2\n"
class TestListDirTool:
"""Tests for list_dir tool."""
@pytest.fixture
def list_dir_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.list_dir import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["list_dir"].fn
def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns all entries."""
# Create test files and directories
(tmp_path / "file1.txt").write_text("content")
(tmp_path / "file2.txt").write_text("content")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
assert result["success"] is True
assert result["total_count"] == 3
assert len(result["entries"]) == 3
# Check that entries have correct structure
for entry in result["entries"]:
assert "name" in entry
assert "type" in entry
assert entry["type"] in ["file", "directory"]
def test_list_empty_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing an empty directory returns empty list."""
empty_dir = tmp_path / "empty"
empty_dir.mkdir()
result = list_dir_fn(path="empty", **mock_workspace)
assert result["success"] is True
assert result["total_count"] == 0
assert result["entries"] == []
class TestReplaceFileContentTool:
"""Tests for replace_file_content tool."""
@pytest.fixture
def replace_file_content_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.replace_file_content import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["replace_file_content"].fn
def test_replace_content(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing content in a file works correctly."""
test_file = tmp_path / "replace_test.txt"
test_file.write_text("Hello World! Hello again!")
result = replace_file_content_fn(
path="replace_test.txt",
target="Hello",
replacement="Hi",
**mock_workspace
)
assert result["success"] is True
assert result["occurrences_replaced"] == 2
assert test_file.read_text() == "Hi World! Hi again!"
def test_replace_target_not_found(self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path):
"""Replacing non-existent target returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello World")
result = replace_file_content_fn(
path="test.txt",
target="nonexistent",
replacement="new",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()
class TestGrepSearchTool:
"""Tests for grep_search tool."""
@pytest.fixture
def grep_search_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.grep_search import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["grep_search"].fn
def test_grep_search_single_file(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching a single file returns matches."""
test_file = tmp_path / "search_test.txt"
test_file.write_text("Line 1\nLine 2 with pattern\nLine 3")
result = grep_search_fn(
path="search_test.txt",
pattern="pattern",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 1
assert len(result["matches"]) == 1
assert result["matches"][0]["line_number"] == 2
assert "pattern" in result["matches"][0]["line_content"]
def test_grep_search_no_matches(self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path):
"""Searching with no matches returns empty list."""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello World")
result = grep_search_fn(
path="test.txt",
pattern="nonexistent",
**mock_workspace
)
assert result["success"] is True
assert result["total_matches"] == 0
assert result["matches"] == []
class TestExecuteCommandTool:
"""Tests for execute_command_tool."""
@pytest.fixture
def execute_command_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["execute_command_tool"].fn
def test_execute_simple_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a simple command returns output."""
result = execute_command_fn(
command="echo 'Hello World'",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 0
assert "Hello World" in result["stdout"]
def test_execute_failing_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a failing command returns non-zero exit code."""
result = execute_command_fn(
command="exit 1",
**mock_workspace
)
assert result["success"] is True
assert result["return_code"] == 1
class TestApplyDiffTool:
"""Tests for apply_diff tool."""
@pytest.fixture
def apply_diff_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.apply_diff import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["apply_diff"].fn
def test_apply_diff_file_not_found(self, apply_diff_fn, mock_workspace, mock_secure_path):
"""Applying diff to non-existent file returns error."""
result = apply_diff_fn(
path="nonexistent.txt",
diff_text="some diff",
**mock_workspace
)
assert "error" in result
assert "not found" in result["error"].lower()