refactor: Remove file read and write tools and update the workspace directory to use the user's home directory.

This commit is contained in:
Richard T
2026-01-21 19:19:48 -08:00
parent 1c9bbd7b02
commit 406ad7924c
10 changed files with 2 additions and 425 deletions
@@ -14,8 +14,6 @@ from fastmcp import FastMCP
# Import register_tools from each tool module
from .example_tool import register_tools as register_example
from .file_read_tool import register_tools as register_file_read
from .file_write_tool import register_tools as register_file_write
from .web_search_tool import register_tools as register_web_search
from .web_scrape_tool import register_tools as register_web_scrape
from .pdf_read_tool import register_tools as register_pdf_read
@@ -42,8 +40,6 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
List of registered tool names
"""
register_example(mcp)
register_file_read(mcp)
register_file_write(mcp)
register_web_search(mcp)
register_web_scrape(mcp)
register_pdf_read(mcp)
@@ -60,8 +56,6 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
return [
"example_tool",
"file_read",
"file_write",
"web_search",
"web_scrape",
"pdf_read",
@@ -1,28 +0,0 @@
# File Read Tool
Read contents of local files with encoding support.
## Description
Use for reading configs, data files, source code, logs, or any text file. Returns file content along with path, name, size, and encoding metadata.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `file_path` | str | Yes | - | Path to the file to read (absolute or relative) |
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
| `max_size` | int | No | `10000000` | Maximum file size to read in bytes (default 10MB) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `File not found: <path>` - File does not exist
- `Not a file: <path>` - Path points to a directory
- `File too large: <size> bytes (max: <max_size>)` - File exceeds max_size limit
- `Failed to decode file with encoding '<encoding>'` - Wrong encoding specified
- `Permission denied: <path>` - No read access to file
@@ -1,4 +0,0 @@
"""File Read Tool - Read contents of local files."""
from .file_read_tool import register_tools
__all__ = ["register_tools"]
@@ -1,75 +0,0 @@
"""
File Read Tool - Read contents of local files.
Supports reading text files with various encodings.
Returns file content along with metadata.
"""
from __future__ import annotations
from pathlib import Path
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register file read tools with the MCP server."""
@mcp.tool()
def file_read(
file_path: str,
encoding: str = "utf-8",
max_size: int = 10_000_000,
) -> dict:
"""
Read the contents of a local file.
Use for reading configs, data files, source code, logs, or any text file.
Returns file content along with path, name, size, and encoding.
Args:
file_path: Path to the file to read (absolute or relative)
encoding: File encoding (utf-8, latin-1, etc.)
max_size: Maximum file size to read in bytes (default 10MB)
Returns:
Dict with file content and metadata, or error dict
"""
try:
path = Path(file_path).resolve()
# Check if file exists
if not path.exists():
return {"error": f"File not found: {file_path}"}
# Check if it's a file (not directory)
if not path.is_file():
return {"error": f"Not a file: {file_path}"}
# Check file size
file_size = path.stat().st_size
if max_size > 0 and file_size > max_size:
return {
"error": f"File too large: {file_size} bytes (max: {max_size})",
"file_size": file_size,
}
# Read the file
content = path.read_text(encoding=encoding)
return {
"path": str(path),
"name": path.name,
"content": content,
"size": len(content),
"encoding": encoding,
}
except UnicodeDecodeError as e:
return {
"error": f"Failed to decode file with encoding '{encoding}': {str(e)}",
"suggestion": "Try a different encoding like 'latin-1' or 'cp1252'",
}
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
@@ -1,6 +1,7 @@
import os
WORKSPACES_DIR = os.path.abspath(os.path.join(os.getcwd(), "workdir/workspaces"))
# Use user home directory for workspaces
WORKSPACES_DIR = os.path.expanduser("~/.hive/workdir/workspaces")
def get_secure_path(path: str, workspace_id: str, agent_id: str, session_id: str) -> str:
"""Resolve and verify a path within a 3-layer sandbox (workspace/agent/session)."""
@@ -1,29 +0,0 @@
# File Write Tool
Write content to local files with encoding support.
## Description
Can create new files or overwrite/append to existing ones. Use for saving data, creating configs, writing reports, or exporting results. Optionally creates parent directories if they don't exist.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `file_path` | str | Yes | - | Path to the file to write (absolute or relative) |
| `content` | str | Yes | - | Content to write to the file |
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
| `mode` | str | No | `write` | Write mode - 'write' (overwrite) or 'append' |
| `create_dirs` | bool | No | `True` | Create parent directories if they don't exist |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error dicts for common issues:
- `Parent directory does not exist: <path>` - Parent dir missing and create_dirs=False
- `Invalid mode: <mode>. Use 'write' or 'append'.` - Invalid mode specified
- `Permission denied: <path>` - No write access to file/directory
- `OS error writing file: <error>` - Filesystem error
@@ -1,4 +0,0 @@
"""File Write Tool - Create or update local files."""
from .file_write_tool import register_tools
__all__ = ["register_tools"]
@@ -1,83 +0,0 @@
"""
File Write Tool - Create or update local files.
Supports writing text files with various encodings.
Can create directories if they don't exist.
"""
from __future__ import annotations
from pathlib import Path
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register file write tools with the MCP server."""
@mcp.tool()
def file_write(
file_path: str,
content: str,
encoding: str = "utf-8",
mode: str = "write",
create_dirs: bool = True,
) -> dict:
"""
Write content to a local file.
Can create new files or overwrite/append to existing ones.
Use for saving data, creating configs, writing reports, or exporting results.
Args:
file_path: Path to the file to write (absolute or relative)
content: Content to write to the file
encoding: File encoding (utf-8, latin-1, etc.)
mode: Write mode - 'write' (overwrite) or 'append'
create_dirs: Create parent directories if they don't exist
Returns:
Dict with write result or error dict
"""
try:
path = Path(file_path).resolve()
# Create parent directories if requested
if create_dirs:
path.parent.mkdir(parents=True, exist_ok=True)
elif not path.parent.exists():
return {"error": f"Parent directory does not exist: {path.parent}"}
# Determine write mode
if mode == "append":
write_mode = "a"
elif mode == "write":
write_mode = "w"
else:
return {"error": f"Invalid mode: {mode}. Use 'write' or 'append'."}
# Check if we're overwriting
existed = path.exists()
previous_size = path.stat().st_size if existed else 0
# Write the file
with open(path, write_mode, encoding=encoding) as f:
f.write(content)
new_size = path.stat().st_size
return {
"path": str(path),
"name": path.name,
"bytes_written": len(content.encode(encoding)),
"total_size": new_size,
"mode": mode,
"created": not existed,
"previous_size": previous_size if existed else None,
}
except PermissionError:
return {"error": f"Permission denied: {file_path}"}
except OSError as e:
return {"error": f"OS error writing file: {str(e)}"}
except Exception as e:
return {"error": f"Failed to write file: {str(e)}"}
@@ -1,96 +0,0 @@
"""Tests for file_read tool (FastMCP)."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.tools.file_read_tool import register_tools
@pytest.fixture
def file_read_fn(mcp: FastMCP):
"""Register and return the file_read tool function."""
register_tools(mcp)
# Access the registered tool's function directly
return mcp._tool_manager._tools["file_read"].fn
class TestFileReadTool:
"""Tests for file_read tool."""
def test_read_existing_file(self, file_read_fn, sample_text_file: Path):
"""Reading an existing file returns content and metadata."""
result = file_read_fn(file_path=str(sample_text_file))
assert "error" not in result
assert result["content"] == "Hello, World!\nLine 2\nLine 3"
assert result["name"] == "test.txt"
assert result["encoding"] == "utf-8"
assert "size" in result
def test_read_file_not_found(self, file_read_fn, tmp_path: Path):
"""Reading a non-existent file returns an error dict."""
missing_file = tmp_path / "does_not_exist.txt"
result = file_read_fn(file_path=str(missing_file))
assert "error" in result
assert "not found" in result["error"].lower()
def test_read_directory_returns_error(self, file_read_fn, tmp_path: Path):
"""Reading a directory (not a file) returns an error."""
result = file_read_fn(file_path=str(tmp_path))
assert "error" in result
assert "not a file" in result["error"].lower()
def test_read_file_too_large(self, file_read_fn, tmp_path: Path):
"""Reading a file exceeding max_size returns an error."""
large_file = tmp_path / "large.txt"
large_file.write_text("x" * 1000)
result = file_read_fn(file_path=str(large_file), max_size=100)
assert "error" in result
assert "too large" in result["error"].lower()
assert "file_size" in result
def test_read_with_no_size_limit(self, file_read_fn, tmp_path: Path):
"""Reading with max_size=0 allows any file size."""
large_file = tmp_path / "large.txt"
content = "x" * 100_000
large_file.write_text(content)
# max_size=0 means no limit in the implementation
result = file_read_fn(file_path=str(large_file), max_size=0)
assert "error" not in result
assert result["content"] == content
def test_read_with_different_encoding(self, file_read_fn, tmp_path: Path):
"""Reading with a specific encoding works."""
latin_file = tmp_path / "latin.txt"
# Write bytes directly with latin-1 encoding
latin_file.write_bytes("café".encode("latin-1"))
result = file_read_fn(file_path=str(latin_file), encoding="latin-1")
assert "error" not in result
assert result["content"] == "café"
assert result["encoding"] == "latin-1"
def test_read_with_wrong_encoding_returns_error(self, file_read_fn, tmp_path: Path):
"""Reading with wrong encoding returns helpful error."""
# Create a file with bytes that aren't valid UTF-8
binary_file = tmp_path / "binary.txt"
binary_file.write_bytes(b"\xff\xfe")
result = file_read_fn(file_path=str(binary_file), encoding="utf-8")
assert "error" in result
assert "suggestion" in result
def test_returns_absolute_path(self, file_read_fn, sample_text_file: Path):
"""Result includes the absolute path."""
result = file_read_fn(file_path=str(sample_text_file))
assert result["path"] == str(sample_text_file.resolve())
@@ -1,99 +0,0 @@
"""Tests for file_write tool (FastMCP)."""
import pytest
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.tools.file_write_tool import register_tools
@pytest.fixture
def file_write_fn(mcp: FastMCP):
"""Register and return the file_write tool function."""
register_tools(mcp)
return mcp._tool_manager._tools["file_write"].fn
class TestFileWriteTool:
"""Tests for file_write tool."""
def test_write_creates_new_file(self, file_write_fn, tmp_path: Path):
"""Writing to a new file creates it with content."""
new_file = tmp_path / "new.txt"
result = file_write_fn(file_path=str(new_file), content="Hello, World!")
assert "error" not in result
assert result["created"] is True
assert result["name"] == "new.txt"
assert new_file.read_text() == "Hello, World!"
def test_write_overwrites_existing(self, file_write_fn, tmp_path: Path):
"""Writing to existing file overwrites by default."""
existing = tmp_path / "existing.txt"
existing.write_text("old content")
result = file_write_fn(file_path=str(existing), content="new content")
assert "error" not in result
assert result["created"] is False
assert result["previous_size"] is not None
assert existing.read_text() == "new content"
def test_write_appends_to_existing(self, file_write_fn, tmp_path: Path):
"""Writing with mode='append' adds to existing content."""
existing = tmp_path / "existing.txt"
existing.write_text("line1\n")
result = file_write_fn(file_path=str(existing), content="line2\n", mode="append")
assert "error" not in result
assert result["mode"] == "append"
assert existing.read_text() == "line1\nline2\n"
def test_write_creates_parent_dirs(self, file_write_fn, tmp_path: Path):
"""Writing with create_dirs=True creates missing directories."""
deep_path = tmp_path / "nested" / "dirs" / "file.txt"
result = file_write_fn(file_path=str(deep_path), content="content", create_dirs=True)
assert "error" not in result
assert deep_path.exists()
assert deep_path.read_text() == "content"
def test_write_fails_without_parent_dir(self, file_write_fn, tmp_path: Path):
"""Writing with create_dirs=False fails if parent doesn't exist."""
missing_dir = tmp_path / "missing" / "file.txt"
result = file_write_fn(file_path=str(missing_dir), content="content", create_dirs=False)
assert "error" in result
assert "parent directory" in result["error"].lower()
def test_write_invalid_mode(self, file_write_fn, tmp_path: Path):
"""Writing with invalid mode returns error."""
result = file_write_fn(
file_path=str(tmp_path / "test.txt"),
content="content",
mode="invalid"
)
assert "error" in result
assert "invalid mode" in result["error"].lower()
def test_write_returns_bytes_written(self, file_write_fn, tmp_path: Path):
"""Result includes accurate bytes_written count."""
content = "Hello, World!"
result = file_write_fn(file_path=str(tmp_path / "test.txt"), content=content)
assert result["bytes_written"] == len(content.encode("utf-8"))
def test_write_with_encoding(self, file_write_fn, tmp_path: Path):
"""Writing with specific encoding works."""
file_path = tmp_path / "latin.txt"
result = file_write_fn(file_path=str(file_path), content="café", encoding="latin-1")
assert "error" not in result
# Verify it was written with latin-1 encoding
assert file_path.read_bytes() == "café".encode("latin-1")