refactor: reorganized file tools

Richard Tang
2026-02-27 17:52:21 -08:00
parent 2fa8f4283c
commit 14182c45fc
10 changed files with 736 additions and 1535 deletions
-23
@@ -467,12 +467,6 @@ class EventLoopNode(NodeProtocol):
if ctx.is_subagent_mode and ctx.report_callback is not None:
tools.append(self._build_report_to_parent_tool())
# Add built-in file tools when spillover is configured
if self._config.spillover_dir:
from framework.graph.file_tools import build_file_tools
tools.extend(build_file_tools())
logger.info(
"[%s] Tools available (%d): %s | client_facing=%s | judge=%s",
node_id,
@@ -1525,8 +1519,6 @@ class EventLoopNode(NodeProtocol):
# Phase 1: triage — handle framework tools immediately,
# queue real tools and subagents for parallel execution.
from framework.graph.file_tools import execute_file_tool, is_file_tool
results_by_id: dict[str, ToolResult] = {}
timing_by_id: dict[str, dict[str, Any]] = {} # tool_use_id -> {start_timestamp, duration_s}
pending_real: list[ToolCallEvent] = []
@@ -1682,21 +1674,6 @@ class EventLoopNode(NodeProtocol):
)
results_by_id[tc.tool_use_id] = result
elif is_file_tool(tc.tool_name):
# --- Built-in file tool: execute inline, log as real work ---
_tc_start = time.time()
_tc_ts = datetime.now(timezone.utc).isoformat()
result = execute_file_tool(
tc.tool_name,
tc.tool_input,
tool_use_id=tc.tool_use_id,
)
timing_by_id[tc.tool_use_id] = {
"start_timestamp": _tc_ts,
"duration_s": round(time.time() - _tc_start, 3),
}
results_by_id[tc.tool_use_id] = self._truncate_tool_result(result, tc.tool_name)
else:
# --- Real tool: check for truncated args, else queue ---
if "_raw" in tc.tool_input:
-745
@@ -1,745 +0,0 @@
"""Built-in file and command tools for EventLoopNode.
Provides 6 tools (read_file, write_file, edit_file, list_directory,
search_files, run_command) that are always available when spillover is
configured. Adapted from tools/coder_tools_server.py with PROJECT_ROOT
scoping removed; all paths are absolute.
Public API:
build_file_tools() -> list[Tool] 6 Tool schema objects
is_file_tool(name) -> bool membership check
execute_file_tool(name, inputs) dispatch + catch exceptions
"""
from __future__ import annotations
import difflib
import fnmatch
import logging
import os
import re
import subprocess
import time
from pathlib import Path
from framework.llm.provider import Tool, ToolResult
logger = logging.getLogger(__name__)
# ── Constants (matching coder_tools_server.py) ────────────────────────────
MAX_READ_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_OUTPUT_BYTES = 50 * 1024 # 50KB byte budget for read output
MAX_COMMAND_OUTPUT = 30_000 # chars before truncation
SEARCH_RESULT_LIMIT = 100
BINARY_EXTENSIONS = frozenset(
{
".zip",
".tar",
".gz",
".bz2",
".xz",
".7z",
".rar",
".exe",
".dll",
".so",
".dylib",
".bin",
".class",
".jar",
".war",
".pyc",
".pyo",
".wasm",
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".ico",
".webp",
".svg",
".mp3",
".mp4",
".avi",
".mov",
".mkv",
".wav",
".flac",
".pdf",
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".ttf",
".otf",
".woff",
".woff2",
".eot",
".o",
".a",
".lib",
".obj",
}
)
FILE_TOOL_NAMES = frozenset(
{
"read_file",
"write_file",
"edit_file",
"list_directory",
"search_files",
"run_command",
}
)
# ── Public API ────────────────────────────────────────────────────────────
def build_file_tools() -> list[Tool]:
"""Return 6 Tool schema objects for the built-in file tools."""
return [
Tool(
name="read_file",
description=(
"Read file contents with line numbers and byte-budget truncation. "
"Binary files are detected and rejected. Large files are automatically "
"truncated at 2000 lines or 50KB. Use offset and limit to paginate."
),
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute file path to read.",
},
"offset": {
"type": "integer",
"description": "Starting line number, 1-indexed (default: 1).",
"default": 1,
},
"limit": {
"type": "integer",
"description": "Max lines to return, 0 = up to 2000 (default: 0).",
"default": 0,
},
},
"required": ["path"],
},
),
Tool(
name="write_file",
description=(
"Create or overwrite a file with the given content. "
"Automatically creates parent directories."
),
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute file path to write.",
},
"content": {
"type": "string",
"description": "Complete file content to write.",
},
},
"required": ["path", "content"],
},
),
Tool(
name="edit_file",
description=(
"Replace text in a file using a fuzzy-match cascade. "
"Tries exact match first, then falls back through increasingly "
"fuzzy strategies: line-trimmed, block-anchor, whitespace-normalized, "
"indentation-flexible, and trimmed-boundary matching."
),
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute file path to edit.",
},
"old_text": {
"type": "string",
"description": "Text to find (fuzzy matching applied if exact fails).",
},
"new_text": {
"type": "string",
"description": "Replacement text.",
},
"replace_all": {
"type": "boolean",
"description": "Replace all occurrences (default: first only).",
"default": False,
},
},
"required": ["path", "old_text", "new_text"],
},
),
Tool(
name="list_directory",
description=(
"List directory contents with type indicators. "
"Directories have a / suffix. Hidden files and common "
"build directories are skipped."
),
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute directory path (default: current directory).",
"default": ".",
},
"recursive": {
"type": "boolean",
"description": (
"List recursively (default: false). "
"Truncates at 500 entries."
),
"default": False,
},
},
"required": [],
},
),
Tool(
name="search_files",
description=(
"Search file contents using regex. Uses ripgrep when available, "
"falls back to Python regex. Results sorted by file with line numbers."
),
parameters={
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Regex pattern to search for.",
},
"path": {
"type": "string",
"description": (
"Absolute directory path to search "
"(default: current directory)."
),
"default": ".",
},
"include": {
"type": "string",
"description": "File glob filter (e.g. '*.py').",
"default": "",
},
},
"required": ["pattern"],
},
),
Tool(
name="run_command",
description=(
"Execute a shell command. Output is truncated at 30K chars. "
"Timeout defaults to 120s, max 300s."
),
parameters={
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute.",
},
"cwd": {
"type": "string",
"description": (
"Working directory (absolute path). "
"Defaults to current directory."
),
"default": "",
},
"timeout": {
"type": "integer",
"description": "Timeout in seconds (default: 120, max: 300).",
"default": 120,
},
},
"required": ["command"],
},
),
]
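# For reference, the schema names in order (a quick sanity sketch):
#   [t.name for t in build_file_tools()]
#   -> ["read_file", "write_file", "edit_file", "list_directory",
#       "search_files", "run_command"]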
def is_file_tool(name: str) -> bool:
"""Check if *name* is a built-in file tool."""
return name in FILE_TOOL_NAMES
def execute_file_tool(name: str, inputs: dict, tool_use_id: str = "") -> ToolResult:
"""Dispatch to the appropriate handler, catch exceptions.
Returns a ToolResult. On exception the result has ``is_error=True``.
"""
handlers = {
"read_file": _handle_read_file,
"write_file": _handle_write_file,
"edit_file": _handle_edit_file,
"list_directory": _handle_list_directory,
"search_files": _handle_search_files,
"run_command": _handle_run_command,
}
handler = handlers.get(name)
if handler is None:
return ToolResult(
tool_use_id=tool_use_id,
content=f"Unknown file tool: {name}",
is_error=True,
)
try:
content = handler(**inputs)
return ToolResult(tool_use_id=tool_use_id, content=content, is_error=False)
except Exception as e:
logger.warning("file_tool %s raised: %s", name, e, exc_info=True)
return ToolResult(
tool_use_id=tool_use_id,
content=f"Error in {name}: {e}",
is_error=True,
)
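# A minimal dispatch sketch (the path and tool_use_id are illustrative):
#   result = execute_file_tool(
#       "read_file", {"path": "/tmp/notes.txt"}, tool_use_id="tu_1"
#   )
#   # Handlers report I/O problems as text in result.content with
#   # is_error=False; is_error=True only for unknown tools or exceptions.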
# ── Private helpers ───────────────────────────────────────────────────────
def _is_binary(filepath: str) -> bool:
"""Detect binary files by extension and content sampling."""
_, ext = os.path.splitext(filepath)
if ext.lower() in BINARY_EXTENSIONS:
return True
try:
with open(filepath, "rb") as f:
chunk = f.read(4096)
if b"\x00" in chunk:
return True
non_printable = sum(1 for b in chunk if b < 9 or (13 < b < 32) or b > 126)
return non_printable / max(len(chunk), 1) > 0.3
except OSError:
return False
def _levenshtein(a: str, b: str) -> int:
"""Standard Levenshtein distance."""
if not a:
return len(b)
if not b:
return len(a)
m, n = len(a), len(b)
dp = list(range(n + 1))
for i in range(1, m + 1):
prev = dp[0]
dp[0] = i
for j in range(1, n + 1):
temp = dp[j]
if a[i - 1] == b[j - 1]:
dp[j] = prev
else:
dp[j] = 1 + min(prev, dp[j], dp[j - 1])
prev = temp
return dp[n]
def _similarity(a: str, b: str) -> float:
maxlen = max(len(a), len(b))
if maxlen == 0:
return 1.0
return 1.0 - _levenshtein(a, b) / maxlen
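# Worked example (standard Levenshtein): _levenshtein("kitten", "sitting") == 3,
# so _similarity("kitten", "sitting") == 1 - 3/7 ≈ 0.571. The block-anchor
# strategy below accepts its best candidate only when this score exceeds 0.3.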
def _fuzzy_find_candidates(content: str, old_text: str):
"""Yield candidate substrings from content that match old_text,
using a cascade of increasingly fuzzy strategies.
"""
# Strategy 1: Exact match
if old_text in content:
yield old_text
content_lines = content.split("\n")
search_lines = old_text.split("\n")
# Strip trailing empty line from search (common copy-paste artifact)
while search_lines and not search_lines[-1].strip():
search_lines = search_lines[:-1]
if not search_lines:
return
n_search = len(search_lines)
# Strategy 2: Line-trimmed match
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
if all(cl.strip() == sl.strip() for cl, sl in zip(window, search_lines, strict=True)):
yield "\n".join(window)
# Strategy 3: Block-anchor match (first/last line as anchors, fuzzy middle)
if n_search >= 3:
first_trimmed = search_lines[0].strip()
last_trimmed = search_lines[-1].strip()
candidates = []
for i, line in enumerate(content_lines):
if line.strip() == first_trimmed:
end = i + n_search
if end <= len(content_lines) and content_lines[end - 1].strip() == last_trimmed:
block = content_lines[i:end]
middle_content = "\n".join(block[1:-1])
middle_search = "\n".join(search_lines[1:-1])
sim = _similarity(middle_content, middle_search)
candidates.append((sim, "\n".join(block)))
if candidates:
candidates.sort(key=lambda x: x[0], reverse=True)
if candidates[0][0] > 0.3:
yield candidates[0][1]
# Strategy 4: Whitespace-normalized match
normalized_search = re.sub(r"\s+", " ", old_text).strip()
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
normalized_block = re.sub(r"\s+", " ", "\n".join(window)).strip()
if normalized_block == normalized_search:
yield "\n".join(window)
# Strategy 5: Indentation-flexible match
def _strip_indent(lines):
non_empty = [ln for ln in lines if ln.strip()]
if not non_empty:
return "\n".join(lines)
min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty)
return "\n".join(ln[min_indent:] for ln in lines)
stripped_search = _strip_indent(search_lines)
for i in range(len(content_lines) - n_search + 1):
block = content_lines[i : i + n_search]
if _strip_indent(block) == stripped_search:
yield "\n".join(block)
# Strategy 6: Trimmed-boundary match
trimmed = old_text.strip()
if trimmed != old_text and trimmed in content:
yield trimmed
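# Cascade sketch: with re-indented content, strategy 1 (exact) misses but
# strategy 2 (line-trimmed) yields the original block, indentation intact:
#   content = "    x = 1\n    y = 2\n"
#   next(_fuzzy_find_candidates(content, "x = 1\ny = 2"))
#   # -> "    x = 1\n    y = 2"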
def _compute_diff(old: str, new: str, path: str) -> str:
"""Compute a unified diff for display."""
old_lines = old.splitlines(keepends=True)
new_lines = new.splitlines(keepends=True)
diff = difflib.unified_diff(old_lines, new_lines, fromfile=path, tofile=path, n=3)
result = "".join(diff)
if len(result) > 2000:
result = result[:2000] + "\n... (diff truncated)"
return result
# ── Handlers ──────────────────────────────────────────────────────────────
def _handle_read_file(path: str, offset: int = 1, limit: int = 0, **_kw) -> str:
"""Read file contents with line numbers and byte-budget truncation."""
resolved = str(Path(path).resolve())
if os.path.isdir(resolved):
entries = []
for entry in sorted(os.listdir(resolved)):
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f" {entry}{suffix}")
total = len(entries)
return f"Directory: {path} ({total} entries)\n" + "\n".join(entries[:200])
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
if _is_binary(resolved):
size = os.path.getsize(resolved)
return f"Binary file: {path} ({size:,} bytes). Cannot display binary content."
try:
with open(resolved, encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
total_lines = len(all_lines)
start_idx = max(0, offset - 1)
effective_limit = limit if limit > 0 else MAX_READ_LINES
end_idx = min(start_idx + effective_limit, total_lines)
output_lines = []
byte_count = 0
truncated_by_bytes = False
for i in range(start_idx, end_idx):
line = all_lines[i].rstrip("\n\r")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted = f"{i + 1:>6}\t{line}"
line_bytes = len(formatted.encode("utf-8")) + 1
if byte_count + line_bytes > MAX_OUTPUT_BYTES:
truncated_by_bytes = True
break
output_lines.append(formatted)
byte_count += line_bytes
result = "\n".join(output_lines)
lines_shown = len(output_lines)
actual_end = start_idx + lines_shown
if actual_end < total_lines or truncated_by_bytes:
result += f"\n\n(Showing lines {start_idx + 1}-{actual_end} of {total_lines}."
if truncated_by_bytes:
result += " Truncated by byte budget."
result += f" Use offset={actual_end + 1} to continue reading.)"
return result
except Exception as e:
return f"Error reading file: {e}"
def _handle_write_file(path: str, content: str, **_kw) -> str:
"""Create or overwrite a file."""
resolved = str(Path(path).resolve())
try:
existed = os.path.isfile(resolved)
os.makedirs(os.path.dirname(resolved), exist_ok=True)
with open(resolved, "w", encoding="utf-8") as f:
f.write(content)
line_count = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
action = "Updated" if existed else "Created"
return f"{action} {path} ({len(content):,} bytes, {line_count} lines)"
except Exception as e:
return f"Error writing file: {e}"
def _handle_edit_file(
path: str, old_text: str, new_text: str, replace_all: bool = False, **_kw
) -> str:
"""Replace text in a file using a fuzzy-match cascade."""
resolved = str(Path(path).resolve())
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
try:
with open(resolved, encoding="utf-8") as f:
content = f.read()
matched_text = None
strategy_used = None
strategies = [
"exact",
"line-trimmed",
"block-anchor",
"whitespace-normalized",
"indentation-flexible",
"trimmed-boundary",
]
for i, candidate in enumerate(_fuzzy_find_candidates(content, old_text)):
idx = content.find(candidate)
if idx == -1:
continue
if replace_all:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
last_idx = content.rfind(candidate)
if idx == last_idx:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
if matched_text is None:
close = difflib.get_close_matches(old_text[:200], content.split("\n"), n=3, cutoff=0.4)
msg = f"Error: Could not find a unique match for old_text in {path}."
if close:
suggestions = "\n".join(f" {line}" for line in close)
msg += f"\n\nDid you mean one of these lines?\n{suggestions}"
return msg
if replace_all:
count = content.count(matched_text)
new_content = content.replace(matched_text, new_text)
else:
count = 1
new_content = content.replace(matched_text, new_text, 1)
with open(resolved, "w", encoding="utf-8") as f:
f.write(new_content)
diff = _compute_diff(content, new_content, path)
match_info = f" (matched via {strategy_used})" if strategy_used != "exact" else ""
result = f"Replaced {count} occurrence(s) in {path}{match_info}"
if diff:
result += f"\n\n{diff}"
return result
except Exception as e:
return f"Error editing file: {e}"
def _handle_list_directory(path: str = ".", recursive: bool = False, **_kw) -> str:
"""List directory contents with type indicators."""
resolved = str(Path(path).resolve())
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
try:
skip = {
".git",
"__pycache__",
"node_modules",
".venv",
".tox",
".mypy_cache",
".ruff_cache",
}
entries: list[str] = []
if recursive:
for root, dirs, files in os.walk(resolved):
dirs[:] = sorted(d for d in dirs if d not in skip and not d.startswith("."))
rel_root = os.path.relpath(root, resolved)
if rel_root == ".":
rel_root = ""
for f in sorted(files):
if f.startswith("."):
continue
entries.append(os.path.join(rel_root, f) if rel_root else f)
if len(entries) >= 500:
entries.append("... (truncated at 500 entries)")
return "\n".join(entries)
else:
for entry in sorted(os.listdir(resolved)):
if entry.startswith(".") or entry in skip:
continue
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f"{entry}{suffix}")
return "\n".join(entries) if entries else "(empty directory)"
except Exception as e:
return f"Error listing directory: {e}"
def _handle_search_files(pattern: str, path: str = ".", include: str = "", **_kw) -> str:
"""Search file contents using regex. Ripgrep with Python fallback."""
resolved = str(Path(path).resolve())
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
# Try ripgrep first
try:
cmd = [
"rg",
"-nH",
"--no-messages",
"--hidden",
"--max-count=20",
"--glob=!.git/*",
pattern,
]
if include:
cmd.extend(["--glob", include])
cmd.append(resolved)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode <= 1:
output = result.stdout.strip()
if not output:
return "No matches found."
lines = []
for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
lines.append(line)
total = output.count("\n") + 1
result_str = "\n".join(lines)
if total > SEARCH_RESULT_LIMIT:
result_str += (
f"\n\n... ({total} total matches, showing first {SEARCH_RESULT_LIMIT})"
)
return result_str
except FileNotFoundError:
pass # ripgrep not installed — fall through to Python
except subprocess.TimeoutExpired:
return "Error: Search timed out after 30 seconds"
# Fallback: Python regex
try:
compiled = re.compile(pattern)
matches: list[str] = []
skip_dirs = {".git", "__pycache__", "node_modules", ".venv", ".tox"}
for root, dirs, files in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
if compiled.search(line):
matches.append(f"{fpath}:{i}:{line.rstrip()[:MAX_LINE_LENGTH]}")
if len(matches) >= SEARCH_RESULT_LIMIT:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
continue
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
def _handle_run_command(command: str, cwd: str = "", timeout: int = 120, **_kw) -> str:
"""Execute a shell command."""
timeout = min(timeout, 300)
work_dir = cwd if cwd else None
try:
start = time.monotonic()
result = subprocess.run(
command,
shell=True,
cwd=work_dir,
capture_output=True,
text=True,
timeout=timeout,
)
elapsed = time.monotonic() - start
parts = []
if result.stdout:
parts.append(result.stdout)
if result.stderr:
parts.append(f"[stderr]\n{result.stderr}")
output = "\n".join(parts)
if len(output) > MAX_COMMAND_OUTPUT:
output = (
output[:MAX_COMMAND_OUTPUT]
+ f"\n\n... (output truncated at {MAX_COMMAND_OUTPUT:,} chars)"
)
code = result.returncode
output += f"\n\n[exit code: {code}, {elapsed:.1f}s]"
return output
except subprocess.TimeoutExpired:
return (
f"Error: Command timed out after {timeout}s. "
"Consider breaking it into smaller operations."
)
except Exception as e:
return f"Error executing command: {e}"
+23
@@ -0,0 +1,23 @@
"""File tools MCP server constants.
Analogous to ``gcu.py``, this module defines the server name and default stdio config
so the runner can auto-register the files MCP server for any agent that has
``event_loop`` or ``gcu`` nodes.
"""
# ---------------------------------------------------------------------------
# MCP server identity
# ---------------------------------------------------------------------------
FILES_MCP_SERVER_NAME = "files-tools"
"""Name used to identify the file tools MCP server in ``mcp_servers.json``."""
FILES_MCP_SERVER_CONFIG: dict = {
"name": FILES_MCP_SERVER_NAME,
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "files_server.py", "--stdio"],
"cwd": "../../tools",
"description": "File tools for reading, writing, editing, and searching files",
}
"""Default stdio config for the file tools MCP server (relative to exports/<agent>/)."""
+26
@@ -1168,6 +1168,32 @@ class AgentRunner:
if tool_name not in existing:
node.tools.append(tool_name)
# For event_loop/gcu nodes: auto-register file tools MCP server, then expand tool lists
has_loop_nodes = any(
node.node_type in ("event_loop", "gcu") for node in self.graph.nodes
)
if has_loop_nodes:
from framework.graph.files import FILES_MCP_SERVER_CONFIG, FILES_MCP_SERVER_NAME
files_tool_names = self._tool_registry.get_server_tool_names(FILES_MCP_SERVER_NAME)
if not files_tool_names:
files_config = dict(FILES_MCP_SERVER_CONFIG)
cwd = files_config.get("cwd")
if cwd and not Path(cwd).is_absolute():
files_config["cwd"] = str((self.agent_path / cwd).resolve())
self._tool_registry.register_mcp_server(files_config)
files_tool_names = self._tool_registry.get_server_tool_names(
FILES_MCP_SERVER_NAME
)
if files_tool_names:
for node in self.graph.nodes:
if node.node_type in ("event_loop", "gcu"):
existing = set(node.tools)
for tool_name in sorted(files_tool_names):
if tool_name not in existing:
node.tools.append(tool_name)
# Get tools for runtime
tools = list(self._tool_registry.get_tools().values())
tool_executor = self._tool_registry.get_executor()
+8 -2
@@ -498,13 +498,19 @@ class SessionManager:
len(queen_tools),
[t.name for t in queen_tools],
)
await executor.execute(
result = await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": initial_prompt or "Session started."},
session_state={"resume_session_id": session.id},
)
logger.warning("Queen executor returned (should be forever-alive)")
if result.success:
logger.warning("Queen executor returned (should be forever-alive)")
else:
logger.error(
"Queen executor failed: %s",
result.error or "(no error message)",
)
except Exception:
logger.error("Queen conversation crashed", exc_info=True)
finally:
-211
@@ -1,211 +0,0 @@
"""Smoke tests for built-in file tools (framework.graph.file_tools)."""
from framework.graph.file_tools import (
FILE_TOOL_NAMES,
build_file_tools,
execute_file_tool,
is_file_tool,
)
class TestBuildFileTools:
def test_returns_six_tools(self):
tools = build_file_tools()
assert len(tools) == 6
def test_tool_names_match(self):
tools = build_file_tools()
names = {t.name for t in tools}
assert names == FILE_TOOL_NAMES
def test_all_tools_have_descriptions(self):
for tool in build_file_tools():
assert tool.description, f"{tool.name} missing description"
def test_all_tools_have_parameters(self):
for tool in build_file_tools():
assert tool.parameters, f"{tool.name} missing parameters"
assert tool.parameters.get("type") == "object"
class TestIsFileTool:
def test_known_tools(self):
for name in FILE_TOOL_NAMES:
assert is_file_tool(name)
def test_unknown_tool(self):
assert not is_file_tool("web_search")
assert not is_file_tool("set_output")
assert not is_file_tool("load_data")
class TestReadFile:
def test_read_temp_file(self, tmp_path):
f = tmp_path / "test.txt"
f.write_text("line1\nline2\nline3\n")
result = execute_file_tool("read_file", {"path": str(f)})
assert not result.is_error
assert "line1" in result.content
assert "line2" in result.content
assert "line3" in result.content
def test_read_with_offset(self, tmp_path):
f = tmp_path / "test.txt"
f.write_text("a\nb\nc\nd\ne\n")
result = execute_file_tool("read_file", {"path": str(f), "offset": 3, "limit": 2})
assert not result.is_error
assert "c" in result.content
assert "d" in result.content
def test_read_missing_file(self):
result = execute_file_tool("read_file", {"path": "/tmp/nonexistent_file_abc123.txt"})
assert not result.is_error # returns error text, not is_error flag
assert "not found" in result.content.lower() or "error" in result.content.lower()
def test_read_directory_lists_entries(self, tmp_path):
(tmp_path / "a.txt").write_text("hello")
(tmp_path / "b.txt").write_text("world")
result = execute_file_tool("read_file", {"path": str(tmp_path)})
assert not result.is_error
assert "a.txt" in result.content
assert "b.txt" in result.content
class TestWriteFile:
def test_write_new_file(self, tmp_path):
f = tmp_path / "new.txt"
result = execute_file_tool("write_file", {"path": str(f), "content": "hello world"})
assert not result.is_error
assert "Created" in result.content
assert f.read_text() == "hello world"
def test_overwrite_existing(self, tmp_path):
f = tmp_path / "existing.txt"
f.write_text("old")
result = execute_file_tool("write_file", {"path": str(f), "content": "new"})
assert not result.is_error
assert "Updated" in result.content
assert f.read_text() == "new"
def test_creates_parent_dirs(self, tmp_path):
f = tmp_path / "sub" / "dir" / "file.txt"
result = execute_file_tool("write_file", {"path": str(f), "content": "deep"})
assert not result.is_error
assert f.read_text() == "deep"
class TestEditFile:
def test_exact_match(self, tmp_path):
f = tmp_path / "edit.txt"
f.write_text("hello world\nfoo bar\n")
result = execute_file_tool(
"edit_file",
{"path": str(f), "old_text": "foo bar", "new_text": "baz qux"},
)
assert not result.is_error
assert "Replaced 1" in result.content
assert "baz qux" in f.read_text()
def test_fuzzy_whitespace_match(self, tmp_path):
f = tmp_path / "edit2.txt"
f.write_text(" hello world \n")
result = execute_file_tool(
"edit_file",
{"path": str(f), "old_text": "hello world", "new_text": "goodbye"},
)
assert not result.is_error
# Should match via fuzzy strategies
def test_no_match_returns_error(self, tmp_path):
f = tmp_path / "edit3.txt"
f.write_text("hello world\n")
result = execute_file_tool(
"edit_file",
{"path": str(f), "old_text": "xyz not present", "new_text": "replacement"},
)
assert not result.is_error # error in content, not flag
assert "could not find" in result.content.lower()
class TestListDirectory:
def test_list_basic(self, tmp_path):
(tmp_path / "file1.txt").write_text("a")
(tmp_path / "file2.py").write_text("b")
(tmp_path / "subdir").mkdir()
result = execute_file_tool("list_directory", {"path": str(tmp_path)})
assert not result.is_error
assert "file1.txt" in result.content
assert "file2.py" in result.content
assert "subdir/" in result.content
def test_list_recursive(self, tmp_path):
sub = tmp_path / "sub"
sub.mkdir()
(sub / "deep.txt").write_text("x")
result = execute_file_tool("list_directory", {"path": str(tmp_path), "recursive": True})
assert not result.is_error
assert "deep.txt" in result.content
def test_list_missing_dir(self):
result = execute_file_tool("list_directory", {"path": "/tmp/nonexistent_dir_abc123"})
assert not result.is_error
assert "not found" in result.content.lower()
class TestSearchFiles:
def test_search_basic(self, tmp_path):
(tmp_path / "a.py").write_text("def hello():\n pass\n")
(tmp_path / "b.py").write_text("def world():\n pass\n")
result = execute_file_tool("search_files", {"pattern": "def hello", "path": str(tmp_path)})
assert not result.is_error
assert "hello" in result.content
def test_search_with_include(self, tmp_path):
(tmp_path / "a.py").write_text("target line\n")
(tmp_path / "b.txt").write_text("target line\n")
result = execute_file_tool(
"search_files",
{"pattern": "target", "path": str(tmp_path), "include": "*.py"},
)
assert not result.is_error
assert "a.py" in result.content
def test_search_no_matches(self, tmp_path):
(tmp_path / "a.txt").write_text("nothing here\n")
result = execute_file_tool(
"search_files", {"pattern": "zzz_nonexistent", "path": str(tmp_path)}
)
assert not result.is_error
assert "no matches" in result.content.lower()
class TestRunCommand:
def test_basic_command(self):
result = execute_file_tool("run_command", {"command": "echo hello"})
assert not result.is_error
assert "hello" in result.content
assert "exit code: 0" in result.content
def test_command_with_cwd(self, tmp_path):
result = execute_file_tool("run_command", {"command": "pwd", "cwd": str(tmp_path)})
assert not result.is_error
assert str(tmp_path) in result.content
def test_command_failure(self):
result = execute_file_tool("run_command", {"command": "exit 1"})
assert not result.is_error # error in content, not flag
assert "exit code: 1" in result.content
def test_command_timeout(self):
result = execute_file_tool("run_command", {"command": "sleep 10", "timeout": 1})
assert not result.is_error
assert "timed out" in result.content.lower()
class TestExecuteUnknownTool:
def test_unknown_tool(self):
result = execute_file_tool("nonexistent_tool", {})
assert result.is_error
assert "Unknown" in result.content
+22 -554
@@ -13,8 +13,6 @@ Usage:
"""
import argparse
import difflib
import fnmatch
import json
import logging
import os
@@ -26,64 +24,6 @@ from pathlib import Path
logger = logging.getLogger(__name__)
# ── Constants (inspired by opencode) ──────────────────────────────────────
MAX_READ_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_OUTPUT_BYTES = 50 * 1024 # 50KB byte budget for read output
MAX_COMMAND_OUTPUT = 30_000 # chars before truncation
SEARCH_RESULT_LIMIT = 100
BINARY_EXTENSIONS = frozenset(
{
".zip",
".tar",
".gz",
".bz2",
".xz",
".7z",
".rar",
".exe",
".dll",
".so",
".dylib",
".bin",
".class",
".jar",
".war",
".pyc",
".pyo",
".wasm",
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".ico",
".webp",
".svg",
".mp3",
".mp4",
".avi",
".mov",
".mkv",
".wav",
".flac",
".pdf",
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".sqlite",
".db",
".o",
".a",
".lib",
}
)
def setup_logger():
if not logger.handlers:
@@ -144,139 +84,6 @@ def _resolve_path(path: str) -> str:
return resolved
def _is_binary(filepath: str) -> bool:
"""Detect binary files by extension and content sampling."""
_, ext = os.path.splitext(filepath)
if ext.lower() in BINARY_EXTENSIONS:
return True
try:
with open(filepath, "rb") as f:
chunk = f.read(4096)
if b"\x00" in chunk:
return True
non_printable = sum(1 for b in chunk if b < 9 or (13 < b < 32) or b > 126)
return non_printable / max(len(chunk), 1) > 0.3
except OSError:
return False
# ── Fuzzy edit strategies (ported from opencode's 9-strategy cascade) ─────
def _levenshtein(a: str, b: str) -> int:
"""Standard Levenshtein distance."""
if not a:
return len(b)
if not b:
return len(a)
m, n = len(a), len(b)
dp = list(range(n + 1))
for i in range(1, m + 1):
prev = dp[0]
dp[0] = i
for j in range(1, n + 1):
temp = dp[j]
if a[i - 1] == b[j - 1]:
dp[j] = prev
else:
dp[j] = 1 + min(prev, dp[j], dp[j - 1])
prev = temp
return dp[n]
def _similarity(a: str, b: str) -> float:
maxlen = max(len(a), len(b))
if maxlen == 0:
return 1.0
return 1.0 - _levenshtein(a, b) / maxlen
def _fuzzy_find_candidates(content: str, old_text: str):
"""
Yield candidate substrings from content that match old_text,
using a cascade of increasingly fuzzy strategies.
Ported from opencode's edit.ts replace() cascade.
"""
# Strategy 1: Exact match
if old_text in content:
yield old_text
content_lines = content.split("\n")
search_lines = old_text.split("\n")
# Strip trailing empty line from search (common copy-paste artifact)
while search_lines and not search_lines[-1].strip():
search_lines = search_lines[:-1]
if not search_lines:
return
n_search = len(search_lines)
# Strategy 2: Line-trimmed match
# Each line trimmed; yields original content substring preserving indentation
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
if all(cl.strip() == sl.strip() for cl, sl in zip(window, search_lines, strict=True)):
yield "\n".join(window)
# Strategy 3: Block-anchor match (first/last line as anchors, fuzzy middle)
if n_search >= 3:
first_trimmed = search_lines[0].strip()
last_trimmed = search_lines[-1].strip()
candidates = []
for i, line in enumerate(content_lines):
if line.strip() == first_trimmed:
end = i + n_search
if end <= len(content_lines) and content_lines[end - 1].strip() == last_trimmed:
block = content_lines[i:end]
# Score middle lines
middle_content = "\n".join(block[1:-1])
middle_search = "\n".join(search_lines[1:-1])
sim = _similarity(middle_content, middle_search)
candidates.append((sim, "\n".join(block)))
if candidates:
candidates.sort(key=lambda x: x[0], reverse=True)
if candidates[0][0] > 0.3:
yield candidates[0][1]
# Strategy 4: Whitespace-normalized match
normalized_search = re.sub(r"\s+", " ", old_text).strip()
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
normalized_block = re.sub(r"\s+", " ", "\n".join(window)).strip()
if normalized_block == normalized_search:
yield "\n".join(window)
# Strategy 5: Indentation-flexible match
def _strip_indent(lines):
non_empty = [ln for ln in lines if ln.strip()]
if not non_empty:
return "\n".join(lines)
min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty)
return "\n".join(ln[min_indent:] for ln in lines)
stripped_search = _strip_indent(search_lines)
for i in range(len(content_lines) - n_search + 1):
block = content_lines[i : i + n_search]
if _strip_indent(block) == stripped_search:
yield "\n".join(block)
# Strategy 6: Trimmed-boundary match
trimmed = old_text.strip()
if trimmed != old_text and trimmed in content:
yield trimmed
def _compute_diff(old: str, new: str, path: str) -> str:
"""Compute a unified diff for display."""
old_lines = old.splitlines(keepends=True)
new_lines = new.splitlines(keepends=True)
diff = difflib.unified_diff(old_lines, new_lines, fromfile=path, tofile=path, n=3)
result = "".join(diff)
if len(result) > 2000:
result = result[:2000] + "\n... (diff truncated)"
return result
# ── Git snapshot system (ported from opencode's shadow git) ───────────────
@@ -301,357 +108,22 @@ def _ensure_snapshot_repo():
_snapshot_git("config", "core.autocrlf", "false")
# ── Tool: read_file ──────────────────────────────────────────────────────
@mcp.tool()
def read_file(path: str, offset: int = 1, limit: int = 0) -> str:
"""Read file contents with line numbers and byte-budget truncation.
Returns numbered lines. Binary files are detected and rejected.
Large files are automatically truncated at 2000 lines or 50KB.
Args:
path: File path (relative to project root or absolute within project)
offset: Starting line number, 1-indexed (default: 1)
limit: Max lines to return, 0 = up to 2000 (default: 0)
Returns:
File contents with line numbers, or error message
"""
resolved = _resolve_path(path)
if os.path.isdir(resolved):
# List directory contents instead
entries = []
for entry in sorted(os.listdir(resolved)):
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f" {entry}{suffix}")
total = len(entries)
return f"Directory: {path} ({total} entries)\n" + "\n".join(entries[:200])
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
if _is_binary(resolved):
size = os.path.getsize(resolved)
return f"Binary file: {path} ({size:,} bytes). Cannot display binary content."
def _take_snapshot() -> str:
"""Take a git snapshot and return the tree hash. Silent on failure."""
if not SNAPSHOT_DIR:
return ""
try:
with open(resolved, encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
total_lines = len(all_lines)
start_idx = max(0, offset - 1) # Convert 1-indexed to 0-indexed
effective_limit = limit if limit > 0 else MAX_READ_LINES
end_idx = min(start_idx + effective_limit, total_lines)
# Apply byte budget (like opencode)
output_lines = []
byte_count = 0
truncated_by_bytes = False
for i in range(start_idx, end_idx):
line = all_lines[i].rstrip("\n\r")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted = f"{i + 1:>6}\t{line}"
line_bytes = len(formatted.encode("utf-8")) + 1 # +1 for newline
if byte_count + line_bytes > MAX_OUTPUT_BYTES:
truncated_by_bytes = True
break
output_lines.append(formatted)
byte_count += line_bytes
result = "\n".join(output_lines)
# Truncation notices
lines_shown = len(output_lines)
actual_end = start_idx + lines_shown
if actual_end < total_lines or truncated_by_bytes:
result += f"\n\n(Showing lines {start_idx + 1}-{actual_end} of {total_lines}."
if truncated_by_bytes:
result += " Truncated by byte budget."
result += f" Use offset={actual_end + 1} to continue reading.)"
return result
except Exception as e:
return f"Error reading file: {e}"
# ── Tool: write_file ─────────────────────────────────────────────────────
@mcp.tool()
def write_file(path: str, content: str) -> str:
"""Create or overwrite a file. Automatically creates parent directories.
Takes a snapshot before writing for undo capability.
Args:
path: File path relative to project root
content: Complete file content
Returns:
Success message with file stats, or error
"""
resolved = _resolve_path(path)
try:
# Snapshot before write
_take_snapshot()
existed = os.path.isfile(resolved)
os.makedirs(os.path.dirname(resolved), exist_ok=True)
with open(resolved, "w", encoding="utf-8") as f:
f.write(content)
line_count = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
action = "Updated" if existed else "Created"
return f"{action} {path} ({len(content):,} bytes, {line_count} lines)"
except Exception as e:
return f"Error writing file: {e}"
# ── Tool: edit_file (fuzzy-match cascade) ─────────────────────────────────
@mcp.tool()
def edit_file(path: str, old_text: str, new_text: str, replace_all: bool = False) -> str:
"""Replace text in a file using a fuzzy-match cascade.
Tries exact match first, then falls back through increasingly fuzzy
strategies: line-trimmed, block-anchor, whitespace-normalized,
indentation-flexible, and trimmed-boundary matching.
Inspired by opencode's 9-strategy edit tool.
Args:
path: File path relative to project root
old_text: Text to find (fuzzy matching applied if exact fails)
new_text: Replacement text
replace_all: Replace all occurrences (default: first only)
Returns:
Success message with diff preview, or error with suggestions
"""
resolved = _resolve_path(path)
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
try:
with open(resolved, encoding="utf-8") as f:
content = f.read()
# Snapshot before edit
_take_snapshot()
# Try fuzzy cascade
matched_text = None
strategy_used = None
strategies = [
"exact",
"line-trimmed",
"block-anchor",
"whitespace-normalized",
"indentation-flexible",
"trimmed-boundary",
]
for i, candidate in enumerate(_fuzzy_find_candidates(content, old_text)):
idx = content.find(candidate)
if idx == -1:
continue
if replace_all:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
# Check uniqueness
last_idx = content.rfind(candidate)
if idx == last_idx:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
# Multiple matches — continue to next strategy
if matched_text is None:
# Generate helpful error
close = difflib.get_close_matches(old_text[:200], content.split("\n"), n=3, cutoff=0.4)
msg = f"Error: Could not find a unique match for old_text in {path}."
if close:
suggestions = "\n".join(f" {line}" for line in close)
msg += f"\n\nDid you mean one of these lines?\n{suggestions}"
return msg
if replace_all:
count = content.count(matched_text)
new_content = content.replace(matched_text, new_text)
else:
count = 1
new_content = content.replace(matched_text, new_text, 1)
# Write
with open(resolved, "w", encoding="utf-8") as f:
f.write(new_content)
# Build response with diff preview
diff = _compute_diff(content, new_content, path)
match_info = f" (matched via {strategy_used})" if strategy_used != "exact" else ""
result = f"Replaced {count} occurrence(s) in {path}{match_info}"
if diff:
result += f"\n\n{diff}"
return result
except Exception as e:
return f"Error editing file: {e}"
# ── Tool: list_directory ──────────────────────────────────────────────────
@mcp.tool()
def list_directory(path: str = ".", recursive: bool = False) -> str:
"""List directory contents with type indicators.
Args:
path: Directory path (relative to project root, default: root)
recursive: List recursively (default: False)
Returns:
Sorted directory listing with / suffix for directories
"""
resolved = _resolve_path(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
try:
skip = {
".git",
"__pycache__",
"node_modules",
".venv",
".tox",
".mypy_cache",
".ruff_cache",
}
entries = []
if recursive:
for root, dirs, files in os.walk(resolved):
dirs[:] = sorted(d for d in dirs if d not in skip and not d.startswith("."))
rel_root = os.path.relpath(root, resolved)
if rel_root == ".":
rel_root = ""
for f in sorted(files):
if f.startswith("."):
continue
entries.append(os.path.join(rel_root, f) if rel_root else f)
if len(entries) >= 500:
entries.append("... (truncated at 500 entries)")
return "\n".join(entries)
else:
for entry in sorted(os.listdir(resolved)):
if entry.startswith(".") or entry in skip:
continue
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f"{entry}{suffix}")
return "\n".join(entries) if entries else "(empty directory)"
except Exception as e:
return f"Error listing directory: {e}"
# ── Tool: search_files ───────────────────────────────────────────────────
@mcp.tool()
def search_files(pattern: str, path: str = ".", include: str = "") -> str:
"""Search file contents using regex. Results sorted by modification time.
Uses ripgrep when available, falls back to Python regex.
Args:
pattern: Regex pattern to search for
path: Directory to search (relative to project root)
include: File glob filter (e.g. '*.py')
Returns:
Matching lines grouped by file with line numbers
"""
resolved = _resolve_path(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
try:
cmd = [
"rg",
"-nH",
"--no-messages",
"--hidden",
"--max-count=20",
"--glob=!.git/*",
pattern,
]
if include:
cmd.extend(["--glob", include])
cmd.append(resolved)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode <= 1:
output = result.stdout.strip()
if not output:
return "No matches found."
# Group by file, make paths relative
lines = []
for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
line = line.replace(PROJECT_ROOT + "/", "")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
lines.append(line)
total = output.count("\n") + 1
result_str = "\n".join(lines)
if total > SEARCH_RESULT_LIMIT:
result_str += (
f"\n\n... ({total} total matches, showing first {SEARCH_RESULT_LIMIT})"
)
return result_str
except FileNotFoundError:
pass
except subprocess.TimeoutExpired:
return "Error: Search timed out after 30 seconds"
# Fallback: Python regex
try:
compiled = re.compile(pattern)
matches = []
skip_dirs = {".git", "__pycache__", "node_modules", ".venv", ".tox"}
for root, dirs, files in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
rel = os.path.relpath(fpath, PROJECT_ROOT)
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
if compiled.search(line):
matches.append(f"{rel}:{i}:{line.rstrip()[:MAX_LINE_LENGTH]}")
if len(matches) >= SEARCH_RESULT_LIMIT:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
continue
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
_ensure_snapshot_repo()
_snapshot_git("add", ".")
return _snapshot_git("write-tree")
except Exception:
return ""
# ── Tool: run_command ─────────────────────────────────────────────────────
MAX_COMMAND_OUTPUT = 30_000 # chars before truncation
@mcp.tool()
def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
@@ -668,7 +140,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
Returns:
Combined stdout/stderr with exit code
"""
timeout = min(timeout, 300) # Cap at 5 minutes
timeout = min(timeout, 300)
work_dir = _resolve_path(cwd) if cwd else PROJECT_ROOT
try:
@@ -698,7 +170,6 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
output = "\n".join(parts)
# Truncate large output (like opencode's MAX_METADATA_LENGTH)
if len(output) > MAX_COMMAND_OUTPUT:
output = (
output[:MAX_COMMAND_OUTPUT]
@@ -717,19 +188,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
return f"Error executing command: {e}"
# ── Tool: snapshot (git-based undo) ───────────────────────────────────────
def _take_snapshot() -> str:
"""Take a git snapshot and return the tree hash. Silent on failure."""
if not SNAPSHOT_DIR:
return ""
try:
_ensure_snapshot_repo()
_snapshot_git("add", ".")
return _snapshot_git("write-tree")
except Exception:
return ""
# ── Tool: undo_changes (git-based undo) ──────────────────────────────────
@mcp.tool()
@@ -1694,6 +1153,8 @@ def run_agent_tests(
def main() -> None:
global PROJECT_ROOT, SNAPSHOT_DIR
from aden_tools.file_ops import register_file_tools
parser = argparse.ArgumentParser(description="Coder Tools MCP Server")
parser.add_argument("--project-root", default="")
parser.add_argument("--port", type=int, default=int(os.getenv("CODER_TOOLS_PORT", "4002")))
@@ -1711,6 +1172,13 @@ def main() -> None:
logger.info(f"Project root: {PROJECT_ROOT}")
logger.info(f"Snapshot dir: {SNAPSHOT_DIR}")
register_file_tools(
mcp,
resolve_path=_resolve_path,
before_write=_take_snapshot,
project_root=PROJECT_ROOT,
)
if args.stdio:
mcp.run(transport="stdio")
else:
+94
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
File Tools MCP Server
Minimal FastMCP server exposing 6 file tools (read_file, write_file, edit_file,
list_directory, search_files, run_command) with no path sandboxing.
Usage:
# Run with STDIO transport (for agent integration)
python files_server.py --stdio
# Run with HTTP transport
python files_server.py --port 4003
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
logger = logging.getLogger(__name__)
def setup_logger() -> None:
"""Configure logger for files server."""
if not logger.handlers:
stream = sys.stderr if "--stdio" in sys.argv else sys.stdout
handler = logging.StreamHandler(stream)
formatter = logging.Formatter("[FILES] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
setup_logger()
# Suppress FastMCP banner in STDIO mode
if "--stdio" in sys.argv:
import rich.console
_original_console_init = rich.console.Console.__init__
def _patched_console_init(self, *args, **kwargs):
kwargs["file"] = sys.stderr
_original_console_init(self, *args, **kwargs)
rich.console.Console.__init__ = _patched_console_init
from aden_tools.file_ops import register_file_tools # noqa: E402
from fastmcp import FastMCP # noqa: E402
mcp = FastMCP("files-tools")
register_file_tools(mcp)
# ── Entry point ───────────────────────────────────────────────────────────
def main() -> None:
"""Entry point for the File Tools MCP server."""
parser = argparse.ArgumentParser(description="File Tools MCP Server")
parser.add_argument(
"--port",
type=int,
default=int(os.getenv("FILES_PORT", "4003")),
help="HTTP server port (default: 4003)",
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="HTTP server host (default: 0.0.0.0)",
)
parser.add_argument(
"--stdio",
action="store_true",
help="Use STDIO transport instead of HTTP",
)
args = parser.parse_args()
if not args.stdio:
logger.info("Registered 6 file tools: read_file, write_file, edit_file, "
"list_directory, search_files, run_command")
if args.stdio:
mcp.run(transport="stdio")
else:
logger.info(f"Starting File Tools server on {args.host}:{args.port}")
mcp.run(transport="http", host=args.host, port=args.port)
if __name__ == "__main__":
main()
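For reference, a minimal sketch of the stdio entry this server expects in ``mcp_servers.json``, mirroring ``FILES_MCP_SERVER_CONFIG`` from ``framework.graph.files`` above (the exact file layout is an assumption):
{
    "name": "files-tools",
    "transport": "stdio",
    "command": "uv",
    "args": ["run", "python", "files_server.py", "--stdio"],
    "cwd": "../../tools",
    "description": "File tools for reading, writing, editing, and searching files"
}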
+558
@@ -0,0 +1,558 @@
"""
Shared file operation tools for MCP servers.
Provides 6 tools (read_file, write_file, edit_file, list_directory, search_files,
run_command) plus supporting helpers. Used by both files_server.py (unsandboxed)
and coder_tools_server.py (project-root sandboxed with git snapshots).
Usage:
from aden_tools.file_ops import register_file_tools
mcp = FastMCP("my-server")
register_file_tools(mcp) # unsandboxed defaults
register_file_tools(mcp, resolve_path=fn, ...) # sandboxed with hooks
"""
from __future__ import annotations
import difflib
import fnmatch
import os
import re
import subprocess
from collections.abc import Callable
from pathlib import Path
from fastmcp import FastMCP
# ── Constants ─────────────────────────────────────────────────────────────
MAX_READ_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_OUTPUT_BYTES = 50 * 1024 # 50KB byte budget for read output
MAX_COMMAND_OUTPUT = 30_000 # chars before truncation
SEARCH_RESULT_LIMIT = 100
BINARY_EXTENSIONS = frozenset(
{
".zip",
".tar",
".gz",
".bz2",
".xz",
".7z",
".rar",
".exe",
".dll",
".so",
".dylib",
".bin",
".class",
".jar",
".war",
".pyc",
".pyo",
".wasm",
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".ico",
".webp",
".svg",
".mp3",
".mp4",
".avi",
".mov",
".mkv",
".wav",
".flac",
".pdf",
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".sqlite",
".db",
".ttf",
".otf",
".woff",
".woff2",
".eot",
".o",
".a",
".lib",
".obj",
}
)
# ── Private helpers ───────────────────────────────────────────────────────
def _default_resolve_path(p: str) -> str:
"""Default path resolver — just resolves to absolute."""
return str(Path(p).resolve())
def _is_binary(filepath: str) -> bool:
"""Detect binary files by extension and content sampling."""
_, ext = os.path.splitext(filepath)
if ext.lower() in BINARY_EXTENSIONS:
return True
try:
with open(filepath, "rb") as f:
chunk = f.read(4096)
if b"\x00" in chunk:
return True
non_printable = sum(1 for b in chunk if b < 9 or (13 < b < 32) or b > 126)
return non_printable / max(len(chunk), 1) > 0.3
except OSError:
return False
def _levenshtein(a: str, b: str) -> int:
"""Standard Levenshtein distance."""
if not a:
return len(b)
if not b:
return len(a)
m, n = len(a), len(b)
dp = list(range(n + 1))
for i in range(1, m + 1):
prev = dp[0]
dp[0] = i
for j in range(1, n + 1):
temp = dp[j]
if a[i - 1] == b[j - 1]:
dp[j] = prev
else:
dp[j] = 1 + min(prev, dp[j], dp[j - 1])
prev = temp
return dp[n]
def _similarity(a: str, b: str) -> float:
maxlen = max(len(a), len(b))
if maxlen == 0:
return 1.0
return 1.0 - _levenshtein(a, b) / maxlen
def _fuzzy_find_candidates(content: str, old_text: str):
"""Yield candidate substrings from content that match old_text,
using a cascade of increasingly fuzzy strategies.
"""
# Strategy 1: Exact match
if old_text in content:
yield old_text
content_lines = content.split("\n")
search_lines = old_text.split("\n")
# Strip trailing empty line from search (common copy-paste artifact)
while search_lines and not search_lines[-1].strip():
search_lines = search_lines[:-1]
if not search_lines:
return
n_search = len(search_lines)
# Strategy 2: Line-trimmed match
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
if all(cl.strip() == sl.strip() for cl, sl in zip(window, search_lines, strict=True)):
yield "\n".join(window)
# Strategy 3: Block-anchor match (first/last line as anchors, fuzzy middle)
if n_search >= 3:
first_trimmed = search_lines[0].strip()
last_trimmed = search_lines[-1].strip()
candidates = []
for i, line in enumerate(content_lines):
if line.strip() == first_trimmed:
end = i + n_search
if end <= len(content_lines) and content_lines[end - 1].strip() == last_trimmed:
block = content_lines[i:end]
middle_content = "\n".join(block[1:-1])
middle_search = "\n".join(search_lines[1:-1])
sim = _similarity(middle_content, middle_search)
candidates.append((sim, "\n".join(block)))
if candidates:
candidates.sort(key=lambda x: x[0], reverse=True)
if candidates[0][0] > 0.3:
yield candidates[0][1]
# Strategy 4: Whitespace-normalized match
normalized_search = re.sub(r"\s+", " ", old_text).strip()
for i in range(len(content_lines) - n_search + 1):
window = content_lines[i : i + n_search]
normalized_block = re.sub(r"\s+", " ", "\n".join(window)).strip()
if normalized_block == normalized_search:
yield "\n".join(window)
# Strategy 5: Indentation-flexible match
def _strip_indent(lines):
non_empty = [ln for ln in lines if ln.strip()]
if not non_empty:
return "\n".join(lines)
min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty)
return "\n".join(ln[min_indent:] for ln in lines)
stripped_search = _strip_indent(search_lines)
for i in range(len(content_lines) - n_search + 1):
block = content_lines[i : i + n_search]
if _strip_indent(block) == stripped_search:
yield "\n".join(block)
# Strategy 6: Trimmed-boundary match
trimmed = old_text.strip()
if trimmed != old_text and trimmed in content:
yield trimmed
def _compute_diff(old: str, new: str, path: str) -> str:
"""Compute a unified diff for display."""
old_lines = old.splitlines(keepends=True)
new_lines = new.splitlines(keepends=True)
diff = difflib.unified_diff(old_lines, new_lines, fromfile=path, tofile=path, n=3)
result = "".join(diff)
if len(result) > 2000:
result = result[:2000] + "\n... (diff truncated)"
return result
# ── Factory ───────────────────────────────────────────────────────────────
def register_file_tools(
mcp: FastMCP,
*,
resolve_path: Callable[[str], str] | None = None,
before_write: Callable[[], None] | None = None,
project_root: str | None = None,
) -> None:
"""Register the 5 shared file tools on an MCP server.
Args:
mcp: FastMCP instance to register tools on.
resolve_path: Path resolver. Default: resolve to absolute path.
Raise ValueError to reject paths (e.g. outside sandbox).
before_write: Hook called before write/edit operations (e.g. git snapshot).
project_root: If set, search_files relativizes output paths to this root.
"""
_resolve = resolve_path or _default_resolve_path
@mcp.tool()
def read_file(path: str, offset: int = 1, limit: int = 0) -> str:
"""Read file contents with line numbers and byte-budget truncation.
Binary files are detected and rejected. Large files are automatically
truncated at 2000 lines or 50KB. Use offset and limit to paginate.
Args:
path: Absolute file path to read.
offset: Starting line number, 1-indexed (default: 1).
limit: Max lines to return, 0 = up to 2000 (default: 0).
"""
resolved = _resolve(path)
if os.path.isdir(resolved):
entries = []
for entry in sorted(os.listdir(resolved)):
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f" {entry}{suffix}")
total = len(entries)
return f"Directory: {path} ({total} entries)\n" + "\n".join(entries[:200])
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
if _is_binary(resolved):
size = os.path.getsize(resolved)
return f"Binary file: {path} ({size:,} bytes). Cannot display binary content."
try:
with open(resolved, encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
total_lines = len(all_lines)
start_idx = max(0, offset - 1)
effective_limit = limit if limit > 0 else MAX_READ_LINES
end_idx = min(start_idx + effective_limit, total_lines)
output_lines = []
byte_count = 0
truncated_by_bytes = False
for i in range(start_idx, end_idx):
line = all_lines[i].rstrip("\n\r")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted = f"{i + 1:>6}\t{line}"
line_bytes = len(formatted.encode("utf-8")) + 1
if byte_count + line_bytes > MAX_OUTPUT_BYTES:
truncated_by_bytes = True
break
output_lines.append(formatted)
byte_count += line_bytes
result = "\n".join(output_lines)
lines_shown = len(output_lines)
actual_end = start_idx + lines_shown
if actual_end < total_lines or truncated_by_bytes:
result += f"\n\n(Showing lines {start_idx + 1}-{actual_end} of {total_lines}."
if truncated_by_bytes:
result += " Truncated by byte budget."
result += f" Use offset={actual_end + 1} to continue reading.)"
return result
except Exception as e:
return f"Error reading file: {e}"
@mcp.tool()
def write_file(path: str, content: str) -> str:
"""Create or overwrite a file with the given content.
Automatically creates parent directories.
Args:
path: Absolute file path to write.
content: Complete file content to write.
"""
resolved = _resolve(path)
try:
if before_write:
before_write()
existed = os.path.isfile(resolved)
os.makedirs(os.path.dirname(resolved), exist_ok=True)
with open(resolved, "w", encoding="utf-8") as f:
f.write(content)
line_count = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
action = "Updated" if existed else "Created"
return f"{action} {path} ({len(content):,} bytes, {line_count} lines)"
except Exception as e:
return f"Error writing file: {e}"
@mcp.tool()
def edit_file(path: str, old_text: str, new_text: str, replace_all: bool = False) -> str:
"""Replace text in a file using a fuzzy-match cascade.
Tries exact match first, then falls back through increasingly fuzzy
strategies: line-trimmed, block-anchor, whitespace-normalized,
indentation-flexible, and trimmed-boundary matching.
Args:
path: Absolute file path to edit.
old_text: Text to find (fuzzy matching applied if exact fails).
new_text: Replacement text.
replace_all: Replace all occurrences (default: first only).
"""
resolved = _resolve(path)
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
try:
with open(resolved, encoding="utf-8") as f:
content = f.read()
if before_write:
before_write()
matched_text = None
strategy_used = None
strategies = [
"exact",
"line-trimmed",
"block-anchor",
"whitespace-normalized",
"indentation-flexible",
"trimmed-boundary",
]
for i, candidate in enumerate(_fuzzy_find_candidates(content, old_text)):
idx = content.find(candidate)
if idx == -1:
continue
if replace_all:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
last_idx = content.rfind(candidate)
if idx == last_idx:
matched_text = candidate
strategy_used = strategies[min(i, len(strategies) - 1)]
break
if matched_text is None:
close = difflib.get_close_matches(
old_text[:200], content.split("\n"), n=3, cutoff=0.4
)
msg = f"Error: Could not find a unique match for old_text in {path}."
if close:
suggestions = "\n".join(f" {line}" for line in close)
msg += f"\n\nDid you mean one of these lines?\n{suggestions}"
return msg
if replace_all:
count = content.count(matched_text)
new_content = content.replace(matched_text, new_text)
else:
count = 1
new_content = content.replace(matched_text, new_text, 1)
with open(resolved, "w", encoding="utf-8") as f:
f.write(new_content)
diff = _compute_diff(content, new_content, path)
match_info = f" (matched via {strategy_used})" if strategy_used != "exact" else ""
result = f"Replaced {count} occurrence(s) in {path}{match_info}"
if diff:
result += f"\n\n{diff}"
return result
except Exception as e:
return f"Error editing file: {e}"
@mcp.tool()
def list_directory(path: str = ".", recursive: bool = False) -> str:
"""List directory contents with type indicators.
Directories have a / suffix. Hidden files and common build directories
are skipped.
Args:
path: Absolute directory path (default: current directory).
recursive: List recursively (default: false). Truncates at 500 entries.
"""
resolved = _resolve(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
try:
skip = {
".git",
"__pycache__",
"node_modules",
".venv",
".tox",
".mypy_cache",
".ruff_cache",
}
entries: list[str] = []
if recursive:
for root, dirs, files in os.walk(resolved):
dirs[:] = sorted(d for d in dirs if d not in skip and not d.startswith("."))
rel_root = os.path.relpath(root, resolved)
if rel_root == ".":
rel_root = ""
for f in sorted(files):
if f.startswith("."):
continue
entries.append(os.path.join(rel_root, f) if rel_root else f)
if len(entries) >= 500:
entries.append("... (truncated at 500 entries)")
return "\n".join(entries)
else:
for entry in sorted(os.listdir(resolved)):
if entry.startswith(".") or entry in skip:
continue
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f"{entry}{suffix}")
return "\n".join(entries) if entries else "(empty directory)"
except Exception as e:
return f"Error listing directory: {e}"
@mcp.tool()
def search_files(pattern: str, path: str = ".", include: str = "") -> str:
"""Search file contents using regex. Uses ripgrep when available, falls back to Python regex.
Results sorted by file with line numbers.
Args:
pattern: Regex pattern to search for.
path: Absolute directory path to search (default: current directory).
include: File glob filter (e.g. '*.py').
"""
resolved = _resolve(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
# Try ripgrep first
try:
cmd = [
"rg",
"-nH",
"--no-messages",
"--hidden",
"--max-count=20",
"--glob=!.git/*",
pattern,
]
if include:
cmd.extend(["--glob", include])
cmd.append(resolved)
rg_result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if rg_result.returncode <= 1:
output = rg_result.stdout.strip()
if not output:
return "No matches found."
lines = []
for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
if project_root:
line = line.replace(project_root + "/", "")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
lines.append(line)
total = output.count("\n") + 1
result_str = "\n".join(lines)
if total > SEARCH_RESULT_LIMIT:
result_str += (
f"\n\n... ({total} total matches, showing first {SEARCH_RESULT_LIMIT})"
)
return result_str
except FileNotFoundError:
pass # ripgrep not installed — fall through to Python
except subprocess.TimeoutExpired:
return "Error: Search timed out after 30 seconds"
# Fallback: Python regex
try:
compiled = re.compile(pattern)
matches: list[str] = []
skip_dirs = {".git", "__pycache__", "node_modules", ".venv", ".tox"}
for root, dirs, files in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
display_path = (
os.path.relpath(fpath, project_root) if project_root else fpath
)
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
if compiled.search(line):
matches.append(
f"{display_path}:{i}:{line.rstrip()[:MAX_LINE_LENGTH]}"
)
if len(matches) >= SEARCH_RESULT_LIMIT:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
continue
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
+5
@@ -0,0 +1,5 @@
"""Thin re-export of shared file tools for GCU subagents."""
from aden_tools.file_ops import register_file_tools
__all__ = ["register_file_tools"]