feat: consolidate search and list file tools

Richard Tang
2026-04-30 15:43:15 -07:00
parent 76a7dd4bd5
commit f8233bda56
15 changed files with 687 additions and 384 deletions
@@ -42,7 +42,7 @@ COMPACTABLE_TOOLS: frozenset[str] = frozenset(
"write_file",
"edit_file",
"browser_screenshot",
"list_directory",
"search_files",
}
)
@@ -858,7 +858,7 @@ def build_emergency_summary(
if not all_files:
parts.append(
"NOTE: Large tool results may have been saved to files. "
"Use list_directory to check the data directory."
"Use search_files(target='files', path='.') to check the data directory."
)
except Exception:
parts.append("NOTE: Large tool results were saved to files. Use read_file(path='<path>') to read them.")
+12 -11
@@ -41,7 +41,6 @@ _QUEEN_INDEPENDENT_TOOLS = [
"write_file",
"edit_file",
"hashline_edit",
"list_directory",
"search_files",
"run_command",
"undo_changes",
@@ -60,7 +59,6 @@ _QUEEN_INDEPENDENT_TOOLS = [
# (e.g. inspect an existing skill) before committing.
_QUEEN_INCUBATING_TOOLS = [
"read_file",
"list_directory",
"search_files",
"run_command",
# Schedule lives on the colony, not on the queen session — pass it
@@ -76,7 +74,6 @@ _QUEEN_INCUBATING_TOOLS = [
_QUEEN_WORKING_TOOLS = [
# Read-only
"read_file",
"list_directory",
"search_files",
"run_command",
# Monitoring + worker dialogue
@@ -95,7 +92,6 @@ _QUEEN_WORKING_TOOLS = [
_QUEEN_REVIEWING_TOOLS = [
# Read-only
"read_file",
"list_directory",
"search_files",
"run_command",
# Status + escalation replies
@@ -249,8 +245,11 @@ re-read state.
See "Independent execution" for the per-step flow and granularity rule.
## File I/O (coder-tools MCP)
- read_file, write_file, edit_file, hashline_edit, list_directory, \
search_files, run_command, undo_changes
- read_file, write_file, edit_file, hashline_edit, search_files, \
run_command, undo_changes
- search_files covers grep/find/ls in one tool: target='content' to \
search inside files, target='files' (with a glob like '*.py') to list \
or find files; results are mtime-sorted (newest first) in files mode.
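For illustration, the two call shapes described above (hypothetical arguments; the full signature appears in the file_ops diff below):

    search_files(pattern="def register_tools", target="content", file_glob="*.py")  # grep
    search_files(pattern="*.md", target="files", path="docs")  # find/ls, newest first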
## Browser Automation (gcu-tools MCP)
- Use `browser_*` tools (browser_start, browser_navigate, browser_click, \
@@ -277,9 +276,10 @@ purpose — your job in this phase is to nail the spec, not keep doing \
work. Available:
## Read-only inspection (coder-tools MCP)
- read_file, list_directory, search_files, run_command for confirming \
details before you commit (e.g. peek at an existing skill in \
~/.hive/skills/, sanity-check an API URL).
- read_file, search_files, run_command for confirming details before \
you commit (e.g. peek at an existing skill in ~/.hive/skills/, sanity-check \
an API URL). search_files covers both grep (target='content') and ls/find \
(target='files', glob like '*.py').
## Approved → operational checklist (use your judgement, ask only what's missing)
The conversation that got you here probably did NOT cover all of:
@@ -373,7 +373,8 @@ operational, not editorial.
born from a fresh chat via start_incubating_colony.
## Read-only inspection
- read_file, list_directory, search_files, run_command
- read_file, search_files, run_command (search_files covers grep/find/ls \
via target='content' or target='files')
When every worker has reported (success or failure), the phase \
auto-moves to REVIEWING. You do not need to call a transition tool \
@@ -392,7 +393,7 @@ _queen_tools_reviewing = """
# Tools (REVIEWING mode)
Workers have finished. You have:
- Read-only: read_file, list_directory, search_files, run_command
- Read-only: read_file, search_files, run_command (search_files = grep+find+ls)
- get_worker_status(focus?) Pull the final status / per-worker reports
- list_worker_questions() / reply_to_worker(request_id, reply) Answer any \
late escalations still in the inbox
@@ -37,11 +37,11 @@ logger = logging.getLogger(__name__)
_TOOL_CATEGORIES: dict[str, list[str]] = {
# Read-only file operations — safe baseline for every knowledge queen.
# search_files is unified: covers content grep AND directory listing
# via target='content' / target='files'. It replaces list_directory,
# list_dir, and list_files.
"file_read": [
"read_file",
"list_directory",
"list_dir",
"list_files",
"search_files",
"grep_search",
"pdf_read",
@@ -80,23 +80,6 @@ _TOOL_CATEGORIES: dict[str, list[str]] = {
],
# Browser automation — every tool from the gcu-tools MCP server.
"browser": ["@server:gcu-tools"],
# External research / information-gathering.
"research": [
"search_papers",
"download_paper",
"search_wikipedia",
"web_scrape",
],
# Security scanners — pentest-ish, only for engineering/security roles.
"security": [
"dns_security_scan",
"http_headers_scan",
"port_scan",
"ssl_tls_scan",
"subdomain_enumerate",
"tech_stack_detect",
"risk_score",
],
# Lightweight context helpers — good default for every queen.
"time_context": [
"get_current_time",
+1 -1
@@ -71,7 +71,7 @@ class ToolRegistry:
{
# File system reads
"read_file",
"list_directory",
"search_files",
"grep",
"glob",
# Web reads
-1
@@ -29,7 +29,6 @@ _ALWAYS_AVAILABLE_TOOLS: frozenset[str] = frozenset(
"read_file",
"write_file",
"edit_file",
"list_directory",
"search_files",
"hashline_edit",
"set_output",
+1 -1
@@ -683,7 +683,7 @@ class Orchestrator:
# Set per-execution data_dir and agent_id so data tools and
# spillover files share the same session-scoped directory, and
# so MCP tools whose server-side schemas mark agent_id as a
# required field (list_dir, hashline_edit, replace_file_content,
# required field (search_files, hashline_edit, replace_file_content,
# execute_command_tool, …) get a valid value injected even on
# registry instances where agent_loader.setup() didn't populate
# the session_context. Without this, FastMCP rejects those
@@ -43,7 +43,6 @@ _WORKER_INHERITED_TOOLS: frozenset[str] = frozenset(
"write_file",
"edit_file",
"hashline_edit",
"list_directory",
"search_files",
"undo_changes",
# Shell
+1 -1
@@ -889,7 +889,7 @@ def test_concurrency_safe_allowlist_is_conservative():
allowlist = ToolRegistry.CONCURRENCY_SAFE_TOOLS
# Positive assertions: known-safe read operations are present.
for name in ("read_file", "grep", "glob", "list_directory", "web_search"):
for name in ("read_file", "grep", "glob", "search_files", "web_search"):
assert name in allowlist, f"{name} should be concurrency-safe"
# Negative assertions: nothing that mutates state is allowed in.
+66
@@ -0,0 +1,66 @@
def search_tool(pattern: str, target: str = "content", path: str = ".",
file_glob: str | None = None, limit: int = 50, offset: int = 0,
output_mode: str = "content", context: int = 0,
task_id: str = "default") -> str:
"""Search for content or files."""
try:
# Track searches to detect *consecutive* repeated search loops.
# Include pagination args so users can page through truncated
# results without tripping the repeated-search guard.
search_key = (
"search",
pattern,
target,
str(path),
file_glob or "",
limit,
offset,
)
with _read_tracker_lock:
task_data = _read_tracker.setdefault(task_id, {
"last_key": None, "consecutive": 0, "read_history": set(),
})
if task_data["last_key"] == search_key:
task_data["consecutive"] += 1
else:
task_data["last_key"] = search_key
task_data["consecutive"] = 1
count = task_data["consecutive"]
if count >= 4:
return json.dumps({
"error": (
f"BLOCKED: You have run this exact search {count} times in a row. "
"The results have NOT changed. You already have this information. "
"STOP re-searching and proceed with your task."
),
"pattern": pattern,
"already_searched": count,
}, ensure_ascii=False)
file_ops = _get_file_ops(task_id)
result = file_ops.search(
pattern=pattern, path=path, target=target, file_glob=file_glob,
limit=limit, offset=offset, output_mode=output_mode, context=context
)
if hasattr(result, 'matches'):
for m in result.matches:
if hasattr(m, 'content') and m.content:
m.content = redact_sensitive_text(m.content)
result_dict = result.to_dict()
if count >= 3:
result_dict["_warning"] = (
f"You have run this exact search {count} times consecutively. "
"The results have not changed. Use the information you already have."
)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when results were truncated — explicit next offset is clearer
# than relying on the model to infer it from total_count vs match count.
if result_dict.get("truncated"):
next_offset = offset + limit
result_json += f"\n\n[Hint: Results truncated. Use offset={next_offset} to see more, or narrow with a more specific pattern or file_glob.]"
return result_json
except Exception as e:
return tool_error(str(e))
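# A behavioral sketch of the guard above (hypothetical task_id; assumes the
# module-level json import and _read_tracker shown in this snippet):
for _ in range(4):
    out = search_tool(pattern="TODO", target="content", task_id="demo")
# Calls 1-2 return results unchanged, call 3 carries a "_warning" field, and
# call 4 returns the BLOCKED error JSON. Any changed argument (e.g. a new
# offset) resets the counter, so paginating never trips the guard.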
+4 -3
@@ -2,8 +2,9 @@
"""
File Tools MCP Server
Minimal FastMCP server exposing 6 file tools (read_file, write_file, edit_file,
list_directory, search_files, run_command) with no path sandboxing.
Minimal FastMCP server exposing 5 file tools (read_file, write_file, edit_file,
hashline_edit, search_files) with no path sandboxing. ``search_files`` is
unified: it covers grep, find, and ls via target='content' / target='files'.
Usage:
# Run with STDIO transport (for agent integration)
@@ -82,7 +83,7 @@ def main() -> None:
if not args.stdio:
logger.info(
"Registered 6 file tools: read_file, write_file, edit_file, list_directory, search_files, run_command"
"Registered 5 file tools: read_file, write_file, edit_file, hashline_edit, search_files"
)
if args.stdio:
+377 -164
@@ -1,8 +1,12 @@
"""
Shared file operation tools for MCP servers.
Provides 7 tools (read_file, write_file, edit_file, hashline_edit,
list_directory, search_files, run_command) plus supporting helpers.
Provides 5 tools (read_file, write_file, edit_file, hashline_edit,
search_files) plus supporting helpers. ``search_files`` is unified:
it covers both content grep (``target='content'``) and file listing
(``target='files'``), replacing the older ``list_directory`` tool and
the LLM's choice between grep/find/ls.
Used by both files_server.py (unsandboxed) and coder_tools_server.py
(project-root sandboxed with git snapshots).
@@ -108,6 +112,285 @@ BINARY_EXTENSIONS = frozenset(
}
)
# ── search_files anti-loop tracker ────────────────────────────────────────
#
# Process-level memory of the most recent search_files call per task. When
# the same query (target+pattern+path+glob+pagination+output) is repeated
# back-to-back, we warn the model on the 3rd hit and block on the 4th.
# Mirrors the Hermes design — see scripts/hermes_search_files.md.
import threading as _threading
_SEARCH_TRACKER_LOCK = _threading.Lock()
_SEARCH_TRACKER: dict[str, dict] = {}
# Skip set shared by both search targets — common build/cache dirs that are
# almost never what the model wants to walk.
_SEARCH_SKIP_DIRS = frozenset(
{".git", "__pycache__", "node_modules", ".venv", ".tox", ".mypy_cache", ".ruff_cache"}
)
def _relativize(path: str, root: str | None) -> str:
"""Best-effort relative path; falls back to the original on cross-volume."""
if not root:
return path
try:
norm_path = os.path.normpath(path.replace("/", os.sep))
norm_root = os.path.normpath(root.replace("/", os.sep))
return os.path.relpath(norm_path, norm_root)
except ValueError:
return path
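# Illustration (hypothetical paths): how _relativize behaves.
#   _relativize("/repo/src/app.py", "/repo")  -> "src/app.py"
#   On Windows, a cross-drive pair makes os.path.relpath raise ValueError,
#   so the original path is returned unchanged.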
def _do_search_files_target(
pattern: str,
resolved: str,
display_root: str,
limit: int,
offset: int,
) -> str:
"""target='files': enumerate files matching a glob, mtime-sorted (newest first)."""
if not os.path.isdir(resolved):
return f"Error: Directory not found: {resolved}"
glob = pattern or "*"
files: list[tuple[float, str]] = []
# Try ripgrep --files first; it respects .gitignore which is what we want.
try:
cmd = [
"rg",
"--files",
"--no-messages",
"--hidden",
"--glob=!.git/*",
]
if glob and glob != "*":
cmd.extend(["--glob", glob])
cmd.append(resolved)
rg = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
stdin=subprocess.DEVNULL,
)
if rg.returncode <= 1:
for raw in rg.stdout.splitlines():
raw = raw.strip()
if not raw:
continue
try:
files.append((os.path.getmtime(raw), raw))
except OSError:
continue
else:
files = []
except FileNotFoundError:
# ripgrep absent — fall through to os.walk
files = []
except subprocess.TimeoutExpired:
return "Error: file listing timed out after 30 seconds"
# Python fallback (also runs when rg returned nothing on platforms where
# rg.returncode reports >1 for "no files in glob").
if not files:
for root, dirs, fnames in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in _SEARCH_SKIP_DIRS and not d.startswith(".")]
for fname in fnames:
if fname.startswith("."):
continue
if glob and glob != "*" and not fnmatch.fnmatch(fname, glob):
continue
full = os.path.join(root, fname)
try:
files.append((os.path.getmtime(full), full))
except OSError:
continue
files.sort(reverse=True)
total = len(files)
page = files[offset : offset + max(0, int(limit))]
if not page:
return "No files found." if total == 0 else f"No files at offset {offset} (total: {total})."
lines = [_relativize(p, display_root) for _, p in page]
out = "\n".join(lines)
next_offset = offset + len(page)
if total > next_offset:
out += (
f"\n\n[Hint: showing {len(page)} of {total} files. "
f"Use offset={next_offset} for more, or narrow with a more specific glob.]"
)
return out
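# Illustration (hypothetical repo with 7 .py files): files-mode output shape.
#   _do_search_files_target("*.py", "/repo", "/repo", limit=2, offset=0)
#     utils.py
#     main.py
#
#     [Hint: showing 2 of 7 files. Use offset=2 for more, or narrow with a more specific glob.]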
def _do_search_content_target(
pattern: str,
resolved: str,
project_root: str | None,
file_glob: str,
limit: int,
offset: int,
output_mode: str,
context: int,
hashline: bool,
) -> str:
"""target='content': regex search across file contents (ripgrep + Python fallback)."""
display_root = project_root or (resolved if os.path.isdir(resolved) else os.path.dirname(resolved))
cap = max(1, int(limit))
# Try ripgrep first.
try:
cmd = ["rg", "-nH", "--no-messages", "--hidden", "--glob=!.git/*"]
if context and output_mode == "content":
cmd.extend(["-C", str(int(context))])
if file_glob:
cmd.extend(["--glob", file_glob])
if output_mode == "files_only":
cmd.append("-l")
elif output_mode == "count":
cmd.append("-c")
cmd.append(pattern)
cmd.append(resolved)
rg = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
stdin=subprocess.DEVNULL,
)
if rg.returncode <= 1:
raw_lines = [ln for ln in rg.stdout.splitlines() if ln]
total = len(raw_lines)
page = raw_lines[offset : offset + cap]
if not page:
return "No matches found." if total == 0 else f"No matches at offset {offset} (total: {total})."
formatted: list[str] = []
for line in page:
# Relativize path prefix on every line.
m = re.match(r"^(.+?):(\d+):(.*)$", line) if output_mode == "content" else None
if m:
fpath, lineno, rest = m.group(1), m.group(2), m.group(3)
rel = _relativize(fpath, display_root)
if hashline:
h = compute_line_hash(rest)
line = f"{rel}:{lineno}:{h}|{rest}"
else:
line = f"{rel}:{lineno}:{rest}"
else:
# files_only/count: single path (or path:count) per line
head, sep, tail = line.partition(":")
if sep and tail.isdigit():
line = f"{_relativize(head, display_root)}:{tail}"
else:
line = _relativize(line, display_root)
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted.append(line)
out = "\n".join(formatted)
next_offset = offset + len(page)
if total > next_offset:
out += (
f"\n\n[Hint: showing {len(page)} of {total} matches. "
f"Use offset={next_offset} for more, or narrow with file_glob/pattern.]"
)
return out
except FileNotFoundError:
pass # ripgrep missing — Python fallback below
except subprocess.TimeoutExpired:
return "Error: search timed out after 30 seconds"
# Python fallback (no ripgrep): regex over file contents.
try:
compiled = re.compile(pattern)
except re.error as e:
return f"Error: invalid regex: {e}"
if os.path.isfile(resolved):
candidates = [resolved]
else:
candidates = []
for root, dirs, fnames in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in _SEARCH_SKIP_DIRS and not d.startswith(".")]
for fname in fnames:
if file_glob and not fnmatch.fnmatch(fname, file_glob):
continue
candidates.append(os.path.join(root, fname))
# files_only / count modes need per-file aggregation.
if output_mode in ("files_only", "count"):
items: list[tuple[str, int]] = []
for fpath in candidates:
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
n = sum(1 for line in f if compiled.search(line.rstrip()))
except OSError:
continue
if n:
items.append((fpath, n))
total = len(items)
page = items[offset : offset + cap]
if not page:
return "No matches found." if total == 0 else f"No matches at offset {offset} (total: {total})."
if output_mode == "files_only":
lines = [_relativize(p, display_root) for p, _ in page]
else:
lines = [f"{_relativize(p, display_root)}:{n}" for p, n in page]
out = "\n".join(lines)
next_offset = offset + len(page)
if total > next_offset:
out += f"\n\n[Hint: showing {len(page)} of {total}. Use offset={next_offset} for more.]"
return out
# output_mode == "content"
matches: list[str] = []
for fpath in candidates:
rel = _relativize(fpath, display_root)
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
buf = f.readlines()
except OSError:
continue
for i, raw in enumerate(buf, 1):
stripped = raw.rstrip()
if not compiled.search(stripped):
continue
if context > 0:
lo = max(0, i - 1 - context)
hi = min(len(buf), i + context)
ctx = []
for j in range(lo, hi):
marker = ":" if (j + 1) == i else "-"
ln = buf[j].rstrip()
ctx.append(f"{rel}:{j + 1}{marker}{ln[:MAX_LINE_LENGTH]}")
matches.append("\n".join(ctx))
elif hashline:
h = compute_line_hash(stripped)
matches.append(f"{rel}:{i}:{h}|{stripped}")
else:
matches.append(f"{rel}:{i}:{stripped[:MAX_LINE_LENGTH]}")
total = len(matches)
page = matches[offset : offset + cap]
if not page:
return "No matches found." if total == 0 else f"No matches at offset {offset} (total: {total})."
out = "\n\n".join(page) if context > 0 else "\n".join(page)
next_offset = offset + len(page)
if total > next_offset:
out += (
f"\n\n[Hint: showing {len(page)} of {total} matches. "
f"Use offset={next_offset} for more, or narrow with file_glob/pattern.]"
)
return out
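# Illustration (same hypothetical repo): content-mode output shape.
#   _do_search_content_target("TODO", "/repo", "/repo", "*.py",
#                             limit=50, offset=0, output_mode="content",
#                             context=0, hashline=False)
#     src/app.py:12:# TODO: handle retries
#     src/cli.py:88:# TODO: document flags
# With hashline=True each line becomes rel:lineno:hhhh|content, the anchor
# format consumed by hashline_edit.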
# ── Context-aware sandboxing ─────────────────────────────────────────────────
# Context variable for additional allowed paths (beyond base_root)
@@ -603,180 +886,110 @@ def register_file_tools(
return f"Error editing file: {e}"
@mcp.tool()
def list_directory(path: str = ".", recursive: bool = False) -> str:
"""List directory contents with type indicators.
def search_files(
pattern: str,
target: str = "content",
path: str = ".",
file_glob: str = "",
limit: int = 50,
offset: int = 0,
output_mode: str = "content",
context: int = 0,
hashline: bool = False,
task_id: str = "",
) -> str:
"""Search file contents or find files by name. Use this instead of grep, find, or ls.
Directories have a / suffix. Hidden files and common build directories
are skipped.
Two modes:
target='content' (default): Regex search inside files. Output modes:
'content' (lines+numbers, default), 'files_only' (paths only), 'count' (per-file counts).
target='files': Find files by glob pattern (e.g. '*.py', '*config*').
Also use this instead of ls; results are sorted by modification time (newest first).
Pagination: limit/offset both apply; the response includes a hint with the
next offset when truncated. A query repeated back-to-back draws a warning
on the 3rd call and is blocked on the 4th; use the results you already have.
Args:
path: Absolute directory path (default: current directory).
recursive: List recursively (default: false). Truncates at 500 entries.
pattern: Regex (content mode) or glob (files mode, e.g. '*.py'). For
an "ls"-style listing pass '*' or '*.<ext>'.
target: 'content' to grep inside files, 'files' to list/find files.
Legacy aliases: 'grep' -> 'content', 'find'/'ls' -> 'files'.
path: Directory (or, in content mode, a single file) to search.
file_glob: Restrict content search to filenames matching this glob.
Ignored in files mode (use ``pattern``).
limit: Max results to return (default 50).
offset: Skip first N results for pagination (default 0).
output_mode: Content-mode output shape 'content' | 'files_only' | 'count'.
context: Lines of context before and after each match (content mode only).
hashline: Content mode: include N:hhhh hash anchors for hashline_edit.
task_id: Optional anti-loop scope key (defaults to a shared bucket).
"""
resolved = _resolve(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
# Legacy aliases — keep older prompts working.
if target in ("grep",):
target = "content"
elif target in ("find", "ls"):
target = "files"
if target not in ("content", "files"):
return f"Error: invalid target '{target}'. Use 'content' or 'files'."
if output_mode not in ("content", "files_only", "count"):
return f"Error: invalid output_mode '{output_mode}'. Use 'content', 'files_only', or 'count'."
# Anti-loop guard. Key includes everything that would change results so
# paginating through the same query doesn't trip the alarm.
key = (target, pattern, str(path), file_glob, int(limit), int(offset), output_mode, int(context))
bucket = task_id or "_default"
with _SEARCH_TRACKER_LOCK:
td = _SEARCH_TRACKER.setdefault(bucket, {"last_key": None, "consecutive": 0})
if td["last_key"] == key:
td["consecutive"] += 1
else:
td["last_key"] = key
td["consecutive"] = 1
consecutive = td["consecutive"]
if consecutive >= 4:
return (
f"BLOCKED: this exact search has run {consecutive} times in a row. "
"Results have NOT changed. Use the information you already have and proceed."
)
try:
skip = {
".git",
"__pycache__",
"node_modules",
".venv",
".tox",
".mypy_cache",
".ruff_cache",
}
entries: list[str] = []
if recursive:
for root, dirs, files in os.walk(resolved):
dirs[:] = sorted(d for d in dirs if d not in skip and not d.startswith("."))
rel_root = os.path.relpath(root, resolved)
if rel_root == ".":
rel_root = ""
for f in sorted(files):
if f.startswith("."):
continue
entries.append(os.path.join(rel_root, f) if rel_root else f)
if len(entries) >= 500:
entries.append("... (truncated at 500 entries)")
return "\n".join(entries)
else:
for entry in sorted(os.listdir(resolved)):
if entry.startswith(".") or entry in skip:
continue
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f"{entry}{suffix}")
return "\n".join(entries) if entries else "(empty directory)"
resolved = _resolve(path)
except Exception as e:
return f"Error listing directory: {e}"
return f"Error: {e}"
@mcp.tool()
def search_files(pattern: str, path: str = ".", include: str = "", hashline: bool = False) -> str:
"""Search file contents using regex. Uses ripgrep if available.
Results sorted by file with line numbers. Set hashline=True to include
content-hash anchors (N:hhhh) for use with hashline_edit.
Args:
pattern: Regex pattern to search for.
path: Absolute directory path to search (default: current directory).
include: File glob filter (e.g. '*.py').
hashline: If True, include hash anchors in results (default: False).
"""
resolved = _resolve(path)
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
# Try ripgrep first
try:
cmd = [
"rg",
"-nH",
"--no-messages",
"--hidden",
"--max-count=20",
"--glob=!.git/*",
pattern,
]
if include:
cmd.extend(["--glob", include])
cmd.append(resolved)
rg_result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
stdin=subprocess.DEVNULL,
if target == "files":
result = _do_search_files_target(
pattern=pattern,
resolved=resolved,
display_root=project_root or resolved,
limit=limit,
offset=offset,
)
if rg_result.returncode <= 1:
output = rg_result.stdout.strip()
if not output:
return "No matches found."
lines = []
for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
if project_root:
line = line.replace(project_root + "/", "")
if hashline:
# Parse file:linenum:content and insert hash anchor
parts = line.split(":", 2)
if len(parts) >= 3:
content = parts[2]
h = compute_line_hash(content)
line = f"{parts[0]}:{parts[1]}:{h}|{content}"
else:
# Platform-agnostic relativization: ripgrep may output
# forward or backslash paths; normalize before relpath (Windows).
match = re.match(r"^(.+):(\d+):", line)
if match:
path_part, line_num, rest = (
match.group(1),
match.group(2),
line[match.end() :],
# Content mode allows a single file as path; files mode does not.
if not os.path.isdir(resolved) and not os.path.isfile(resolved):
return f"Error: Path not found: {path}"
result = _do_search_content_target(
pattern=pattern,
resolved=resolved,
project_root=project_root,
file_glob=file_glob,
limit=limit,
offset=offset,
output_mode=output_mode,
context=context,
hashline=hashline,
)
path_part = os.path.normpath(path_part.replace("/", os.sep))
proj_norm = os.path.normpath(project_root.replace("/", os.sep))
try:
rel = os.path.relpath(path_part, proj_norm)
line = f"{rel}:{line_num}:{rest}"
except ValueError:
pass
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
lines.append(line)
total = output.count("\n") + 1
result_str = "\n".join(lines)
if total > SEARCH_RESULT_LIMIT:
result_str += f"\n\n... ({total} total matches, showing first {SEARCH_RESULT_LIMIT})"
return result_str
except FileNotFoundError:
pass # ripgrep not installed — fall through to Python
except subprocess.TimeoutExpired:
return "Error: Search timed out after 30 seconds"
# Fallback: Python regex
try:
compiled = re.compile(pattern)
matches: list[str] = []
skip_dirs = {".git", "__pycache__", "node_modules", ".venv", ".tox"}
for root, dirs, files in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
if project_root:
proj_norm = os.path.normpath(project_root.replace("/", os.sep))
try:
display_path = os.path.relpath(fpath, proj_norm)
except ValueError:
display_path = fpath
else:
display_path = fpath
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
stripped = line.rstrip()
if compiled.search(stripped):
if hashline:
h = compute_line_hash(stripped)
matches.append(f"{display_path}:{i}:{h}|{stripped}")
else:
matches.append(f"{display_path}:{i}:{stripped[:MAX_LINE_LENGTH]}")
if len(matches) >= SEARCH_RESULT_LIMIT:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
continue
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
if consecutive == 3:
result += (
f"\n\n[Warning: this exact search has run {consecutive} times consecutively. "
"Results have not changed — use what you have instead of re-searching.]"
)
return result
@mcp.tool()
def hashline_edit(
@@ -203,106 +203,100 @@ def register_tools(mcp: FastMCP) -> None:
except Exception as e:
return f"Error writing file: {e}"
@mcp.tool()
def list_files(
path: str = ".",
recursive: bool = False,
data_dir: str = "",
) -> str:
"""List directory contents with type indicators.
Directories have a / suffix. Hidden files and common build directories
are skipped.
Args:
path: Directory path (default: data_dir).
recursive: List recursively (default: false).
data_dir: Auto-injected - the session's data directory.
"""
try:
resolved = _resolve_path(path, data_dir)
except ValueError as e:
return f"Error: {e}"
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
try:
skip = {".git", "__pycache__", "node_modules", ".venv", ".tox"}
entries: list[str] = []
if recursive:
for root, dirs, files in os.walk(resolved):
dirs[:] = sorted(d for d in dirs if d not in skip and not d.startswith("."))
rel_root = os.path.relpath(root, resolved)
if rel_root == ".":
rel_root = ""
for f in sorted(files):
if f.startswith("."):
continue
entries.append(os.path.join(rel_root, f) if rel_root else f)
if len(entries) >= 500:
entries.append("... (truncated at 500 entries)")
return "\n".join(entries)
else:
for entry in sorted(os.listdir(resolved)):
if entry.startswith(".") or entry in skip:
continue
full = os.path.join(resolved, entry)
suffix = "/" if os.path.isdir(full) else ""
entries.append(f"{entry}{suffix}")
return "\n".join(entries) if entries else "(empty directory)"
except Exception as e:
return f"Error listing directory: {e}"
@mcp.tool()
def search_files(
pattern: str,
target: str = "content",
path: str = ".",
file_glob: str = "",
limit: int = 50,
offset: int = 0,
output_mode: str = "content",
context: int = 0,
data_dir: str = "",
agent_id: str = "",
) -> str:
"""Search file contents using regex.
"""Search file contents or find files by name. Use this instead of grep, find, or ls.
Results sorted by file with line numbers. Searches within
the session's data directory or ~/.hive/.
Sandboxed to the session's data directory and ~/.hive/.
Args:
pattern: Regex pattern to search for.
path: Directory path to search (default: data_dir).
data_dir: Auto-injected - the session's data directory.
Two modes:
target='content' (default): Regex search inside files.
target='files': Find files by glob pattern (e.g. '*.py'). Results
sorted by modification time (newest first); also use this instead of ls.
See file_ops.search_files for the full parameter contract.
"""
import re
from aden_tools.file_ops import (
_do_search_content_target,
_do_search_files_target,
_SEARCH_TRACKER,
_SEARCH_TRACKER_LOCK,
)
# Legacy aliases
if target == "grep":
target = "content"
elif target in ("find", "ls"):
target = "files"
if target not in ("content", "files"):
return f"Error: invalid target '{target}'. Use 'content' or 'files'."
if output_mode not in ("content", "files_only", "count"):
return f"Error: invalid output_mode '{output_mode}'."
try:
resolved = _resolve_path(path, data_dir)
except ValueError as e:
return f"Error: {e}"
if not os.path.isdir(resolved):
return f"Error: Directory not found: {path}"
# Anti-loop guard scoped per agent_id (or shared bucket if absent).
bucket = agent_id or "_default"
key = (target, pattern, str(path), file_glob, int(limit), int(offset), output_mode, int(context))
with _SEARCH_TRACKER_LOCK:
td = _SEARCH_TRACKER.setdefault(bucket, {"last_key": None, "consecutive": 0})
if td["last_key"] == key:
td["consecutive"] += 1
else:
td["last_key"] = key
td["consecutive"] = 1
consecutive = td["consecutive"]
if consecutive >= 4:
return (
f"BLOCKED: this exact search has run {consecutive} times in a row. "
"Results have NOT changed. Use the information you already have and proceed."
)
try:
compiled = re.compile(pattern)
matches: list[str] = []
skip_dirs = {".git", "__pycache__", "node_modules", ".venv"}
# display_root: relativize against the data_dir (or the search root) so
# output paths read naturally inside the agent's workspace.
display_root = data_dir or resolved
for root, dirs, files in os.walk(resolved):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
fpath = os.path.join(root, fname)
display_path = os.path.relpath(fpath, resolved)
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
stripped = line.rstrip()
if compiled.search(stripped):
matches.append(f"{display_path}:{i}:{stripped[:2000]}")
if len(matches) >= 100:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
continue
if target == "files":
result = _do_search_files_target(
pattern=pattern,
resolved=resolved,
display_root=display_root,
limit=limit,
offset=offset,
)
else:
if not os.path.isdir(resolved) and not os.path.isfile(resolved):
return f"Error: Path not found: {path}"
result = _do_search_content_target(
pattern=pattern,
resolved=resolved,
project_root=display_root,
file_glob=file_glob,
limit=limit,
offset=offset,
output_mode=output_mode,
context=context,
hashline=False,
)
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
if consecutive == 3:
result += (
f"\n\n[Warning: this exact search has run {consecutive} times consecutively. "
"Results have not changed — use what you have instead of re-searching.]"
)
return result
@@ -1,3 +1,13 @@
"""Agent-sandboxed search_files registration.
This toolkit historically registered a separate ``list_dir`` tool that
returned ``{name, type, size_bytes}`` dicts. It has been folded into
``search_files``: one tool now covers grep, find, and ls. We keep this
module as the registration site for the agent-sandboxed variant so
toolkits scoped via ``get_sandboxed_path(path, agent_id)`` continue to
expose file search through the same canonical name.
"""
import os
from mcp.server.fastmcp import FastMCP
@@ -6,51 +16,108 @@ from ..security import get_sandboxed_path
def register_tools(mcp: FastMCP) -> None:
"""Register directory listing tools with the MCP server."""
"""Register the agent-sandboxed search_files tool with the MCP server."""
@mcp.tool()
def list_dir(path: str, agent_id: str) -> dict:
"""
Purpose
List the contents of a directory within the agent sandbox.
def search_files(
pattern: str = "*",
target: str = "files",
path: str = ".",
file_glob: str = "",
limit: int = 50,
offset: int = 0,
output_mode: str = "content",
context: int = 0,
agent_id: str = "",
) -> str:
"""Search file contents or find files by name within the agent sandbox.
When to use
Explore directory structure and contents
Discover available files and subdirectories
Verify file existence before reading or writing
Use this instead of grep, find, or ls.
Rules & Constraints
Path must point to an existing directory
Returns file names, types, and sizes
Does not recurse into subdirectories
target='files' (default here): list/find files by glob, mtime-sorted.
target='content': regex search inside files.
Args:
path: The directory path (relative to agent sandbox)
agent_id: The ID of the agent
Returns:
Dict with directory contents and metadata, or error dict
pattern: Glob (files mode) or regex (content mode). Defaults to ``*``
so a bare call lists every file in the sandbox.
target: 'files' (default) or 'content'. Legacy aliases: 'grep'/'find'/'ls'.
path: Directory or file relative to the agent sandbox.
file_glob: Restrict content search to files matching this glob.
limit: Max results (default 50).
offset: Pagination offset (default 0).
output_mode: Content-mode output 'content' | 'files_only' | 'count'.
context: Lines of surrounding context for content matches.
agent_id: Auto-injected sandbox owner.
"""
from aden_tools.file_ops import (
_do_search_content_target,
_do_search_files_target,
_SEARCH_TRACKER,
_SEARCH_TRACKER_LOCK,
)
if target == "grep":
target = "content"
elif target in ("find", "ls"):
target = "files"
if target not in ("content", "files"):
return f"Error: invalid target '{target}'. Use 'content' or 'files'."
if output_mode not in ("content", "files_only", "count"):
return f"Error: invalid output_mode '{output_mode}'."
try:
secure_path = get_sandboxed_path(path, agent_id)
if not os.path.exists(secure_path):
return {"error": f"Path not found: {path}"}
if not os.path.isdir(secure_path):
return {"error": f"Path is not a directory: {path}"}
items = os.listdir(secure_path)
entries = []
for item in items:
full_path = os.path.join(secure_path, item)
is_dir = os.path.isdir(full_path)
entry = {
"name": item,
"type": "directory" if is_dir else "file",
"size_bytes": os.path.getsize(full_path) if not is_dir else None,
}
entries.append(entry)
return {"success": True, "path": path, "entries": entries, "total_count": len(entries)}
resolved = get_sandboxed_path(path, agent_id)
except Exception as e:
return {"error": f"Failed to list directory: {str(e)}"}
return f"Error: {e}"
if not os.path.exists(resolved):
return f"Error: Path not found: {path}"
bucket = agent_id or "_default"
key = (target, pattern, str(path), file_glob, int(limit), int(offset), output_mode, int(context))
with _SEARCH_TRACKER_LOCK:
td = _SEARCH_TRACKER.setdefault(bucket, {"last_key": None, "consecutive": 0})
if td["last_key"] == key:
td["consecutive"] += 1
else:
td["last_key"] = key
td["consecutive"] = 1
consecutive = td["consecutive"]
if consecutive >= 4:
return (
f"BLOCKED: this exact search has run {consecutive} times in a row. "
"Results have NOT changed. Use the information you already have and proceed."
)
# Display paths relative to the sandbox root, not as resolved absolute paths.
try:
sandbox_root = get_sandboxed_path(".", agent_id)
except Exception:
sandbox_root = resolved
if target == "files":
result = _do_search_files_target(
pattern=pattern,
resolved=resolved,
display_root=sandbox_root,
limit=limit,
offset=offset,
)
else:
result = _do_search_content_target(
pattern=pattern,
resolved=resolved,
project_root=sandbox_root,
file_glob=file_glob,
limit=limit,
offset=offset,
output_mode=output_mode,
context=context,
hashline=False,
)
if consecutive == 3:
result += (
f"\n\n[Warning: this exact search has run {consecutive} times consecutively. "
"Results have not changed — use what you have instead of re-searching.]"
)
return result
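# Illustration (hypothetical agent_id): because this variant defaults
# pattern='*' and target='files', a bare call behaves like the old list_dir.
#   search_files(agent_id="agent-7")
#     -> newline-separated sandbox listing, newest first
#   search_files(pattern="traceback", target="content", agent_id="agent-7")
#     -> regex matches inside sandbox files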
+1 -1
@@ -7,7 +7,7 @@ Provides file I/O capabilities so GCU subagents can read spillover files
Adapted from coder_tools_server.py for the GCU context:
- No project root restriction (accepts absolute paths)
- No git snapshots
- Focused on read_file, list_directory, search_files
- Focused on read_file, search_files (search_files = grep+find+ls)
"""
from fastmcp import FastMCP
+34 -54
@@ -86,76 +86,56 @@ def mock_secure_path(tmp_path):
yield
class TestListDirTool:
"""Tests for list_dir tool."""
class TestSandboxedSearchFiles:
"""Tests for the agent-sandboxed search_files registration (formerly list_dir)."""
@pytest.fixture
def list_dir_fn(self, mcp):
def search_files_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.list_dir import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["list_dir"].fn
return mcp._tool_manager._tools["search_files"].fn
def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns all entries."""
# Create test files and directories
def test_files_mode_lists_entries(self, search_files_fn, mock_workspace, mock_secure_path, tmp_path):
"""target='files' returns every file in the sandbox, one per line."""
(tmp_path / "file1.txt").write_text("content", encoding="utf-8")
(tmp_path / "file2.txt").write_text("content", encoding="utf-8")
(tmp_path / "subdir").mkdir()
(tmp_path / "subdir" / "nested.txt").write_text("x", encoding="utf-8")
result = list_dir_fn(path=".", **mock_workspace)
result = search_files_fn(pattern="*", target="files", path=".", **mock_workspace)
assert result["success"] is True
assert result["total_count"] == 3
assert len(result["entries"]) == 3
assert "file1.txt" in result
assert "file2.txt" in result
# rg --files / os.walk return files only, so subdir itself isn't listed,
# but its contents are.
assert "nested.txt" in result
# Check that entries have correct structure
for entry in result["entries"]:
assert "name" in entry
assert "type" in entry
assert entry["type"] in ["file", "directory"]
def test_files_mode_glob_filter(self, search_files_fn, mock_workspace, mock_secure_path, tmp_path):
"""target='files' with a glob restricts the listing."""
(tmp_path / "a.py").write_text("x", encoding="utf-8")
(tmp_path / "b.txt").write_text("x", encoding="utf-8")
def test_list_empty_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing an empty directory returns empty list."""
empty_dir = tmp_path / "empty"
empty_dir.mkdir()
result = search_files_fn(pattern="*.py", target="files", path=".", **mock_workspace)
assert "a.py" in result
assert "b.txt" not in result
result = list_dir_fn(path="empty", **mock_workspace)
def test_nonexistent_path_returns_error_string(self, search_files_fn, mock_workspace, mock_secure_path):
"""Missing path returns an Error: string, not a dict."""
result = search_files_fn(pattern="*", target="files", path="nonexistent_dir", **mock_workspace)
assert isinstance(result, str)
assert "Error" in result
assert "not found" in result.lower()
assert result["success"] is True
assert result["total_count"] == 0
assert result["entries"] == []
def test_content_mode_finds_matches(self, search_files_fn, mock_workspace, mock_secure_path, tmp_path):
"""target='content' searches inside files and returns rel-path matches."""
(tmp_path / "hello.txt").write_text("needle here\n", encoding="utf-8")
(tmp_path / "other.txt").write_text("nothing\n", encoding="utf-8")
def test_list_nonexistent_directory(self, list_dir_fn, mock_workspace, mock_secure_path):
"""Listing a non-existent directory returns error."""
result = list_dir_fn(path="nonexistent_dir", **mock_workspace)
assert "error" in result
assert "not found" in result["error"].lower()
def test_list_directory_with_file_sizes(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns file sizes for files."""
(tmp_path / "small.txt").write_text("hi", encoding="utf-8")
(tmp_path / "larger.txt").write_text("hello world", encoding="utf-8")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
assert result["success"] is True
# Find entries by name
entries_by_name = {e["name"]: e for e in result["entries"]}
# Files should have size_bytes
assert entries_by_name["small.txt"]["type"] == "file"
assert entries_by_name["small.txt"]["size_bytes"] == 2
assert entries_by_name["larger.txt"]["type"] == "file"
assert entries_by_name["larger.txt"]["size_bytes"] == 11
# Directories should have None for size_bytes
assert entries_by_name["subdir"]["type"] == "directory"
assert entries_by_name["subdir"]["size_bytes"] is None
result = search_files_fn(pattern="needle", target="content", path=".", **mock_workspace)
assert "hello.txt" in result
assert "needle" in result
assert "other.txt" not in result
class TestReplaceFileContentTool: