Merge pull request #64 from RichardTang-Aden/fix-file-tools-fix
refactor: Remove file read and write tools and update the workspace d…
This commit is contained in:
@@ -14,8 +14,6 @@ from fastmcp import FastMCP
|
||||
|
||||
# Import register_tools from each tool module
|
||||
from .example_tool import register_tools as register_example
|
||||
from .file_read_tool import register_tools as register_file_read
|
||||
from .file_write_tool import register_tools as register_file_write
|
||||
from .web_search_tool import register_tools as register_web_search
|
||||
from .web_scrape_tool import register_tools as register_web_scrape
|
||||
from .pdf_read_tool import register_tools as register_pdf_read
|
||||
@@ -42,8 +40,6 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
|
||||
List of registered tool names
|
||||
"""
|
||||
register_example(mcp)
|
||||
register_file_read(mcp)
|
||||
register_file_write(mcp)
|
||||
register_web_search(mcp)
|
||||
register_web_scrape(mcp)
|
||||
register_pdf_read(mcp)
|
||||
@@ -60,8 +56,6 @@ def register_all_tools(mcp: FastMCP) -> List[str]:
|
||||
|
||||
return [
|
||||
"example_tool",
|
||||
"file_read",
|
||||
"file_write",
|
||||
"web_search",
|
||||
"web_scrape",
|
||||
"pdf_read",
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
# File Read Tool
|
||||
|
||||
Read contents of local files with encoding support.
|
||||
|
||||
## Description
|
||||
|
||||
Use for reading configs, data files, source code, logs, or any text file. Returns file content along with path, name, size, and encoding metadata.
|
||||
|
||||
## Arguments
|
||||
|
||||
| Argument | Type | Required | Default | Description |
|
||||
|----------|------|----------|---------|-------------|
|
||||
| `file_path` | str | Yes | - | Path to the file to read (absolute or relative) |
|
||||
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
|
||||
| `max_size` | int | No | `10000000` | Maximum file size to read in bytes (default 10MB) |
|
||||
|
||||
## Environment Variables
|
||||
|
||||
This tool does not require any environment variables.
|
||||
|
||||
## Error Handling
|
||||
|
||||
Returns error dicts for common issues:
|
||||
- `File not found: <path>` - File does not exist
|
||||
- `Not a file: <path>` - Path points to a directory
|
||||
- `File too large: <size> bytes (max: <max_size>)` - File exceeds max_size limit
|
||||
- `Failed to decode file with encoding '<encoding>'` - Wrong encoding specified
|
||||
- `Permission denied: <path>` - No read access to file
|
||||
@@ -1,4 +0,0 @@
|
||||
"""File Read Tool - Read contents of local files."""
|
||||
from .file_read_tool import register_tools
|
||||
|
||||
__all__ = ["register_tools"]
|
||||
@@ -1,75 +0,0 @@
|
||||
"""
|
||||
File Read Tool - Read contents of local files.
|
||||
|
||||
Supports reading text files with various encodings.
|
||||
Returns file content along with metadata.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from fastmcp import FastMCP
|
||||
|
||||
|
||||
def register_tools(mcp: FastMCP) -> None:
|
||||
"""Register file read tools with the MCP server."""
|
||||
|
||||
@mcp.tool()
|
||||
def file_read(
|
||||
file_path: str,
|
||||
encoding: str = "utf-8",
|
||||
max_size: int = 10_000_000,
|
||||
) -> dict:
|
||||
"""
|
||||
Read the contents of a local file.
|
||||
|
||||
Use for reading configs, data files, source code, logs, or any text file.
|
||||
Returns file content along with path, name, size, and encoding.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to read (absolute or relative)
|
||||
encoding: File encoding (utf-8, latin-1, etc.)
|
||||
max_size: Maximum file size to read in bytes (default 10MB)
|
||||
|
||||
Returns:
|
||||
Dict with file content and metadata, or error dict
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path).resolve()
|
||||
|
||||
# Check if file exists
|
||||
if not path.exists():
|
||||
return {"error": f"File not found: {file_path}"}
|
||||
|
||||
# Check if it's a file (not directory)
|
||||
if not path.is_file():
|
||||
return {"error": f"Not a file: {file_path}"}
|
||||
|
||||
# Check file size
|
||||
file_size = path.stat().st_size
|
||||
if max_size > 0 and file_size > max_size:
|
||||
return {
|
||||
"error": f"File too large: {file_size} bytes (max: {max_size})",
|
||||
"file_size": file_size,
|
||||
}
|
||||
|
||||
# Read the file
|
||||
content = path.read_text(encoding=encoding)
|
||||
|
||||
return {
|
||||
"path": str(path),
|
||||
"name": path.name,
|
||||
"content": content,
|
||||
"size": len(content),
|
||||
"encoding": encoding,
|
||||
}
|
||||
|
||||
except UnicodeDecodeError as e:
|
||||
return {
|
||||
"error": f"Failed to decode file with encoding '{encoding}': {str(e)}",
|
||||
"suggestion": "Try a different encoding like 'latin-1' or 'cp1252'",
|
||||
}
|
||||
except PermissionError:
|
||||
return {"error": f"Permission denied: {file_path}"}
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to read file: {str(e)}"}
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
|
||||
WORKSPACES_DIR = os.path.abspath(os.path.join(os.getcwd(), "workdir/workspaces"))
|
||||
# Use user home directory for workspaces
|
||||
WORKSPACES_DIR = os.path.expanduser("~/.hive/workdir/workspaces")
|
||||
|
||||
def get_secure_path(path: str, workspace_id: str, agent_id: str, session_id: str) -> str:
|
||||
"""Resolve and verify a path within a 3-layer sandbox (workspace/agent/session)."""
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
# File Write Tool
|
||||
|
||||
Write content to local files with encoding support.
|
||||
|
||||
## Description
|
||||
|
||||
Can create new files or overwrite/append to existing ones. Use for saving data, creating configs, writing reports, or exporting results. Optionally creates parent directories if they don't exist.
|
||||
|
||||
## Arguments
|
||||
|
||||
| Argument | Type | Required | Default | Description |
|
||||
|----------|------|----------|---------|-------------|
|
||||
| `file_path` | str | Yes | - | Path to the file to write (absolute or relative) |
|
||||
| `content` | str | Yes | - | Content to write to the file |
|
||||
| `encoding` | str | No | `utf-8` | File encoding (utf-8, latin-1, etc.) |
|
||||
| `mode` | str | No | `write` | Write mode - 'write' (overwrite) or 'append' |
|
||||
| `create_dirs` | bool | No | `True` | Create parent directories if they don't exist |
|
||||
|
||||
## Environment Variables
|
||||
|
||||
This tool does not require any environment variables.
|
||||
|
||||
## Error Handling
|
||||
|
||||
Returns error dicts for common issues:
|
||||
- `Parent directory does not exist: <path>` - Parent dir missing and create_dirs=False
|
||||
- `Invalid mode: <mode>. Use 'write' or 'append'.` - Invalid mode specified
|
||||
- `Permission denied: <path>` - No write access to file/directory
|
||||
- `OS error writing file: <error>` - Filesystem error
|
||||
@@ -1,4 +0,0 @@
|
||||
"""File Write Tool - Create or update local files."""
|
||||
from .file_write_tool import register_tools
|
||||
|
||||
__all__ = ["register_tools"]
|
||||
@@ -1,83 +0,0 @@
|
||||
"""
|
||||
File Write Tool - Create or update local files.
|
||||
|
||||
Supports writing text files with various encodings.
|
||||
Can create directories if they don't exist.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from fastmcp import FastMCP
|
||||
|
||||
|
||||
def register_tools(mcp: FastMCP) -> None:
|
||||
"""Register file write tools with the MCP server."""
|
||||
|
||||
@mcp.tool()
|
||||
def file_write(
|
||||
file_path: str,
|
||||
content: str,
|
||||
encoding: str = "utf-8",
|
||||
mode: str = "write",
|
||||
create_dirs: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Write content to a local file.
|
||||
|
||||
Can create new files or overwrite/append to existing ones.
|
||||
Use for saving data, creating configs, writing reports, or exporting results.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to write (absolute or relative)
|
||||
content: Content to write to the file
|
||||
encoding: File encoding (utf-8, latin-1, etc.)
|
||||
mode: Write mode - 'write' (overwrite) or 'append'
|
||||
create_dirs: Create parent directories if they don't exist
|
||||
|
||||
Returns:
|
||||
Dict with write result or error dict
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path).resolve()
|
||||
|
||||
# Create parent directories if requested
|
||||
if create_dirs:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
elif not path.parent.exists():
|
||||
return {"error": f"Parent directory does not exist: {path.parent}"}
|
||||
|
||||
# Determine write mode
|
||||
if mode == "append":
|
||||
write_mode = "a"
|
||||
elif mode == "write":
|
||||
write_mode = "w"
|
||||
else:
|
||||
return {"error": f"Invalid mode: {mode}. Use 'write' or 'append'."}
|
||||
|
||||
# Check if we're overwriting
|
||||
existed = path.exists()
|
||||
previous_size = path.stat().st_size if existed else 0
|
||||
|
||||
# Write the file
|
||||
with open(path, write_mode, encoding=encoding) as f:
|
||||
f.write(content)
|
||||
|
||||
new_size = path.stat().st_size
|
||||
|
||||
return {
|
||||
"path": str(path),
|
||||
"name": path.name,
|
||||
"bytes_written": len(content.encode(encoding)),
|
||||
"total_size": new_size,
|
||||
"mode": mode,
|
||||
"created": not existed,
|
||||
"previous_size": previous_size if existed else None,
|
||||
}
|
||||
|
||||
except PermissionError:
|
||||
return {"error": f"Permission denied: {file_path}"}
|
||||
except OSError as e:
|
||||
return {"error": f"OS error writing file: {str(e)}"}
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to write file: {str(e)}"}
|
||||
@@ -1,96 +0,0 @@
|
||||
"""Tests for file_read tool (FastMCP)."""
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from aden_tools.tools.file_read_tool import register_tools
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def file_read_fn(mcp: FastMCP):
|
||||
"""Register and return the file_read tool function."""
|
||||
register_tools(mcp)
|
||||
# Access the registered tool's function directly
|
||||
return mcp._tool_manager._tools["file_read"].fn
|
||||
|
||||
|
||||
class TestFileReadTool:
|
||||
"""Tests for file_read tool."""
|
||||
|
||||
def test_read_existing_file(self, file_read_fn, sample_text_file: Path):
|
||||
"""Reading an existing file returns content and metadata."""
|
||||
result = file_read_fn(file_path=str(sample_text_file))
|
||||
|
||||
assert "error" not in result
|
||||
assert result["content"] == "Hello, World!\nLine 2\nLine 3"
|
||||
assert result["name"] == "test.txt"
|
||||
assert result["encoding"] == "utf-8"
|
||||
assert "size" in result
|
||||
|
||||
def test_read_file_not_found(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading a non-existent file returns an error dict."""
|
||||
missing_file = tmp_path / "does_not_exist.txt"
|
||||
|
||||
result = file_read_fn(file_path=str(missing_file))
|
||||
|
||||
assert "error" in result
|
||||
assert "not found" in result["error"].lower()
|
||||
|
||||
def test_read_directory_returns_error(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading a directory (not a file) returns an error."""
|
||||
result = file_read_fn(file_path=str(tmp_path))
|
||||
|
||||
assert "error" in result
|
||||
assert "not a file" in result["error"].lower()
|
||||
|
||||
def test_read_file_too_large(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading a file exceeding max_size returns an error."""
|
||||
large_file = tmp_path / "large.txt"
|
||||
large_file.write_text("x" * 1000)
|
||||
|
||||
result = file_read_fn(file_path=str(large_file), max_size=100)
|
||||
|
||||
assert "error" in result
|
||||
assert "too large" in result["error"].lower()
|
||||
assert "file_size" in result
|
||||
|
||||
def test_read_with_no_size_limit(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading with max_size=0 allows any file size."""
|
||||
large_file = tmp_path / "large.txt"
|
||||
content = "x" * 100_000
|
||||
large_file.write_text(content)
|
||||
|
||||
# max_size=0 means no limit in the implementation
|
||||
result = file_read_fn(file_path=str(large_file), max_size=0)
|
||||
|
||||
assert "error" not in result
|
||||
assert result["content"] == content
|
||||
|
||||
def test_read_with_different_encoding(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading with a specific encoding works."""
|
||||
latin_file = tmp_path / "latin.txt"
|
||||
# Write bytes directly with latin-1 encoding
|
||||
latin_file.write_bytes("café".encode("latin-1"))
|
||||
|
||||
result = file_read_fn(file_path=str(latin_file), encoding="latin-1")
|
||||
|
||||
assert "error" not in result
|
||||
assert result["content"] == "café"
|
||||
assert result["encoding"] == "latin-1"
|
||||
|
||||
def test_read_with_wrong_encoding_returns_error(self, file_read_fn, tmp_path: Path):
|
||||
"""Reading with wrong encoding returns helpful error."""
|
||||
# Create a file with bytes that aren't valid UTF-8
|
||||
binary_file = tmp_path / "binary.txt"
|
||||
binary_file.write_bytes(b"\xff\xfe")
|
||||
|
||||
result = file_read_fn(file_path=str(binary_file), encoding="utf-8")
|
||||
|
||||
assert "error" in result
|
||||
assert "suggestion" in result
|
||||
|
||||
def test_returns_absolute_path(self, file_read_fn, sample_text_file: Path):
|
||||
"""Result includes the absolute path."""
|
||||
result = file_read_fn(file_path=str(sample_text_file))
|
||||
|
||||
assert result["path"] == str(sample_text_file.resolve())
|
||||
@@ -1,99 +0,0 @@
|
||||
"""Tests for file_write tool (FastMCP)."""
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from aden_tools.tools.file_write_tool import register_tools
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def file_write_fn(mcp: FastMCP):
|
||||
"""Register and return the file_write tool function."""
|
||||
register_tools(mcp)
|
||||
return mcp._tool_manager._tools["file_write"].fn
|
||||
|
||||
|
||||
class TestFileWriteTool:
|
||||
"""Tests for file_write tool."""
|
||||
|
||||
def test_write_creates_new_file(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing to a new file creates it with content."""
|
||||
new_file = tmp_path / "new.txt"
|
||||
|
||||
result = file_write_fn(file_path=str(new_file), content="Hello, World!")
|
||||
|
||||
assert "error" not in result
|
||||
assert result["created"] is True
|
||||
assert result["name"] == "new.txt"
|
||||
assert new_file.read_text() == "Hello, World!"
|
||||
|
||||
def test_write_overwrites_existing(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing to existing file overwrites by default."""
|
||||
existing = tmp_path / "existing.txt"
|
||||
existing.write_text("old content")
|
||||
|
||||
result = file_write_fn(file_path=str(existing), content="new content")
|
||||
|
||||
assert "error" not in result
|
||||
assert result["created"] is False
|
||||
assert result["previous_size"] is not None
|
||||
assert existing.read_text() == "new content"
|
||||
|
||||
def test_write_appends_to_existing(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing with mode='append' adds to existing content."""
|
||||
existing = tmp_path / "existing.txt"
|
||||
existing.write_text("line1\n")
|
||||
|
||||
result = file_write_fn(file_path=str(existing), content="line2\n", mode="append")
|
||||
|
||||
assert "error" not in result
|
||||
assert result["mode"] == "append"
|
||||
assert existing.read_text() == "line1\nline2\n"
|
||||
|
||||
def test_write_creates_parent_dirs(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing with create_dirs=True creates missing directories."""
|
||||
deep_path = tmp_path / "nested" / "dirs" / "file.txt"
|
||||
|
||||
result = file_write_fn(file_path=str(deep_path), content="content", create_dirs=True)
|
||||
|
||||
assert "error" not in result
|
||||
assert deep_path.exists()
|
||||
assert deep_path.read_text() == "content"
|
||||
|
||||
def test_write_fails_without_parent_dir(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing with create_dirs=False fails if parent doesn't exist."""
|
||||
missing_dir = tmp_path / "missing" / "file.txt"
|
||||
|
||||
result = file_write_fn(file_path=str(missing_dir), content="content", create_dirs=False)
|
||||
|
||||
assert "error" in result
|
||||
assert "parent directory" in result["error"].lower()
|
||||
|
||||
def test_write_invalid_mode(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing with invalid mode returns error."""
|
||||
result = file_write_fn(
|
||||
file_path=str(tmp_path / "test.txt"),
|
||||
content="content",
|
||||
mode="invalid"
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
assert "invalid mode" in result["error"].lower()
|
||||
|
||||
def test_write_returns_bytes_written(self, file_write_fn, tmp_path: Path):
|
||||
"""Result includes accurate bytes_written count."""
|
||||
content = "Hello, World!"
|
||||
|
||||
result = file_write_fn(file_path=str(tmp_path / "test.txt"), content=content)
|
||||
|
||||
assert result["bytes_written"] == len(content.encode("utf-8"))
|
||||
|
||||
def test_write_with_encoding(self, file_write_fn, tmp_path: Path):
|
||||
"""Writing with specific encoding works."""
|
||||
file_path = tmp_path / "latin.txt"
|
||||
|
||||
result = file_write_fn(file_path=str(file_path), content="café", encoding="latin-1")
|
||||
|
||||
assert "error" not in result
|
||||
# Verify it was written with latin-1 encoding
|
||||
assert file_path.read_bytes() == "café".encode("latin-1")
|
||||
Reference in New Issue
Block a user