diff --git a/core/framework/agents/queen/queen_tools_defaults.py b/core/framework/agents/queen/queen_tools_defaults.py
index 50664601..dbc832b5 100644
--- a/core/framework/agents/queen/queen_tools_defaults.py
+++ b/core/framework/agents/queen/queen_tools_defaults.py
@@ -36,32 +36,20 @@ logger = logging.getLogger(__name__)
 #   the named entries only).
 _TOOL_CATEGORIES: dict[str, list[str]] = {
-    # Read-only file operations — safe baseline for every knowledge queen.
-    # search_files is unified: covers content grep AND directory listing
-    # via target='content' / target='files'.
-    "file_read": [
-        "read_file",
-        "search_files",
-        "pdf_read",
+    # Unified file ops — read, write, edit, search across the post-refactor
+    # files-tools MCP server (read_file, write_file, edit_file,
+    # search_files).
+    "file_ops": [
+        "@server:files-tools",
     ],
-    # File mutation — only personas that author or edit artifacts.
-    "file_write": [
-        "write_file",
-        "edit_file",
-        "hashline_edit",
-    ],
-    # Shell + process control — engineering personas only.
-    # Includes the legacy coder-tools commands (run_command, bash_*) and
-    # the full terminal-tools MCP server (foreground exec with auto-promotion,
-    # background jobs, persistent PTY sessions, ripgrep/find).
-    "shell": [
-        "execute_command_tool",
-        "bash_kill",
-        "bash_output",
+    # Terminal + process control — engineering personas only.
+    # The terminal-tools MCP server covers foreground exec with auto-promotion,
+    # background jobs, persistent PTY sessions, and ripgrep/find search.
+    "terminal": [
         "@server:terminal-tools",
     ],
     # Tabular data. CSV/Excel read/write + DuckDB SQL.
-    "data": [
+    "advanced_spreadsheet": [
         "csv_read",
         "csv_info",
         "csv_write",
@@ -75,19 +63,60 @@ _TOOL_CATEGORIES: dict[str, list[str]] = {
         "excel_sheet_list",
         "excel_sql",
     ],
-    # Browser automation — every tool from the gcu-tools MCP server.
-    "browser": ["@server:gcu-tools"],
+    # Browser lifecycle + read-only inspection (navigation, snapshots, query).
+    # Split out from interaction so personas that only need to *observe* pages
+    # (e.g. research, status checks) don't pull in click/type/drag/etc.
+    "browser_basic": [
+        "browser_setup",
+        "browser_status",
+        "browser_start",
+        "browser_stop",
+        "browser_tabs",
+        "browser_open",
+        "browser_close",
+        "browser_activate_tab",
+        "browser_close_all",
+        "browser_close_finished",
+        "browser_navigate",
+        "browser_go_back",
+        "browser_go_forward",
+        "browser_reload",
+        "browser_screenshot",
+        "browser_snapshot",
+        "browser_html",
+        "browser_console",
+        "browser_evaluate",
+        "browser_get_text",
+        "browser_get_attribute",
+        "browser_get_rect",
+        "browser_shadow_query",
+    ],
+    # Browser interaction — anything that mutates page state (clicks, typing,
+    # drag, scrolling, dialogs, file uploads). Pair with browser_basic for
+    # full automation; omit for read-only personas.
+    "browser_interaction": [
+        "browser_click",
+        "browser_click_coordinate",
+        "browser_type",
+        "browser_fill",
+        "browser_type_focused",
+        "browser_press",
+        "browser_press_at",
+        "browser_hover",
+        "browser_hover_coordinate",
+        "browser_select",
+        "browser_scroll",
+        "browser_drag",
+        "browser_wait",
+        "browser_resize",
+        "browser_upload",
+        "browser_dialog",
+    ],
     # Lightweight context helpers — good default for every queen.
     "time_context": [
         "get_current_time",
         "get_account_info",
-    ],
-    # Agent-management tools — building/validating/checking agents.
- "agent_mgmt": [ - "save_agent_draft", - "confirm_and_build", - "enqueue_task", - ], + ] } @@ -107,77 +136,71 @@ _TOOL_CATEGORIES: dict[str, list[str]] = { QUEEN_DEFAULT_CATEGORIES: dict[str, list[str]] = { # Head of Technology — builds and operates systems; full toolkit. "queen_technology": [ - "file_read", - "file_write", - "shell", - "data", - "browser", + "file_ops", + "terminal", + "browser_basic", + "browser_interaction", "research", "security", "time_context", - "agent_mgmt", ], - # Head of Growth — data, experiments, competitor research; no shell/security. + # Head of Growth — data, experiments, competitor research; no terminal/security. "queen_growth": [ - "file_read", - "file_write", - "data", - "browser", + "file_ops", + "browser_basic", + "browser_interaction", "research", "time_context", ], - # Head of Product Strategy — user research + roadmaps; no shell/security. + # Head of Product Strategy — user research + roadmaps; no terminal/security. "queen_product_strategy": [ - "file_read", - "file_write", - "data", - "browser", + "file_ops", + "browser_basic", + "browser_interaction", "research", "time_context", ], # Head of Finance — financial models (CSV/Excel heavy), market research. "queen_finance_fundraising": [ - "file_read", - "file_write", - "data", - "browser", + "file_ops", + "advanced_spreadsheet", + "browser_basic", + "browser_interaction", "research", "time_context", ], - # Head of Legal — reads contracts/PDFs, researches; no shell/data/security. + # Head of Legal — reads contracts/PDFs, researches; no terminal/data/security. "queen_legal": [ - "file_read", - "file_write", - "browser", + "file_ops", + "browser_basic", + "browser_interaction", "research", "time_context", ], - # Head of Brand & Design — visual refs, style guides; no shell/data/security. + # Head of Brand & Design — visual refs, style guides; no terminal/data/security. "queen_brand_design": [ - "file_read", - "file_write", - "browser", + "file_ops", + "browser_basic", + "browser_interaction", "research", "time_context", ], # Head of Talent — candidate pipelines, resumes; data + browser heavy. "queen_talent": [ - "file_read", - "file_write", - "data", - "browser", + "file_ops", + "browser_basic", + "browser_interaction", "research", "time_context", ], # Head of Operations — processes, automation, observability. "queen_operations": [ - "file_read", - "file_write", + "file_ops", "data", - "browser", + "browser_basic", + "browser_interaction", "research", "time_context", - "agent_mgmt", ], } diff --git a/core/framework/server/routes_execution.py b/core/framework/server/routes_execution.py index 9e667f00..406c7256 100644 --- a/core/framework/server/routes_execution.py +++ b/core/framework/server/routes_execution.py @@ -44,7 +44,6 @@ _WORKER_INHERITED_TOOLS: frozenset[str] = frozenset( "edit_file", "hashline_edit", "search_files", - "undo_changes", # Shell "run_command", # Framework synthetics (always available to any AgentLoop node) diff --git a/core/framework/server/tests/test_queen_tools.py b/core/framework/server/tests/test_queen_tools.py index 3cb1ed61..bed3e39b 100644 --- a/core/framework/server/tests/test_queen_tools.py +++ b/core/framework/server/tests/test_queen_tools.py @@ -187,13 +187,16 @@ async def test_get_tools_applies_role_default(queen_dir, monkeypatch): _, queen_id = queen_dir # queen_technology — has a role default manager = _FakeManager() - # Seed a catalog covering tools the role default references so the - # response reflects what the queen would actually see on boot. 
+ # Seed two MCP servers: files-tools is referenced by the technology + # role via the @server:files-tools shorthand in `file_ops`, so its + # tools should bubble into the default. unrelated-server is NOT + # referenced by any role category — its tools must NOT leak in. manager._mcp_tool_catalog = { "files-tools": [ {"name": "read_file", "description": "", "input_schema": {}}, - {"name": "port_scan", "description": "", "input_schema": {}}, # security - {"name": "excel_read", "description": "", "input_schema": {}}, # data + {"name": "edit_file", "description": "", "input_schema": {}}, + ], + "unrelated-server": [ {"name": "fluffy_unknown_tool", "description": "", "input_schema": {}}, ], } @@ -204,14 +207,13 @@ async def test_get_tools_applies_role_default(queen_dir, monkeypatch): assert resp.status == 200 body = await resp.json() - # queen_technology's role default includes file_read, data, security, etc. assert body["is_role_default"] is True enabled = set(body["enabled_mcp_tools"] or []) + # @server:files-tools shorthand pulls in every tool under that server. assert "read_file" in enabled - assert "port_scan" in enabled # technology role includes security - assert "excel_read" in enabled - # Tools not in any category (and not in a @server: expansion target - # the role references) are NOT part of the default. + assert "edit_file" in enabled + # Tools registered under a server the role doesn't reference are NOT + # part of the default. assert "fluffy_unknown_tool" not in enabled @@ -220,16 +222,16 @@ def test_resolve_queen_default_tools_expands_server_shorthand(): from framework.agents.queen.queen_tools_defaults import resolve_queen_default_tools catalog = { - "gcu-tools": [ - {"name": "browser_navigate"}, - {"name": "browser_click"}, + "files-tools": [ + {"name": "read_file"}, + {"name": "write_file"}, ], } - # queen_brand_design uses "browser" category → expands via @server:gcu-tools. + # queen_brand_design uses "file_ops" category → expands via @server:files-tools. result = resolve_queen_default_tools("queen_brand_design", catalog) assert result is not None - assert "browser_navigate" in result - assert "browser_click" in result + assert "read_file" in result + assert "write_file" in result def test_resolve_queen_default_tools_unknown_queen_returns_none(): diff --git a/tools/files_server.py b/tools/files_server.py index 7869b6fa..a0c63562 100644 --- a/tools/files_server.py +++ b/tools/files_server.py @@ -2,9 +2,12 @@ """ File Tools MCP Server -Minimal FastMCP server exposing 5 file tools (read_file, write_file, edit_file, -hashline_edit, search_files) with no path sandboxing. ``search_files`` is +Minimal FastMCP server exposing 4 file tools (read_file, write_file, +search_files, edit_file) with no path sandboxing. ``search_files`` is unified — covers grep, find, and ls via target='content' / target='files'. +``edit_file`` is unified — covers single-file fuzzy find/replace +(mode='replace') and multi-file structured patches with two-phase apply +(mode='patch'). 
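(Aside for reviewers: the two `edit_file` call shapes in one hedged sketch. Argument names are taken from the tool registration later in this diff, and the patch body mirrors the example embedded in `EDIT_FILE_PARAMS`; `edit_file` here stands for the MCP tool invocation, however a client binds it.)

```python
# mode='replace': single-file fuzzy find/replace.
edit_file(mode="replace", path="a.txt", old_string="World", new_string="Universe")

# mode='patch': multi-file structured patch; paths live inside patch_text,
# and validation is all-or-nothing before anything touches disk.
edit_file(
    mode="patch",
    patch_text=(
        "*** Begin Patch\n"
        "*** Update File: a.py\n"
        "@@ def hello @@\n"
        " def hello():\n"
        "-    return 1\n"
        "+    return 42\n"
        "*** End Patch"
    ),
)
```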
Usage: # Run with STDIO transport (for agent integration) @@ -82,7 +85,7 @@ def main() -> None: args = parser.parse_args() if not args.stdio: - logger.info("Registered 5 file tools: read_file, write_file, edit_file, hashline_edit, search_files") + logger.info("Registered 4 file tools: read_file, write_file, search_files, edit_file") if args.stdio: mcp.run(transport="stdio") diff --git a/tools/src/aden_tools/file_ops.py b/tools/src/aden_tools/file_ops.py index a01e4a9f..ffa30450 100644 --- a/tools/src/aden_tools/file_ops.py +++ b/tools/src/aden_tools/file_ops.py @@ -1,11 +1,13 @@ """ Shared file operation tools for MCP servers. -Provides 5 tools (read_file, write_file, edit_file, hashline_edit, -search_files) plus supporting helpers. ``search_files`` is unified — -it covers both content grep (``target='content'``) and file listing +Provides 4 tools (read_file, write_file, search_files, edit_file) +plus supporting helpers. ``search_files`` is unified — it covers both +content grep (``target='content'``) and file listing (``target='files'``), replacing the older ``list_directory`` tool and -the LLM's choice between grep/find/ls. +the LLM's choice between grep/find/ls. ``edit_file`` is unified — it +covers single-file fuzzy find/replace (``mode='replace'``) and +multi-file structured edits with two-phase apply (``mode='patch'``). Used by files_server.py (the MCP entry point that exposes these tools to the queen and any other agent loading the files-tools server). @@ -23,30 +25,21 @@ from __future__ import annotations import contextlib import difflib import fnmatch -import json import os import re import subprocess import sys -import tempfile import threading as _threading from collections.abc import Callable +from dataclasses import dataclass, field from pathlib import Path +from typing import Annotated from fastmcp import FastMCP +from pydantic import Field from aden_tools.file_state_cache import Freshness, check_fresh, record_read -from aden_tools.hashline import ( - HASHLINE_MAX_FILE_BYTES, - compute_line_hash, - format_hashlines, - maybe_strip, - parse_anchor, - strip_boundary_echo, - strip_content_prefixes, - strip_insert_echo, - validate_anchor, -) +from aden_tools.hashline import compute_line_hash # ── Constants ───────────────────────────────────────────────────────────── @@ -599,11 +592,40 @@ def _similarity(a: str, b: str) -> float: return 1.0 - _levenshtein(a, b) / maxlen +# Unicode normalization map for fuzzy matching. LLMs frequently emit +# typographic variants (smart quotes, em-dashes, ellipsis, NBSP) when +# the source file uses ASCII — or vice versa. +_UNICODE_NORMALIZATIONS = ( + ("‘", "'"), # left single quote + ("’", "'"), # right single quote + ("“", '"'), # left double quote + ("”", '"'), # right double quote + ("—", "--"), # em-dash + ("–", "-"), # en-dash + ("…", "..."), # ellipsis + (" ", " "), # NBSP +) + + +def _unicode_normalize(s: str) -> str: + for src, dst in _UNICODE_NORMALIZATIONS: + s = s.replace(src, dst) + return s + + def _fuzzy_find_candidates(content: str, old_text: str): - """Yield candidate substrings from content that match old_text, - using a cascade of increasingly fuzzy strategies. + """Yield candidate substrings from content that match old_text. + + Strategies are ordered as a safety gradient: strict and zero-false- + positive first, similarity-based last. Callers stop at the first + yielded candidate that they can act on, so an exact match never + falls through to a heuristic match. 
+ + Order: exact → line-trimmed → whitespace-normalized → + indentation-flexible → escape-normalized → trimmed-boundary → + unicode-normalized → block-anchor → context-aware. """ - # Strategy 1: Exact match + # 1. Exact match if old_text in content: yield old_text @@ -617,13 +639,76 @@ def _fuzzy_find_candidates(content: str, old_text: str): n_search = len(search_lines) - # Strategy 2: Line-trimmed match + # 2. Line-trimmed match for i in range(len(content_lines) - n_search + 1): window = content_lines[i : i + n_search] if all(cl.strip() == sl.strip() for cl, sl in zip(window, search_lines, strict=True)): yield "\n".join(window) - # Strategy 3: Block-anchor match (first/last line as anchors, fuzzy middle) + # 3. Whitespace-normalized match (collapse runs of whitespace) + normalized_search = re.sub(r"\s+", " ", old_text).strip() + for i in range(len(content_lines) - n_search + 1): + window = content_lines[i : i + n_search] + normalized_block = re.sub(r"\s+", " ", "\n".join(window)).strip() + if normalized_block == normalized_search: + yield "\n".join(window) + + # 4. Indentation-flexible match (strip common leading indent) + def _strip_indent(lines): + non_empty = [ln for ln in lines if ln.strip()] + if not non_empty: + return "\n".join(lines) + min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty) + return "\n".join(ln[min_indent:] for ln in lines) + + stripped_search = _strip_indent(search_lines) + for i in range(len(content_lines) - n_search + 1): + block = content_lines[i : i + n_search] + if _strip_indent(block) == stripped_search: + yield "\n".join(block) + + # 5. Escape-normalized match — agents sometimes paste literal "\n", + # "\t", "\r" sequences instead of actual control chars. + if "\\n" in old_text or "\\t" in old_text or "\\r" in old_text: + unescaped = old_text.replace("\\n", "\n").replace("\\t", "\t").replace("\\r", "\r") + if unescaped != old_text and unescaped in content: + yield unescaped + + # 6. Trimmed-boundary match (only outer whitespace differs) + trimmed = old_text.strip() + if trimmed != old_text and trimmed in content: + yield trimmed + + # 7. Unicode-normalized match (smart quotes, em/en-dashes, ellipsis, + # NBSP). Walk the original content recovering the substring whose + # normalization equals the normalized search term — replacement + # happens in original space so length deltas don't corrupt the file. + norm_search = _unicode_normalize(old_text) + if norm_search != old_text: + norm_content = _unicode_normalize(content) + if norm_search in norm_content: + # Build a per-original-char index → normalized-position map. + pos_map = [] + np = 0 + for ch in content: + pos_map.append(np) + np += len(_unicode_normalize(ch)) + pos_map.append(np) + target = norm_content.find(norm_search) + if target >= 0: + target_end = target + len(norm_search) + # Locate boundaries in original space. + try: + orig_start = pos_map.index(target) + orig_end = pos_map.index(target_end, orig_start) + yield content[orig_start:orig_end] + except ValueError: + pass + + # 8. Block-anchor match — first and last lines match exactly (after + # trim), middle is allowed to drift if similarity is high enough. + # Thresholds (0.50 / 0.70) are deliberately tight; older 0.10/0.30 + # values silently matched unrelated blocks. 
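(Illustration, mirroring the `test_replace_unicode_normalized` case added later in this diff: `_fuzzy_find_candidates` is module-level, so the cascade, strategy 7 above and strategies 8 and 9 just below, can be probed directly. The file content here is invented.)

```python
from aden_tools.file_ops import _fuzzy_find_candidates

content = 'print("hi")\n'
# Strategies 1-6 fail (smart vs straight quotes); strategy 7 recovers the
# substring in *original* space, so replacing it cannot corrupt the file.
candidate = next(_fuzzy_find_candidates(content, 'print(“hi”)'), None)
assert candidate == 'print("hi")'
```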
if n_search >= 3: first_trimmed = search_lines[0].strip() last_trimmed = search_lines[-1].strip() @@ -639,35 +724,19 @@ def _fuzzy_find_candidates(content: str, old_text: str): candidates.append((sim, "\n".join(block))) if candidates: candidates.sort(key=lambda x: x[0], reverse=True) - if candidates[0][0] > 0.3: + threshold = 0.50 if len(candidates) == 1 else 0.70 + if candidates[0][0] >= threshold: yield candidates[0][1] - # Strategy 4: Whitespace-normalized match - normalized_search = re.sub(r"\s+", " ", old_text).strip() - for i in range(len(content_lines) - n_search + 1): - window = content_lines[i : i + n_search] - normalized_block = re.sub(r"\s+", " ", "\n".join(window)).strip() - if normalized_block == normalized_search: - yield "\n".join(window) - - # Strategy 5: Indentation-flexible match - def _strip_indent(lines): - non_empty = [ln for ln in lines if ln.strip()] - if not non_empty: - return "\n".join(lines) - min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty) - return "\n".join(ln[min_indent:] for ln in lines) - - stripped_search = _strip_indent(search_lines) - for i in range(len(content_lines) - n_search + 1): - block = content_lines[i : i + n_search] - if _strip_indent(block) == stripped_search: - yield "\n".join(block) - - # Strategy 6: Trimmed-boundary match - trimmed = old_text.strip() - if trimmed != old_text and trimmed in content: - yield trimmed + # 9. Context-aware match — last resort. Per-line similarity with + # 50% threshold per line for heavily mangled but recognizable blocks. + if n_search >= 2: + for i in range(len(content_lines) - n_search + 1): + window = content_lines[i : i + n_search] + sims = [_similarity(cl.strip(), sl.strip()) for cl, sl in zip(window, search_lines, strict=True)] + if sims and min(sims) >= 0.50 and (sum(sims) / len(sims)) >= 0.65: + yield "\n".join(window) + break def _compute_diff(old: str, new: str, path: str) -> str: @@ -681,6 +750,690 @@ def _compute_diff(old: str, new: str, path: str) -> str: return result +# ── Multi-file structured patch (V4A) ───────────────────────────────────── +# +# A small, lenient parser for the structured patch format. The grammar: +# +# *** Begin Patch (optional) +# *** Update File: path +# @@ optional context @@ (optional, per hunk) +# context line ← space prefix +# -removed line +# +added line +# *** Add File: path +# +line +# +line +# *** Delete File: path +# *** Move File: src -> dst +# *** End Patch (optional) +# +# Two-phase apply: every operation is simulated against an in-memory +# copy of the touched files first; only when every op validates do we +# actually write to disk. This gives multi-file edits all-or-nothing +# semantics without needing a copy-aside / journal. + +_BEGIN_RE = re.compile(r"^\*\*\*\s*Begin\s+Patch\s*$") +_END_RE = re.compile(r"^\*\*\*\s*End\s+Patch\s*$") +_OP_RE = re.compile(r"^\*\*\*\s+(Update|Add|Delete|Move)\s+File:\s*(.+)$") +_HUNK_HINT_RE = re.compile(r"^@@\s*(.*?)\s*@@\s*$") + + +@dataclass +class _Hunk: + context_hint: str | None + lines: list[tuple[str, str]] # (prefix, content), prefix in {' ', '-', '+'} + + +@dataclass +class _PatchOp: + kind: str # "add" | "update" | "delete" | "move" + path: str + dest: str | None = None # only for "move" + hunks: list[_Hunk] = field(default_factory=list) # only for "update" + add_content: str = "" # only for "add" + + +def _is_op_marker(line: str) -> bool: + return bool(_OP_RE.match(line)) + + +def _parse_v4a(text: str) -> tuple[list[_PatchOp], str | None]: + """Parse V4A patch text into a list of operations. 
+ + Returns ``(operations, error)``. On failure ``operations`` is empty + and ``error`` describes why parsing stopped. Lenient about markers: + ``Begin``/``End`` are optional, lines starting with ``\\`` (e.g. the + ``\\ No newline at end of file`` git artifact) are skipped, and a + line inside a hunk that lacks a ``+``/``-``/space prefix is treated + as an implicit context line (a common LLM mistake). + """ + lines = text.splitlines() + i = 0 + # Skip until Begin marker or first op marker. + while i < len(lines): + if _BEGIN_RE.match(lines[i].strip()): + i += 1 + break + if _is_op_marker(lines[i]): + break + i += 1 + + ops: list[_PatchOp] = [] + while i < len(lines): + raw = lines[i] + stripped = raw.strip() + if _END_RE.match(stripped): + break + m = _OP_RE.match(raw) + if not m: + i += 1 + continue + kind_word, rest = m.group(1), m.group(2).strip() + i += 1 + if kind_word == "Update": + hunks: list[_Hunk] = [] + while i < len(lines): + if _is_op_marker(lines[i]) or _END_RE.match(lines[i].strip()): + break + hunk, i = _parse_hunk(lines, i) + if hunk is None: + break + hunks.append(hunk) + if not hunks: + return [], f"Update {rest}: no hunks parsed" + ops.append(_PatchOp(kind="update", path=rest, hunks=hunks)) + elif kind_word == "Add": + content_lines: list[str] = [] + while i < len(lines): + if _is_op_marker(lines[i]) or _END_RE.match(lines[i].strip()): + break + if lines[i].startswith("+"): + content_lines.append(lines[i][1:]) + i += 1 + ops.append(_PatchOp(kind="add", path=rest, add_content="\n".join(content_lines))) + elif kind_word == "Delete": + ops.append(_PatchOp(kind="delete", path=rest)) + elif kind_word == "Move": + mv = re.match(r"^(.+?)\s*->\s*(.+)$", rest) + if not mv: + return [], f"Move requires 'src -> dst', got: {rest}" + ops.append(_PatchOp(kind="move", path=mv.group(1).strip(), dest=mv.group(2).strip())) + + if not ops: + return [], "patch text contained no operations" + + errors: list[str] = [] + for op in ops: + if not op.path: + errors.append(f"{op.kind}: empty path") + if op.kind == "move" and not op.dest: + errors.append(f"move {op.path}: missing destination") + if errors: + return [], "; ".join(errors) + return ops, None + + +def _parse_hunk(lines: list[str], start_idx: int) -> tuple[_Hunk | None, int]: + """Parse one hunk. Returns ``(hunk_or_none, next_idx)``.""" + i = start_idx + if i >= len(lines): + return None, i + context_hint: str | None = None + m = _HUNK_HINT_RE.match(lines[i]) + if m: + context_hint = m.group(1).strip() or None + i += 1 + hunk_lines: list[tuple[str, str]] = [] + started = False + while i < len(lines): + line = lines[i] + if _is_op_marker(line) or _END_RE.match(line.strip()) or _HUNK_HINT_RE.match(line): + break + if line.startswith("\\"): + # Git-diff artifact like "\ No newline at end of file" — skip + i += 1 + continue + if line.startswith((" ", "-", "+")): + hunk_lines.append((line[0], line[1:])) + started = True + i += 1 + continue + if started: + # Implicit context line — common LLM mistake of forgetting + # the leading space. Treat it as context, not hunk-end. + hunk_lines.append((" ", line)) + i += 1 + continue + # Blank prelude before hunk content — stop trying. + break + if not hunk_lines: + return None, i + return _Hunk(context_hint=context_hint, lines=hunk_lines), i + + +def _apply_hunk(content: str, hunk: _Hunk) -> tuple[str, str | None]: + """Apply one hunk to ``content``. 
Returns ``(new_content, error)``."""
+    search_lines = [c for p, c in hunk.lines if p in (" ", "-")]
+    replace_lines = [c for p, c in hunk.lines if p in (" ", "+")]
+
+    # Pure addition (no - or context lines, only +). Insert after a
+    # uniquely-matching hint line; append to EOF if the hint is missing or not found.
+    if not search_lines and replace_lines:
+        addition = "\n".join(replace_lines)
+        if hunk.context_hint:
+            count = content.count(hunk.context_hint)
+            if count > 1:
+                return content, (
+                    f"addition-only hunk: context hint "
+                    f"'{hunk.context_hint}' is ambiguous ({count} occurrences)"
+                )
+            if count == 1:
+                idx = content.find(hunk.context_hint)
+                line_end = content.find("\n", idx)
+                if line_end < 0:
+                    new = content + "\n" + addition
+                else:
+                    new = content[: line_end + 1] + addition + "\n" + content[line_end + 1 :]
+                return new, None
+        # No hint, or hint not found: append to EOF.
+        if content and not content.endswith("\n"):
+            content += "\n"
+        return content + addition + "\n", None
+
+    search = "\n".join(search_lines)
+    replace = "\n".join(replace_lines)
+    if not search:
+        return content, "hunk has neither context nor removed lines"
+
+    # Try fuzzy match in the full content first.
+    matched: str | None = None
+    for candidate in _fuzzy_find_candidates(content, search):
+        first = content.find(candidate)
+        if first < 0:
+            continue
+        last = content.rfind(candidate)
+        if first == last:
+            matched = candidate
+            break
+
+    if matched is None and hunk.context_hint:
+        # Asymmetric window around the hint. Hunks usually appear after
+        # their identifying function/class signature, so we look further
+        # forward than backward.
+        hint_pos = content.find(hunk.context_hint)
+        if hint_pos >= 0:
+            wstart = max(0, hint_pos - 500)
+            wend = min(len(content), hint_pos + 2000)
+            window = content[wstart:wend]
+            for candidate in _fuzzy_find_candidates(window, search):
+                first = window.find(candidate)
+                if first < 0:
+                    continue
+                last = window.rfind(candidate)
+                if first == last:
+                    new_window = window.replace(candidate, replace, 1)
+                    return content[:wstart] + new_window + content[wend:], None
+
+    if matched is None:
+        return content, "could not find a unique match for hunk"
+
+    return content.replace(matched, replace, 1), None
+
+
+def _apply_v4a(
+    ops: list[_PatchOp],
+    policy,
+    before_write: Callable[[], None] | None,
+) -> tuple[str | None, str | None]:
+    """Two-phase apply for a V4A operation list.
+
+    Phase 1 simulates every op against an in-memory copy of the touched
+    files. If anything fails, returns an error and writes nothing.
+    Phase 2 commits the simulated state to disk; a failure there is
+    annotated as potentially-partial and points the agent at git diff.
+ """ + fs_state: dict[str, str] = {} + fs_exists: dict[str, bool] = {} # True = should exist post-apply, False = deleted + original_existed: dict[str, bool] = {} + + def _ensure_loaded(resolved: str) -> str | None: + if resolved in fs_state: + return None + if not os.path.isfile(resolved): + return f"file not found: {resolved}" + try: + with open(resolved, encoding="utf-8") as f: + fs_state[resolved] = f.read() + fs_exists[resolved] = True + original_existed[resolved] = True + except Exception as e: + return f"failed to read: {e}" + return None + + errors: list[str] = [] + for op_idx, op in enumerate(ops): + try: + resolved = policy.write_path(op.path) + except ValueError as e: + errors.append(f"Op #{op_idx + 1} {op.kind} {op.path}: {e}") + continue + + if op.kind == "add": + if os.path.exists(resolved) and fs_exists.get(resolved, True): + errors.append(f"Op #{op_idx + 1} add {op.path}: file already exists") + continue + content = op.add_content + if content and not content.endswith("\n"): + content += "\n" + fs_state[resolved] = content + fs_exists[resolved] = True + original_existed.setdefault(resolved, os.path.exists(resolved)) + + elif op.kind == "delete": + err = _ensure_loaded(resolved) + if err: + errors.append(f"Op #{op_idx + 1} delete {op.path}: {err}") + continue + fs_exists[resolved] = False + + elif op.kind == "update": + err = _ensure_loaded(resolved) + if err: + errors.append(f"Op #{op_idx + 1} update {op.path}: {err}") + continue + content = fs_state[resolved] + for hunk_idx, hunk in enumerate(op.hunks): + new_content, herr = _apply_hunk(content, hunk) + if herr: + errors.append( + f"Op #{op_idx + 1} update {op.path} hunk #{hunk_idx + 1}: {herr}" + ) + break + content = new_content + fs_state[resolved] = content + + elif op.kind == "move": + try: + dst_resolved = policy.write_path(op.dest or "") + except ValueError as e: + errors.append(f"Op #{op_idx + 1} move {op.path}: {e}") + continue + err = _ensure_loaded(resolved) + if err: + errors.append(f"Op #{op_idx + 1} move {op.path}: {err}") + continue + if os.path.exists(dst_resolved) and fs_exists.get(dst_resolved, True): + errors.append( + f"Op #{op_idx + 1} move {op.path}: destination already exists" + ) + continue + fs_state[dst_resolved] = fs_state[resolved] + fs_exists[dst_resolved] = True + fs_exists[resolved] = False + original_existed.setdefault(dst_resolved, os.path.exists(dst_resolved)) + + if errors: + return None, "Patch validation failed (no files were modified):\n " + "\n ".join(errors) + + # Phase 2: commit + files_modified: list[str] = [] + files_created: list[str] = [] + files_deleted: list[str] = [] + diffs: list[str] = [] + apply_errors: list[str] = [] + + for resolved, will_exist in fs_exists.items(): + try: + existed = original_existed.get(resolved, os.path.isfile(resolved)) + if will_exist: + new_content = fs_state[resolved] + old_content = "" + if existed: + try: + with open(resolved, encoding="utf-8") as f: + old_content = f.read() + except Exception: + old_content = "" + if before_write: + before_write() + Path(resolved).parent.mkdir(parents=True, exist_ok=True) + with open(resolved, "w", encoding="utf-8") as f: + f.write(new_content) + try: + record_read(None, resolved, content_bytes=new_content.encode("utf-8")) + except Exception: + pass + if existed: + files_modified.append(resolved) + diff = _compute_diff(old_content, new_content, resolved) + if diff: + diffs.append(diff) + else: + files_created.append(resolved) + else: + if before_write: + before_write() + if os.path.isfile(resolved): + 
os.unlink(resolved)
+                    files_deleted.append(resolved)
+        except Exception as e:
+            apply_errors.append(f"{resolved}: {e}")
+
+    if apply_errors:
+        return None, (
+            "Apply phase failed (state may be inconsistent — run `git diff` to assess):\n  "
+            + "\n  ".join(apply_errors)
+        )
+
+    summary_parts: list[str] = []
+    if files_modified:
+        summary_parts.append(f"Modified {len(files_modified)} file(s): {', '.join(files_modified)}")
+    if files_created:
+        summary_parts.append(f"Created {len(files_created)} file(s): {', '.join(files_created)}")
+    if files_deleted:
+        summary_parts.append(f"Deleted {len(files_deleted)} file(s): {', '.join(files_deleted)}")
+    summary = "\n".join(summary_parts) or "Patch applied (no files changed)"
+    if diffs:
+        summary += "\n\n" + "\n\n".join(diffs)
+    return summary, None
+
+
+# ── Patch tool implementations ────────────────────────────────────────────
+#
+# The two modes of the unified ``edit_file`` tool live as module-level
+# functions so they're easy to test in isolation. Both honor the same
+# ``policy`` (path resolution + deny lists) and ``before_write`` hook.
+
+
+def _patch_replace(
+    policy,
+    before_write: Callable[[], None] | None,
+    path: str,
+    old_string: str,
+    new_string: str,
+    replace_all: bool,
+) -> str:
+    """Single-file fuzzy find-and-replace (the ``mode='replace'`` path)."""
+    if not path:
+        return "Error: replace mode requires a non-empty 'path'"
+    if not old_string:
+        return "Error: 'old_string' must not be empty"
+    if old_string == new_string:
+        return "Error: 'old_string' and 'new_string' are identical — nothing to do"
+    try:
+        resolved = policy.write_path(path)
+    except ValueError as e:
+        return f"Error: {e}"
+    if not os.path.isfile(resolved):
+        return f"Error: File not found: {path}"
+
+    # Stale-edit guard: refuse unless a recent read is on record and the
+    # file on disk still matches it. Prevents the model from overwriting
+    # changes the user made between calling read_file and edit_file.
+    _fresh = check_fresh(None, resolved)
+    if _fresh.status is Freshness.UNREAD:
+        return (
+            f"Refusing to edit '{path}': call read_file('{path}') first so the "
+            f"harness can track its state before you edit it."
+        )
+    if _fresh.status is Freshness.STALE:
+        return (
+            f"Refusing to edit '{path}': {_fresh.detail}. Re-read the file with "
+            f"read_file before editing."
+        )
+
+    try:
+        with open(resolved, encoding="utf-8") as f:
+            content = f.read()
+
+        if before_write:
+            before_write()
+
+        strategies = [
+            "exact",
+            "line-trimmed",
+            "whitespace-normalized",
+            "indentation-flexible",
+            "escape-normalized",
+            "trimmed-boundary",
+            "unicode-normalized",
+            "block-anchor",
+            "context-aware",
+        ]
+        matched: str | None = None
+        strategy_used: str | None = None
+        for i, candidate in enumerate(_fuzzy_find_candidates(content, old_string)):
+            idx = content.find(candidate)
+            if idx < 0:
+                continue
+            if replace_all:
+                matched = candidate
+                strategy_used = strategies[min(i, len(strategies) - 1)]
+                break
+            last_idx = content.rfind(candidate)
+            if idx == last_idx:
+                matched = candidate
+                strategy_used = strategies[min(i, len(strategies) - 1)]
+                break
+
+        if matched is None:
+            close = difflib.get_close_matches(
+                old_string[:200], content.split("\n"), n=3, cutoff=0.4
+            )
+            msg = (
+                f"Error: Could not find a unique match for old_string in {path}. "
+                f"Use read_file to verify the current content, or search_files "
+                f"to locate the text."
+ ) + if close: + suggestions = "\n".join(f" {line}" for line in close) + msg += f"\n\nDid you mean one of these lines?\n{suggestions}" + return msg + + if replace_all: + count = content.count(matched) + new_content = content.replace(matched, new_string) + else: + count = 1 + new_content = content.replace(matched, new_string, 1) + + with open(resolved, "w", encoding="utf-8") as f: + f.write(new_content) + + try: + record_read(None, resolved, content_bytes=new_content.encode("utf-8")) + except Exception: + pass + + diff = _compute_diff(content, new_content, path) + match_info = f" (matched via {strategy_used})" if strategy_used != "exact" else "" + result = f"Replaced {count} occurrence(s) in {path}{match_info}" + if diff: + result += f"\n\n{diff}" + return result + except Exception as e: + return f"Error editing file: {e}" + + +def _patch_apply( + policy, + before_write: Callable[[], None] | None, + patch_text: str, +) -> str: + """Multi-file structured patch (the ``mode='patch'`` path).""" + if not patch_text: + return "Error: patch mode requires a non-empty 'patch_text'" + + ops, parse_error = _parse_v4a(patch_text) + if parse_error: + return f"Error: {parse_error}" + if not ops: + return "Error: patch text contained no operations" + + summary, apply_error = _apply_v4a(ops, policy, before_write) + if apply_error: + return f"Error: {apply_error}" + return summary or "Patch applied" + + +# ── Tool prompts ────────────────────────────────────────────────────────── +# +# Each tool's top-level description and per-parameter descriptions live in a +# co-located block here. The factory below references these constants from +# its tool registrations — descriptions never live inline in the function +# signatures. Module-level placement is required: ``from __future__ import +# annotations`` makes ``Annotated[..., Field(description=...)]`` resolve +# against module globals, not the factory's locals. + +# ── read_file prompts ──────────────────────────────────────────── + +READ_FILE_DOC = ( + "Read file contents with line numbers. Use this instead of `cat`. " + "Binary files are detected and rejected. Large files are auto-truncated " + "at 2000 lines or 50KB — use offset/limit to paginate. Reading a " + "directory returns its entries (use search_files for proper find/ls)." +) +READ_FILE_PARAMS = { + "path": ( + "File path to read. Relative paths anchor to the agent's home; " + "absolute paths used verbatim. Credential paths (~/.ssh, ~/.aws, " + "etc.) are denied." + ), + "offset": "Starting line number, 1-indexed. Default 1.", + "limit": "Max lines to return. 0 means up to 2000. Default 0.", + "hashline": ( + "If True, return lines in N:hhhh|content format with content-hash " + "anchors. Line truncation is disabled in this mode to preserve " + "hash integrity. Default False." + ), +} + +# ── write_file prompts ─────────────────────────────────────────── + +WRITE_FILE_DOC = ( + "Create or overwrite a file with the given content. Parent directories " + "are created automatically. Use this instead of `cat > file` or shell " + "redirects. For targeted edits in an existing file, prefer edit_file " + "(this tool overwrites the whole file). Existing files require a recent " + "read_file call first; brand-new files don't." +) +WRITE_FILE_PARAMS = { + "path": ( + "File path to write. Relative paths anchor to the agent's home; " + "absolute paths used verbatim. System and credential paths are denied." 
+ ), + "content": "Complete file content to write.", +} + +# ── edit_file prompts ──────────────────────────────────────────── + +EDIT_FILE_DOC = ( + "Edit files: one string in one file (replace mode), or many edits " + "across many files (patch mode). Use this instead of `sed`, `awk`, or " + "shell redirects. Returns a unified diff. If old_string doesn't match " + "in replace mode, re-read the file with read_file or use search_files " + "to locate the exact text — don't retry blindly." +) +EDIT_FILE_PARAMS = { + "mode": ( + "Edit mode. 'replace' (default) for single-file find-and-replace. " + "'patch' for multi-file structured patches that can " + "Update/Add/Delete/Move files atomically." + ), + "path": ( + "Replace mode only. File path to edit. Relative paths anchor to " + "the agent's home; absolute paths used verbatim. Ignored in patch " + "mode (paths live inside patch_text)." + ), + "old_string": ( + "Replace mode only. Text to find. Must be unique in the file " + "unless replace_all=True; include surrounding context to " + "disambiguate. Fuzzy matching tolerates whitespace/indent drift, " + "tabs vs spaces, smart quotes vs ASCII, and literal \\n/\\t/\\r " + "vs real control chars." + ), + "new_string": ( + "Replace mode only. Replacement text. Pass an empty string to " + "delete the matched text." + ), + "replace_all": ( + "Replace mode only. Replace every occurrence instead of requiring " + "a unique match. Default False." + ), + "patch_text": ( + "Patch mode only. Structured patch body. File paths are embedded " + "inside the body via '*** Update File: ' / " + "'*** Add File: ' / '*** Delete File: ' / " + "'*** Move File: -> ' markers, so one call can touch " + "many files. Hunks use unified-diff syntax: lines starting with " + "' ' (space) are context, '-' lines are removed, '+' lines are " + "added. Optional '@@ hint @@' before a hunk narrows fuzzy " + "matching to a window around the hint. If any operation fails " + "validation, no files are written. Example:\n" + "*** Begin Patch\n" + "*** Update File: a.py\n" + "@@ def hello @@\n" + " def hello():\n" + "- return 1\n" + "+ return 42\n" + "*** Add File: new.py\n" + "+x = 1\n" + "*** Delete File: old.py\n" + "*** Move File: src.py -> dst.py\n" + "*** End Patch" + ), +} + +# ── search_files prompts ───────────────────────────────────────── + +SEARCH_FILES_DOC = ( + "Search file contents OR find files by name. Use this instead of " + "grep, find, or ls. target='content' (default) regex-greps inside " + "files; target='files' globs file names (mtime-sorted, newest first). " + "Pagination via limit/offset; truncated responses include a hint with " + "the next offset. Repeating the same exact query consecutively is " + "warned at 3 calls and blocked at 4 — use the results you already have." +) +SEARCH_FILES_PARAMS = { + "pattern": ( + "Regex (content mode) or glob (files mode, e.g. '*.py'). For an " + "'ls'-style listing pass '*' or '*.'." + ), + "target": ( + "'content' to grep inside files, 'files' to list/find files. " + "Legacy aliases: 'grep' -> 'content', 'find'/'ls' -> 'files'. " + "Default 'content'." + ), + "path": ( + "Directory (or, in content mode, a single file) to search. " + "Default '.'." + ), + "file_glob": ( + "Restrict content search to filenames matching this glob. " + "Ignored in files mode (use the 'pattern' argument instead)." + ), + "limit": "Max results to return. Default 50.", + "offset": "Skip first N results for pagination. 
Default 0.", + "output_mode": ( + "Content-mode output shape: 'content' (lines + line numbers, " + "default), 'files_only' (paths only), 'count' (per-file match " + "counts)." + ), + "context": ( + "Lines of context before and after each match (content mode " + "only). Default 0." + ), + "hashline": ( + "Content mode: include N:hhhh hash anchors in matched lines. " + "Default False." + ), + "task_id": ( + "Optional anti-loop scope key. Defaults to a shared bucket; pass " + "a per-task id when multiple agents share a process." + ), +} + + # ── Factory ─────────────────────────────────────────────────────────────── @@ -718,23 +1471,13 @@ def register_file_tools( """ policy = _FilePolicy(home=home, write_safe_root=write_safe_root) - @mcp.tool() - def read_file(path: str, offset: int = 1, limit: int = 0, hashline: bool = False) -> str: - """Read file contents with line numbers and byte-budget truncation. - - Binary files are detected and rejected. Large files are automatically - truncated at 2000 lines or 50KB. Use offset and limit to paginate. - - Set hashline=True to get N:hhhh|content format with content-hash - anchors for use with hashline_edit. Line truncation is disabled in - hashline mode to preserve hash integrity. - - Args: - path: Absolute file path to read. - offset: Starting line number, 1-indexed (default: 1). - limit: Max lines to return, 0 = up to 2000 (default: 0). - hashline: If True, return N:hhhh|content anchors (default: False). - """ + @mcp.tool(description=READ_FILE_DOC) + def read_file( + path: Annotated[str, Field(description=READ_FILE_PARAMS["path"])], + offset: Annotated[int, Field(description=READ_FILE_PARAMS["offset"])] = 1, + limit: Annotated[int, Field(description=READ_FILE_PARAMS["limit"])] = 0, + hashline: Annotated[bool, Field(description=READ_FILE_PARAMS["hashline"])] = False, + ) -> str: try: resolved = policy.read_path(path) except ValueError as e: @@ -809,18 +1552,11 @@ def register_file_tools( except Exception as e: return f"Error reading file: {e}" - @mcp.tool() - def write_file(path: str, content: str) -> str: - """Create or overwrite a file with the given content. - - Automatically creates parent directories. - - Args: - path: Relative paths anchor to the agent's home directory; - absolute paths are used verbatim. System and credential - paths are denied. - content: Complete file content to write. - """ + @mcp.tool(description=WRITE_FILE_DOC) + def write_file( + path: Annotated[str, Field(description=WRITE_FILE_PARAMS["path"])], + content: Annotated[str, Field(description=WRITE_FILE_PARAMS["content"])], + ) -> str: try: resolved = policy.write_path(path) except ValueError as e: @@ -879,223 +1615,41 @@ def register_file_tools( except Exception as e: return f"Error writing file: {e}" - @mcp.tool() - def edit_file(path: str, old_text: str, new_text: str, replace_all: bool = False) -> str: - """Replace text in a file using a fuzzy-match cascade. - - Tries exact match first, then falls back through increasingly fuzzy - strategies: line-trimmed, block-anchor, whitespace-normalized, - indentation-flexible, and trimmed-boundary matching. - - Args: - path: Relative paths anchor to home; absolute paths are used - verbatim. System and credential paths are denied. - old_text: Text to find (fuzzy matching applied if exact fails). - new_text: Replacement text. - replace_all: Replace all occurrences (default: first only). 
- """ - try: - resolved = policy.write_path(path) - except ValueError as e: - return f"Error: {e}" - if not os.path.isfile(resolved): - return f"Error: File not found: {path}" - - # Stale-edit guard: refuse unless a recent read is on record and - # the file on disk still matches it. Prevents the model from - # overwriting changes the user made in their editor between - # calling read_file and edit_file. - _fresh = check_fresh(None, resolved) - if _fresh.status is Freshness.UNREAD: - return ( - f"Refusing to edit '{path}': call read_file('{path}') " - f"first so the harness can track its state before you " - f"edit it." - ) - if _fresh.status is Freshness.STALE: - return f"Refusing to edit '{path}': {_fresh.detail}. Re-read the file with read_file before editing." - - try: - with open(resolved, encoding="utf-8") as f: - content = f.read() - - if before_write: - before_write() - - matched_text = None - strategy_used = None - strategies = [ - "exact", - "line-trimmed", - "block-anchor", - "whitespace-normalized", - "indentation-flexible", - "trimmed-boundary", - ] - - for i, candidate in enumerate(_fuzzy_find_candidates(content, old_text)): - idx = content.find(candidate) - if idx == -1: - continue - - if replace_all: - matched_text = candidate - strategy_used = strategies[min(i, len(strategies) - 1)] - break - - last_idx = content.rfind(candidate) - if idx == last_idx: - matched_text = candidate - strategy_used = strategies[min(i, len(strategies) - 1)] - break - - if matched_text is None: - close = difflib.get_close_matches(old_text[:200], content.split("\n"), n=3, cutoff=0.4) - msg = f"Error: Could not find a unique match for old_text in {path}." - if close: - suggestions = "\n".join(f" {line}" for line in close) - msg += f"\n\nDid you mean one of these lines?\n{suggestions}" - return msg - - if replace_all: - count = content.count(matched_text) - new_content = content.replace(matched_text, new_text) - else: - count = 1 - new_content = content.replace(matched_text, new_text, 1) - - with open(resolved, "w", encoding="utf-8") as f: - f.write(new_content) - - # Re-record post-write state so a second edit in the same - # turn doesn't trip its own stale guard. - try: - record_read(None, resolved, content_bytes=new_content.encode("utf-8")) - except Exception: - pass - - diff = _compute_diff(content, new_content, path) - match_info = f" (matched via {strategy_used})" if strategy_used != "exact" else "" - result = f"Replaced {count} occurrence(s) in {path}{match_info}" - if diff: - result += f"\n\n{diff}" - return result - except Exception as e: - return f"Error editing file: {e}" - - @mcp.tool() - def apply_patch(path: str, patch_text: str) -> str: - """Apply a diff-match-patch text to a file. - - Use this for batched, context-aware edits where you already have a - ``diff_match_patch``-format patch in hand. For single string edits - prefer ``edit_file``; for line-anchored multi-edit batches prefer - ``hashline_edit``. - - Args: - path: Relative paths anchor to home; absolute paths are used - verbatim. System and credential paths are denied. - patch_text: The diff-match-patch text produced by - ``dmp.patch_toText(dmp.patch_make(...))``. 
- """ - try: - import diff_match_patch as dmp_module - except ImportError: - return "Error: diff_match_patch is not installed" - - try: - resolved = policy.write_path(path) - except ValueError as e: - return f"Error: {e}" - if not os.path.isfile(resolved): - return f"Error: File not found: {path}" - - # Stale-edit guard mirrors edit_file/hashline_edit so the model - # can't patch over content it has never seen. - _fresh = check_fresh(None, resolved) - if _fresh.status is Freshness.UNREAD: - return ( - f"Error: Refusing to patch '{path}': call read_file('{path}') " - "first so the harness can track its state before you patch it." - ) - if _fresh.status is Freshness.STALE: - return f"Error: Refusing to patch '{path}': {_fresh.detail}. Re-read the file before patching." - - try: - with open(resolved, encoding="utf-8") as f: - content = f.read() - dmp = dmp_module.diff_match_patch() - patches = dmp.patch_fromText(patch_text) - if not patches: - return "Error: patch_text produced no patches" - - new_content, results = dmp.patch_apply(patches, content) - applied = sum(1 for r in results if r) - failed = len(results) - applied - - if failed: - return f"Error: applied {applied}/{len(results)} patches; {failed} failed. File not modified." - - if before_write: - try: - before_write() - except Exception: - pass - - with open(resolved, "w", encoding="utf-8") as f: - f.write(new_content) - f.flush() - os.fsync(f.fileno()) - - try: - record_read(None, resolved, content_bytes=new_content.encode("utf-8")) - except Exception: - pass - - return f"Applied {applied} patch(es) to {path}" - except Exception as e: - return f"Error applying patch: {e}" - - @mcp.tool() - def search_files( - pattern: str, - target: str = "content", - path: str = ".", - file_glob: str = "", - limit: int = 50, - offset: int = 0, - output_mode: str = "content", - context: int = 0, - hashline: bool = False, - task_id: str = "", + @mcp.tool(description=EDIT_FILE_DOC) + def edit_file( + mode: Annotated[str, Field(description=EDIT_FILE_PARAMS["mode"])] = "replace", + path: Annotated[str, Field(description=EDIT_FILE_PARAMS["path"])] = "", + old_string: Annotated[str, Field(description=EDIT_FILE_PARAMS["old_string"])] = "", + new_string: Annotated[str, Field(description=EDIT_FILE_PARAMS["new_string"])] = "", + replace_all: Annotated[bool, Field(description=EDIT_FILE_PARAMS["replace_all"])] = False, + patch_text: Annotated[str, Field(description=EDIT_FILE_PARAMS["patch_text"])] = "", ) -> str: - """Search file contents or find files by name. Use this instead of grep, find, or ls. + if mode == "replace": + return _patch_replace( + policy, + before_write, + path, + old_string, + new_string, + replace_all, + ) + if mode == "patch": + return _patch_apply(policy, before_write, patch_text) + return f"Error: unknown mode '{mode}'. Use mode='replace' or mode='patch'." - Two modes: - target='content' (default): Regex search inside files. Output modes: - 'content' (lines+numbers, default), 'files_only' (paths only), 'count' (per-file counts). - target='files': Find files by glob pattern (e.g. '*.py', '*config*'). - Also use this instead of ls — results sorted by modification time (newest first). - - Pagination: limit/offset both apply; the response includes a hint with the - next offset when truncated. The same query repeated back-to-back is warned - at the 3rd call and blocked at the 4th — use the results you already have. - - Args: - pattern: Regex (content mode) or glob (files mode, e.g. '*.py'). For - an "ls"-style listing pass '*' or '*.'. 
- target: 'content' to grep inside files, 'files' to list/find files. - Legacy aliases: 'grep' -> 'content', 'find'/'ls' -> 'files'. - path: Directory (or, in content mode, a single file) to search. - file_glob: Restrict content search to filenames matching this glob. - Ignored in files mode (use ``pattern``). - limit: Max results to return (default 50). - offset: Skip first N results for pagination (default 0). - output_mode: Content-mode output shape — 'content' | 'files_only' | 'count'. - context: Lines of context before and after each match (content mode only). - hashline: Content mode: include N:hhhh hash anchors for hashline_edit. - task_id: Optional anti-loop scope key (defaults to a shared bucket). - """ + @mcp.tool(description=SEARCH_FILES_DOC) + def search_files( + pattern: Annotated[str, Field(description=SEARCH_FILES_PARAMS["pattern"])], + target: Annotated[str, Field(description=SEARCH_FILES_PARAMS["target"])] = "content", + path: Annotated[str, Field(description=SEARCH_FILES_PARAMS["path"])] = ".", + file_glob: Annotated[str, Field(description=SEARCH_FILES_PARAMS["file_glob"])] = "", + limit: Annotated[int, Field(description=SEARCH_FILES_PARAMS["limit"])] = 50, + offset: Annotated[int, Field(description=SEARCH_FILES_PARAMS["offset"])] = 0, + output_mode: Annotated[str, Field(description=SEARCH_FILES_PARAMS["output_mode"])] = "content", + context: Annotated[int, Field(description=SEARCH_FILES_PARAMS["context"])] = 0, + hashline: Annotated[bool, Field(description=SEARCH_FILES_PARAMS["hashline"])] = False, + task_id: Annotated[str, Field(description=SEARCH_FILES_PARAMS["task_id"])] = "", + ) -> str: # Legacy aliases — keep older prompts working. if target in ("grep",): target = "content" @@ -1166,401 +1720,3 @@ def register_file_tools( ) return result - @mcp.tool() - def hashline_edit( - path: str, - edits: str, - auto_cleanup: bool = True, - encoding: str = "utf-8", - ) -> str: - """Edit a file using anchor-based line references (N:hash) for precise edits. - - After reading a file with read_file(hashline=True), use the anchors to make - targeted edits without reproducing exact file content. - - Anchors must match current file content (hash validation). All edits in a - batch are validated before any are applied (atomic). Overlapping line ranges - within a single call are rejected. - - Args: - path: Absolute file path to edit. - edits: JSON string containing a list of edit operations. Each op is a - dict with "op" key and operation-specific fields: - - set_line: anchor, content (single line replacement) - - replace_lines: start_anchor, end_anchor, content (multi-line) - - insert_after: anchor, content - - insert_before: anchor, content - - replace: old_content, new_content, allow_multiple - - append: content - auto_cleanup: Strip hashline prefixes and echoed context from edit - content (default: True). - encoding: File encoding (default: "utf-8"). - """ - # 1. Parse JSON - try: - edit_ops = json.loads(edits) - except (json.JSONDecodeError, TypeError) as e: - return f"Error: Invalid JSON in edits: {e}" - - if not isinstance(edit_ops, list): - return "Error: edits must be a JSON array of operations" - if not edit_ops: - return "Error: edits array is empty" - if len(edit_ops) > 100: - return "Error: Too many edits in one call (max 100). Split into multiple calls." - - # 2. 
Read file - try: - resolved = policy.write_path(path) - except ValueError as e: - return f"Error: {e}" - if not os.path.isfile(resolved): - return f"Error: File not found: {path}" - - # Stale-edit guard: require a prior read_file that still matches - # disk. hashline_edit already rehashes anchors, but anchor hashes - # only protect the exact lines touched - content drift around - # those lines (e.g. new imports the user added) would still slip - # through silently. This guard closes that gap. - _fresh = check_fresh(None, resolved) - if _fresh.status is Freshness.UNREAD: - return ( - f"Error: Refusing to edit '{path}': call read_file" - f"('{path}', hashline=True) first so the harness can " - f"track its state before you edit it." - ) - if _fresh.status is Freshness.STALE: - return ( - f"Error: Refusing to edit '{path}': {_fresh.detail}. " - f"Re-read the file with read_file(hashline=True) before " - f"editing." - ) - - try: - with open(resolved, "rb") as f: - raw_head = f.read(8192) - eol = "\r\n" if b"\r\n" in raw_head else "\n" - - with open(resolved, encoding=encoding) as f: - content = f.read() - except Exception as e: - return f"Error: Failed to read file: {e}" - - content_bytes = len(content.encode(encoding)) - if content_bytes > HASHLINE_MAX_FILE_BYTES: - return f"Error: File too large for hashline_edit ({content_bytes} bytes, max 10MB)" - - trailing_newline = content.endswith("\n") - lines = content.splitlines() - - # 3. Categorize and validate ops - splices = [] # (start_0idx, end_0idx, new_lines, op_index) - replaces = [] # (old_content, new_content, op_index, allow_multiple) - cleanup_actions: list[str] = [] - - for i, op in enumerate(edit_ops): - if not isinstance(op, dict): - return f"Error: Edit #{i + 1}: operation must be a dict" - - match op.get("op"): - case "set_line": - anchor = op.get("anchor", "") - err = validate_anchor(anchor, lines) - if err: - return f"Error: Edit #{i + 1} (set_line): {err}" - if "content" not in op: - return f"Error: Edit #{i + 1} (set_line): missing required field 'content'" - if not isinstance(op["content"], str): - return f"Error: Edit #{i + 1} (set_line): content must be a string" - if "\n" in op["content"] or "\r" in op["content"]: - return ( - f"Error: Edit #{i + 1} (set_line): content must be a single line. " - f"Use replace_lines for multi-line replacement." 
- ) - line_num, _ = parse_anchor(anchor) - idx = line_num - 1 - new_content = op["content"] - new_lines = [new_content] if new_content else [] - new_lines = maybe_strip( - new_lines, - strip_content_prefixes, - "prefix_strip", - auto_cleanup, - cleanup_actions, - ) - splices.append((idx, idx, new_lines, i)) - - case "replace_lines": - start_anchor = op.get("start_anchor", "") - end_anchor = op.get("end_anchor", "") - err = validate_anchor(start_anchor, lines) - if err: - return f"Error: Edit #{i + 1} (replace_lines start): {err}" - err = validate_anchor(end_anchor, lines) - if err: - return f"Error: Edit #{i + 1} (replace_lines end): {err}" - start_num, _ = parse_anchor(start_anchor) - end_num, _ = parse_anchor(end_anchor) - if start_num > end_num: - return f"Error: Edit #{i + 1} (replace_lines): start line {start_num} > end line {end_num}" - if "content" not in op: - return f"Error: Edit #{i + 1} (replace_lines): missing required field 'content'" - if not isinstance(op["content"], str): - return f"Error: Edit #{i + 1} (replace_lines): content must be a string" - new_content = op["content"] - new_lines = new_content.splitlines() if new_content else [] - new_lines = maybe_strip( - new_lines, - strip_content_prefixes, - "prefix_strip", - auto_cleanup, - cleanup_actions, - ) - new_lines = maybe_strip( - new_lines, - lambda nl, s=start_num, e=end_num: strip_boundary_echo(lines, s, e, nl), - "boundary_echo_strip", - auto_cleanup, - cleanup_actions, - ) - splices.append((start_num - 1, end_num - 1, new_lines, i)) - - case "insert_after": - anchor = op.get("anchor", "") - err = validate_anchor(anchor, lines) - if err: - return f"Error: Edit #{i + 1} (insert_after): {err}" - line_num, _ = parse_anchor(anchor) - idx = line_num - 1 - new_content = op.get("content", "") - if not isinstance(new_content, str): - return f"Error: Edit #{i + 1} (insert_after): content must be a string" - if not new_content: - return f"Error: Edit #{i + 1} (insert_after): content is empty" - new_lines = new_content.splitlines() - new_lines = maybe_strip( - new_lines, - strip_content_prefixes, - "prefix_strip", - auto_cleanup, - cleanup_actions, - ) - new_lines = maybe_strip( - new_lines, - lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl), - "insert_echo_strip", - auto_cleanup, - cleanup_actions, - ) - splices.append((idx + 1, idx, new_lines, i)) - - case "insert_before": - anchor = op.get("anchor", "") - err = validate_anchor(anchor, lines) - if err: - return f"Error: Edit #{i + 1} (insert_before): {err}" - line_num, _ = parse_anchor(anchor) - idx = line_num - 1 - new_content = op.get("content", "") - if not isinstance(new_content, str): - return f"Error: Edit #{i + 1} (insert_before): content must be a string" - if not new_content: - return f"Error: Edit #{i + 1} (insert_before): content is empty" - new_lines = new_content.splitlines() - new_lines = maybe_strip( - new_lines, - strip_content_prefixes, - "prefix_strip", - auto_cleanup, - cleanup_actions, - ) - new_lines = maybe_strip( - new_lines, - lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl, position="last"), - "insert_echo_strip", - auto_cleanup, - cleanup_actions, - ) - splices.append((idx, idx - 1, new_lines, i)) - - case "replace": - old_content = op.get("old_content") - new_content = op.get("new_content") - if old_content is None: - return f"Error: Edit #{i + 1} (replace): missing old_content" - if not isinstance(old_content, str): - return f"Error: Edit #{i + 1} (replace): old_content must be a string" - if not old_content: - return f"Error: Edit 
-                    return f"Error: Edit #{i + 1} (replace): old_content must not be empty"
-                if new_content is None:
-                    return f"Error: Edit #{i + 1} (replace): missing new_content"
-                if not isinstance(new_content, str):
-                    return f"Error: Edit #{i + 1} (replace): new_content must be a string"
-                allow_multiple = op.get("allow_multiple", False)
-                if not isinstance(allow_multiple, bool):
-                    return f"Error: Edit #{i + 1} (replace): allow_multiple must be a boolean"
-                replaces.append((old_content, new_content, i, allow_multiple))
-
-            case "append":
-                new_content = op.get("content")
-                if new_content is None:
-                    return f"Error: Edit #{i + 1} (append): missing content"
-                if not isinstance(new_content, str):
-                    return f"Error: Edit #{i + 1} (append): content must be a string"
-                if not new_content:
-                    return f"Error: Edit #{i + 1} (append): content must not be empty"
-                new_lines = new_content.splitlines()
-                new_lines = maybe_strip(
-                    new_lines,
-                    strip_content_prefixes,
-                    "prefix_strip",
-                    auto_cleanup,
-                    cleanup_actions,
-                )
-                insert_point = len(lines)
-                splices.append((insert_point, insert_point - 1, new_lines, i))
-
-            case unknown:
-                return f"Error: Edit #{i + 1}: unknown op '{unknown}'"
-
-    # 4. Check for overlapping splice ranges
-    for j in range(len(splices)):
-        for k in range(j + 1, len(splices)):
-            s_a, e_a, _, idx_a = splices[j]
-            s_b, e_b, _, idx_b = splices[k]
-            is_insert_a = s_a > e_a
-            is_insert_b = s_b > e_b
-
-            if is_insert_a and is_insert_b:
-                continue
-            if is_insert_a and not is_insert_b:
-                if s_b <= s_a <= e_b + 1:
-                    return (
-                        f"Error: Overlapping edits: edit #{idx_a + 1} "
-                        f"and edit #{idx_b + 1} affect overlapping line ranges"
-                    )
-                continue
-            if is_insert_b and not is_insert_a:
-                if s_a <= s_b <= e_a + 1:
-                    return (
-                        f"Error: Overlapping edits: edit #{idx_a + 1} "
-                        f"and edit #{idx_b + 1} affect overlapping line ranges"
-                    )
-                continue
-            if not (e_a < s_b or e_b < s_a):
-                return (
-                    f"Error: Overlapping edits: edit #{idx_a + 1} "
-                    f"and edit #{idx_b + 1} affect overlapping line ranges"
-                )
-
-    # 5. Apply splices bottom-up
-    changes_made = 0
-    working = list(lines)
-    for start, end, new_lines, _ in sorted(splices, key=lambda s: (s[0], s[3]), reverse=True):
-        if start > end:
-            changes_made += 1
-            for k, nl in enumerate(new_lines):
-                working.insert(start + k, nl)
-        else:
-            old_slice = working[start : end + 1]
-            if old_slice != new_lines:
-                changes_made += 1
-            working[start : end + 1] = new_lines
-
-    # 6. Apply str_replace ops
-    joined = "\n".join(working)
-    replace_counts = []
-    for old_content, new_content, op_idx, allow_multiple in replaces:
-        count = joined.count(old_content)
-        if count == 0:
-            return (
-                f"Error: Edit #{op_idx + 1} (replace): "
-                f"old_content not found "
-                f"(note: anchor-based edits in this batch are applied first)"
-            )
-        if count > 1 and not allow_multiple:
-            return (
-                f"Error: Edit #{op_idx + 1} (replace): "
-                f"old_content found {count} times (must be unique). "
-                f"Include more surrounding context to make it unique, "
-                f"or use anchor-based ops instead."
-            )
-        if allow_multiple:
-            joined = joined.replace(old_content, new_content)
-            replace_counts.append((op_idx, count))
-        else:
-            joined = joined.replace(old_content, new_content, 1)
-        if count > 0 and old_content != new_content:
-            changes_made += 1
-
-    # 7. Restore trailing newline
-    if trailing_newline and joined and not joined.endswith("\n"):
-        joined += "\n"
-
-    # 8. Restore original EOL style (only convert bare \n, not existing \r\n)
-    if eol == "\r\n":
-        joined = re.sub(r"(?<!\r)\n", "\r\n", joined)
-    if total_lines > preview_limit:
-        parts.append(
-            f"\n(Showing first {preview_limit} of {total_lines} lines. Use read_file with offset to see more.)"
-        )
-    return "\n".join(parts)
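Reviewer note on the hashline_edit machinery deleted above: every anchor op was lowered to a splice `(start, end, new_lines)` where `start > end` encodes a pure insert, and the whole batch was applied bottom-up so earlier indices never shift. A minimal standalone sketch of that convention — `apply_splices` and the example values are illustrative, not part of the codebase:

```python
# Standalone sketch of the removed splice convention (illustrative only).
# A splice is (start, end, new_lines): start > end encodes a pure insert
# at index `start`; otherwise lines[start:end+1] are replaced wholesale.

def apply_splices(lines: list[str], splices: list[tuple[int, int, list[str]]]) -> list[str]:
    working = list(lines)
    # Bottom-up: applying the highest-index splice first means the
    # positions of earlier splices are never shifted by later edits.
    for start, end, new_lines in sorted(splices, key=lambda s: s[0], reverse=True):
        if start > end:  # insert before index `start`
            working[start:start] = new_lines
        else:  # replace the inclusive range
            working[start : end + 1] = new_lines
    return working

if __name__ == "__main__":
    src = ["aaa", "bbb", "ccc"]
    # Replace line 2 and append after line 3 in one batch.
    out = apply_splices(src, [(1, 1, ["BBB"]), (3, 2, ["ddd"])])
    assert out == ["aaa", "BBB", "ccc", "ddd"]
```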
diff --git a/tools/tests/tools/test_file_ops.py b/tools/tests/tools/test_file_ops.py
index cf611a37..55f35ebc 100644
--- a/tools/tests/tools/test_file_ops.py
+++ b/tools/tests/tools/test_file_ops.py
@@ -1,11 +1,9 @@
 """Tests for aden_tools.file_ops — the unified file-tool surface.
 
 Covers the path policy (home anchoring, deny lists, write_safe_root),
-plus the six file tools: read_file, write_file, edit_file, hashline_edit,
-search_files, apply_patch.
+plus the four file tools: read_file, write_file, edit_file, search_files.
 """
 
-import json
 import os
 from unittest.mock import patch
 
@@ -261,70 +259,172 @@ class TestPathPolicyWriteSafeRoot:
         assert "Error" in result
 
 
-class TestApplyPatchTool:
-    """apply_patch — diff_match_patch text → file."""
+class TestPatchToolReplaceMode:
+    """edit_file(mode='replace') — single-file fuzzy find/replace."""
 
-    def test_apply_patch_modifies_file(self, file_ops_mcp, tmp_path):
-        """A valid patch applies and rewrites the file."""
-        import diff_match_patch as dmp_module
-
-        target = tmp_path / "patch_me.txt"
+    def test_replace_basic(self, file_ops_mcp, tmp_path):
+        """Exact match path: simple substitution."""
+        target = tmp_path / "a.txt"
         target.write_text("Hello World", encoding="utf-8")
-
-        dmp = dmp_module.diff_match_patch()
-        patches = dmp.patch_make("Hello World", "Hello Universe")
-        patch_text = dmp.patch_toText(patches)
-
-        apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
-        result = apply_fn(path="patch_me.txt", patch_text=patch_text)
-
-        assert "Error" not in result
-        assert "Applied" in result
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="replace", path="a.txt", old_string="World", new_string="Universe")
+        assert "Replaced 1" in result
         assert target.read_text() == "Hello Universe"
 
-    def test_apply_patch_missing_file(self, file_ops_mcp):
-        """Patching a non-existent file returns an error string."""
-        apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
-        result = apply_fn(path="nope.txt", patch_text="garbage")
+    def test_replace_unicode_normalized(self, file_ops_mcp, tmp_path):
+        """Smart quotes in old_string match plain quotes on disk."""
+        target = tmp_path / "b.py"
+        target.write_text('print("hi")\n', encoding="utf-8")
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        # old_string uses smart quotes; file has straight quotes
+        result = edit_fn(
+            mode="replace",
+            path="b.py",
+            old_string='print(“hi”)',
+            new_string='print("HELLO")',
+        )
+        assert "Error" not in result
+        assert target.read_text() == 'print("HELLO")\n'
+
+    def test_replace_escape_normalized(self, file_ops_mcp, tmp_path):
+        """Literal '\\n' in old_string matches actual newline on disk."""
+        target = tmp_path / "c.txt"
+        target.write_text("alpha\nbeta\n", encoding="utf-8")
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(
+            mode="replace",
+            path="c.txt",
+            old_string="alpha\\nbeta",
+            new_string="X",
+        )
+        assert "Error" not in result
+        assert target.read_text() == "X\n"
+
+    def test_replace_missing_file(self, file_ops_mcp):
+        """Replacing in a non-existent file returns an error."""
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="replace", path="nope.txt", old_string="x", new_string="y")
         assert "Error" in result
         assert "not found" in result.lower()
 
-    def test_apply_patch_garbage_text(self, file_ops_mcp, tmp_path):
-        """Patch text that produces no patches is rejected without writing."""
-        target = tmp_path / "f.txt"
-        target.write_text("original", encoding="utf-8")
-        apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
-        result = apply_fn(path="f.txt", patch_text="not a patch")
+    def test_replace_no_match(self, file_ops_mcp, tmp_path):
+        """Failure path includes a hint pointing at read_file/search_files."""
+        target = tmp_path / "d.txt"
+        target.write_text("alpha\n", encoding="utf-8")
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="replace", path="d.txt", old_string="zzz", new_string="y")
         assert "Error" in result
-        assert target.read_text() == "original"
+        assert "read_file" in result or "search_files" in result
 
-    def test_apply_patch_write_denied_for_system_path(self, file_ops_mcp):
-        """The deny list applies to apply_patch just like write_file."""
-        apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
-        result = apply_fn(path="/etc/passwd", patch_text="x")
+    def test_replace_denied_for_system_path(self, file_ops_mcp):
+        """The deny list applies to replace mode."""
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="replace", path="/etc/passwd", old_string="a", new_string="b")
         assert "Error" in result
         assert "denied" in result.lower()
 
 
-class TestHashlineEditViaPolicy:
-    """hashline_edit honors the same path policy as the rest."""
+class TestPatchToolPatchMode:
+    """edit_file(mode='patch') — multi-file structured patch."""
 
-    def test_hashline_edit_relative_path(self, file_ops_mcp, tmp_path):
-        """hashline_edit on a relative path lands in home."""
-        from aden_tools.hashline import compute_line_hash
+    def test_patch_update_single_file(self, file_ops_mcp, tmp_path):
+        """A V4A Update hunk replaces matched lines and writes."""
+        target = tmp_path / "u.py"
+        target.write_text("def f():\n    return 1\n", encoding="utf-8")
+        body = (
+            "*** Begin Patch\n"
+            "*** Update File: u.py\n"
+            " def f():\n"
+            "-    return 1\n"
+            "+    return 42\n"
+            "*** End Patch\n"
+        )
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
+        assert "Error" not in result
+        assert "Modified" in result
+        assert target.read_text() == "def f():\n    return 42\n"
 
-        target = tmp_path / "hl.txt"
-        target.write_text("aaa\nbbb\nccc\n", encoding="utf-8")
+    def test_patch_add_file(self, file_ops_mcp, tmp_path):
+        """Add File: creates a new file from + lines."""
+        body = (
+            "*** Begin Patch\n"
+            "*** Add File: new.py\n"
+            "+# new\n"
+            "+x = 1\n"
+            "*** End Patch\n"
+        )
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
+        assert "Error" not in result
+        assert "Created" in result
+        created = tmp_path / "new.py"
+        assert created.exists()
+        assert created.read_text() == "# new\nx = 1\n"
 
-        edits = json.dumps([{"op": "set_line", "anchor": f"2:{compute_line_hash('bbb')}", "content": "BBB"}])
-        hashline_fn = _get_tool_fn(file_ops_mcp, "hashline_edit")
-        result = hashline_fn(path="hl.txt", edits=edits)
-        assert "Applied" in result
-        assert target.read_text() == "aaa\nBBB\nccc\n"
+    def test_patch_delete_file(self, file_ops_mcp, tmp_path):
+        """Delete File: removes the file."""
+        target = tmp_path / "doomed.py"
+        target.write_text("bye\n", encoding="utf-8")
+        body = "*** Begin Patch\n*** Delete File: doomed.py\n*** End Patch\n"
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
+        assert "Error" not in result
+        assert "Deleted" in result
+        assert not target.exists()
-    def test_hashline_edit_denied_for_system_path(self, file_ops_mcp):
-        """The deny list also covers hashline_edit."""
-        hashline_fn = _get_tool_fn(file_ops_mcp, "hashline_edit")
-        result = hashline_fn(path="/etc/passwd", edits="[]")
-        # Either deny-list error or empty-edits error — both before the write.
+    def test_patch_move_file(self, file_ops_mcp, tmp_path):
+        """Move File: renames source to destination."""
+        src = tmp_path / "src.py"
+        src.write_text("kept\n", encoding="utf-8")
+        body = "*** Begin Patch\n*** Move File: src.py -> dst.py\n*** End Patch\n"
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
+        assert "Error" not in result
+        dst = tmp_path / "dst.py"
+        assert not src.exists()
+        assert dst.exists()
+        assert dst.read_text() == "kept\n"
+
+    def test_patch_atomic_failure(self, file_ops_mcp, tmp_path):
+        """One failing op aborts the whole batch with no writes."""
+        target = tmp_path / "real.py"
+        target.write_text("original\n", encoding="utf-8")
+        body = (
+            "*** Begin Patch\n"
+            "*** Update File: real.py\n"
+            "-original\n"
+            "+changed\n"
+            "*** Update File: missing.py\n"
+            "-foo\n"
+            "+bar\n"
+            "*** End Patch\n"
+        )
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
         assert "Error" in result
+        # First file must be untouched — atomic semantics.
+        assert target.read_text() == "original\n"
+
+    def test_patch_lenient_missing_markers(self, file_ops_mcp, tmp_path):
+        """Begin/End markers are optional."""
+        target = tmp_path / "loose.py"
+        target.write_text("a\nb\nc\n", encoding="utf-8")
+        body = "*** Update File: loose.py\n-b\n+B\n"
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text=body)
+        assert "Error" not in result
+        assert target.read_text() == "a\nB\nc\n"
+
+    def test_patch_empty_body_rejected(self, file_ops_mcp):
+        """Empty patch_text returns an error before any work."""
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="patch", patch_text="")
+        assert "Error" in result
+
+    def test_patch_unknown_mode(self, file_ops_mcp):
+        """A typo in mode is surfaced as an error string."""
+        edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
+        result = edit_fn(mode="bogus")
+        assert "Error" in result
+        assert "unknown mode" in result.lower()
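The patch-mode tests above all speak a V4A-style envelope: `*** Begin Patch` / `*** Update|Add|Delete|Move File: …` / `*** End Patch`, with the Begin/End markers optional per test_patch_lenient_missing_markers. A rough sketch of how such an envelope splits into per-file operations, under the lenient-marker assumption those tests assert — `parse_envelope` is illustrative, not the shipped parser:

```python
import re

# Illustrative splitter for the V4A-style envelope the tests exercise.
# Assumes markers are optional framing; "Move File: a -> b" keeps the
# "a -> b" pair as its raw path field for a later step to split.
_HEADER = re.compile(r"^\*\*\* (Update|Add|Delete|Move) File: (.+)$")

def parse_envelope(text: str) -> list[tuple[str, str, list[str]]]:
    ops: list[tuple[str, str, list[str]]] = []
    for line in text.splitlines():
        if line in ("*** Begin Patch", "*** End Patch"):
            continue  # optional framing, not an operation
        m = _HEADER.match(line)
        if m:
            ops.append((m.group(1), m.group(2), []))
        elif ops:
            ops[-1][2].append(line)  # hunk body (-/+/context lines)
    return ops

if __name__ == "__main__":
    ops = parse_envelope("*** Update File: u.py\n-return 1\n+return 42\n")
    assert ops == [("Update", "u.py", ["-return 1", "+return 42"])]
```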
- """ - from aden_tools.file_state_cache import Freshness, FreshResult - - with patch( - "aden_tools.file_ops.check_fresh", - return_value=FreshResult(Freshness.FRESH), - ): - yield - - -def _anchor(line_num, line_text): - """Build an anchor string N:hhhh.""" - return f"{line_num}:{compute_line_hash(line_text)}" - - -@pytest.fixture -def tools(tmp_path): - """Register file_ops tools with tmp_path as project root.""" - from aden_tools.file_ops import register_file_tools - - mcp = FastMCP("test-server") - write_calls = [] - - def _before_write(): - write_calls.append(1) - - register_file_tools( - mcp, - home=str(tmp_path), - before_write=_before_write, - ) - tool_map = {name: t.fn for name, t in mcp._tool_manager._tools.items()} - return tool_map, write_calls - - -# ── read_file hashline ──────────────────────────────────────────────────── - - -class TestReadFileHashline: - def test_hashline_format(self, tools, tmp_path): - """hashline=True returns N:hhhh|content format.""" - read_file = tools[0]["read_file"] - (tmp_path / "f.txt").write_text("hello\nworld\n") - - result = read_file(path="f.txt", hashline=True) - lines = result.strip().split("\n") - # First two lines should be hashline formatted - h1 = compute_line_hash("hello") - h2 = compute_line_hash("world") - assert lines[0] == f"1:{h1}|hello" - assert lines[1] == f"2:{h2}|world" - - def test_hashline_false_unchanged(self, tools, tmp_path): - """Default (hashline=False) returns standard line-number format.""" - read_file = tools[0]["read_file"] - (tmp_path / "f.txt").write_text("hello\n") - - result = read_file(path="f.txt", hashline=False) - # Standard format uses tab-separated line numbers - assert "\t" in result - assert "hello" in result - - def test_hashline_offset_limit(self, tools, tmp_path): - """offset and limit work in hashline mode.""" - read_file = tools[0]["read_file"] - lines = [f"line{i}" for i in range(1, 11)] - (tmp_path / "f.txt").write_text("\n".join(lines) + "\n") - - result = read_file(path="f.txt", offset=3, limit=2, hashline=True) - output_lines = [ln for ln in result.split("\n") if ln and not ln.startswith("(")] - assert len(output_lines) == 2 - h3 = compute_line_hash("line3") - assert output_lines[0] == f"3:{h3}|line3" - - def test_hashline_no_line_truncation(self, tools, tmp_path): - """hashline mode doesn't truncate long lines (would corrupt hashes).""" - read_file = tools[0]["read_file"] - long_line = "x" * 3000 - (tmp_path / "f.txt").write_text(long_line + "\n") - - result = read_file(path="f.txt", hashline=True) - h = compute_line_hash(long_line) - assert f"1:{h}|{long_line}" in result - - -# ── search_files hashline ───────────────────────────────────────────────── - - -class TestSearchFilesHashline: - def test_hashline_in_results(self, tools, tmp_path): - """hashline=True adds hash anchors to search results.""" - search_files = tools[0]["search_files"] - (tmp_path / "f.py").write_text("def foo():\n pass\n") - - result = search_files(pattern="def foo", path=".", hashline=True) - # Result should contain hash anchor - h = compute_line_hash("def foo():") - assert h in result - assert f":{h}|" in result - - def test_hashline_false_unchanged(self, tools, tmp_path): - """Default search has no hash anchors.""" - search_files = tools[0]["search_files"] - (tmp_path / "f.py").write_text("def foo():\n pass\n") - - result = search_files(pattern="def foo", path=".", hashline=False) - h = compute_line_hash("def foo():") - assert f":{h}|" not in result - - -# ── hashline_edit 
-
-
-# ── hashline_edit ─────────────────────────────────────────────────────────
-
-
-class TestHashlineEditBasic:
-    def test_returns_string(self, tools, tmp_path):
-        """hashline_edit returns a string, not a dict."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
-        result = hashline_edit(path="f.txt", edits=edits)
-        assert isinstance(result, str)
-        assert "Applied" in result
-
-    def test_calls_before_write(self, tools, tmp_path):
-        """hashline_edit calls the before_write hook."""
-        hashline_edit = tools[0]["hashline_edit"]
-        write_calls = tools[1]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
-        hashline_edit(path="f.txt", edits=edits)
-        assert len(write_calls) == 1
-
-    def test_invalid_json(self, tools, tmp_path):
-        """Invalid JSON returns error string."""
-        hashline_edit = tools[0]["hashline_edit"]
-        (tmp_path / "f.txt").write_text("aaa\n")
-        result = hashline_edit(path="f.txt", edits="not json")
-        assert "Error" in result
-        assert "Invalid JSON" in result
-
-    def test_empty_edits(self, tools, tmp_path):
-        """Empty edits array returns error."""
-        hashline_edit = tools[0]["hashline_edit"]
-        (tmp_path / "f.txt").write_text("aaa\n")
-        result = hashline_edit(path="f.txt", edits="[]")
-        assert "Error" in result
-        assert "empty" in result
-
-    def test_file_not_found(self, tools, tmp_path):
-        """Missing file returns error."""
-        hashline_edit = tools[0]["hashline_edit"]
-        edits = json.dumps([{"op": "set_line", "anchor": "1:abcd", "content": "x"}])
-        result = hashline_edit(path="nope.txt", edits=edits)
-        assert "Error" in result
-        assert "not found" in result
-
-
-class TestHashlineEditSetLine:
-    def test_set_line(self, tools, tmp_path):
-        """set_line replaces a single line."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
-        result = hashline_edit(path="f.txt", edits=edits)
-        assert "Applied 1 edit" in result
-        assert f.read_text() == "aaa\nBBB\nccc\n"
-
-    def test_set_line_hash_mismatch(self, tools, tmp_path):
-        """set_line with wrong hash returns error."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        edits = json.dumps([{"op": "set_line", "anchor": "2:ffff", "content": "BBB"}])
-        result = hashline_edit(path="f.txt", edits=edits)
-        assert "Error" in result
-        assert "mismatch" in result.lower()
-
-    def test_set_line_delete(self, tools, tmp_path):
-        """set_line with empty content deletes the line."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": ""}])
-        result = hashline_edit(path="f.txt", edits=edits)
-        assert "Applied 1 edit" in result
-        assert f.read_text() == "aaa\nccc\n"
-
-
-class TestHashlineEditReplaceLines:
-    def test_replace_lines(self, tools, tmp_path):
-        """replace_lines replaces a range."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\nddd\n")
-
-        edits = json.dumps(
-            [
-                {
-                    "op": "replace_lines",
-                    "start_anchor": _anchor(2, "bbb"),
-                    "end_anchor": _anchor(3, "ccc"),
-                    "content": "XXX\nYYY\nZZZ",
-                }
-            ]
-        )
-        result = hashline_edit(path="f.txt", edits=edits)
- assert "Applied 1 edit" in result - assert f.read_text() == "aaa\nXXX\nYYY\nZZZ\nddd\n" - - -class TestHashlineEditInsert: - def test_insert_after(self, tools, tmp_path): - """insert_after adds lines after the anchor.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\n") - - edits = json.dumps( - [ - { - "op": "insert_after", - "anchor": _anchor(1, "aaa"), - "content": "NEW", - } - ] - ) - result = hashline_edit(path="f.txt", edits=edits) - assert "Applied 1 edit" in result - assert f.read_text() == "aaa\nNEW\nbbb\nccc\n" - - def test_insert_before(self, tools, tmp_path): - """insert_before adds lines before the anchor.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\n") - - edits = json.dumps( - [ - { - "op": "insert_before", - "anchor": _anchor(2, "bbb"), - "content": "NEW", - } - ] - ) - result = hashline_edit(path="f.txt", edits=edits) - assert "Applied 1 edit" in result - assert f.read_text() == "aaa\nNEW\nbbb\nccc\n" - - -class TestHashlineEditReplace: - def test_replace(self, tools, tmp_path): - """replace does string replacement.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\n") - - edits = json.dumps( - [ - { - "op": "replace", - "old_content": "bbb", - "new_content": "BBB", - } - ] - ) - result = hashline_edit(path="f.txt", edits=edits) - assert "Applied 1 edit" in result - assert f.read_text() == "aaa\nBBB\nccc\n" - - def test_replace_not_found(self, tools, tmp_path): - """replace with missing old_content returns error.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\n") - - edits = json.dumps( - [ - { - "op": "replace", - "old_content": "zzz", - "new_content": "ZZZ", - } - ] - ) - result = hashline_edit(path="f.txt", edits=edits) - assert "Error" in result - assert "not found" in result - - -class TestHashlineEditAppend: - def test_append(self, tools, tmp_path): - """append adds content at end of file.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\n") - - edits = json.dumps([{"op": "append", "content": "ccc\nddd"}]) - result = hashline_edit(path="f.txt", edits=edits) - assert "Applied 1 edit" in result - assert f.read_text() == "aaa\nbbb\nccc\nddd\n" - - -class TestHashlineEditOverlap: - def test_overlapping_edits_rejected(self, tools, tmp_path): - """Overlapping splice ranges are rejected.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\nddd\n") - - edits = json.dumps( - [ - {"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}, - { - "op": "replace_lines", - "start_anchor": _anchor(1, "aaa"), - "end_anchor": _anchor(3, "ccc"), - "content": "XXX", - }, - ] - ) - result = hashline_edit(path="f.txt", edits=edits) - assert "Error" in result - assert "Overlapping" in result - - -class TestHashlineEditAutoCleanup: - def test_strips_hashline_prefix_multiline(self, tools, tmp_path): - """auto_cleanup strips N:hhhh| prefixes from multi-line content.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\nddd\n") - - h_bbb = compute_line_hash("bbb") - h_ccc = compute_line_hash("ccc") - # LLM echoes hashline prefixes in replace_lines content - edits = json.dumps( - [ - { - "op": "replace_lines", - "start_anchor": _anchor(2, "bbb"), - "end_anchor": _anchor(3, "ccc"), - "content": f"2:{h_bbb}|BBB\n3:{h_ccc}|CCC", 
-                }
-            ]
-        )
-        result = hashline_edit(path="f.txt", edits=edits)
-        assert "Applied 1 edit" in result
-        # Should have stripped the prefixes
-        assert f.read_text() == "aaa\nBBB\nCCC\nddd\n"
-        assert "cleanup" in result.lower()
-
-    def test_no_cleanup_when_disabled(self, tools, tmp_path):
-        """auto_cleanup=False writes content as-is."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\nccc\n")
-
-        h = compute_line_hash("bbb")
-        raw_content = f"2:{h}|BBB"
-        edits = json.dumps(
-            [
-                {
-                    "op": "set_line",
-                    "anchor": _anchor(2, "bbb"),
-                    "content": raw_content,
-                }
-            ]
-        )
-        result = hashline_edit(path="f.txt", edits=edits, auto_cleanup=False)
-        assert "Applied 1 edit" in result
-        assert f.read_text() == f"aaa\n{raw_content}\nccc\n"
-
-
-class TestHashlineEditAtomicWrite:
-    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX permissions not supported on Windows")
-    def test_preserves_permissions(self, tools, tmp_path):
-        """Atomic write preserves original file permissions."""
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\n")
-        os.chmod(f, 0o755)
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
-        hashline_edit(path="f.txt", edits=edits)
-        assert os.stat(f).st_mode & 0o777 == 0o755
-
-    @pytest.mark.skipif(sys.platform != "win32", reason="Windows-only ACL test")
-    def test_acl_preserved_after_edit_windows(self, tools, tmp_path):
-        """Atomic replace preserves the target file's DACL on Windows."""
-        import ctypes
-        import ctypes.wintypes  # windll alone does not pull in wintypes
-
-        advapi32 = ctypes.windll.advapi32
-        kernel32 = ctypes.windll.kernel32
-        SE_FILE_OBJECT = 1
-        DACL_SECURITY_INFORMATION = 0x00000004
-
-        advapi32.GetNamedSecurityInfoW.argtypes = [
-            ctypes.wintypes.LPCWSTR,  # pObjectName
-            ctypes.c_uint,  # ObjectType (SE_OBJECT_TYPE enum)
-            ctypes.wintypes.DWORD,  # SecurityInfo
-            ctypes.c_void_p,  # ppsidOwner
-            ctypes.c_void_p,  # ppsidGroup
-            ctypes.c_void_p,  # ppDacl
-            ctypes.c_void_p,  # ppSacl
-            ctypes.c_void_p,  # ppSecurityDescriptor
-        ]
-        advapi32.GetNamedSecurityInfoW.restype = ctypes.wintypes.DWORD
-
-        advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW.argtypes = [
-            ctypes.c_void_p,  # SecurityDescriptor
-            ctypes.wintypes.DWORD,  # RequestedStringSDRevision
-            ctypes.wintypes.DWORD,  # SecurityInformation
-            ctypes.c_void_p,  # StringSecurityDescriptor (out)
-            ctypes.c_void_p,  # StringSecurityDescriptorLen (out, optional)
-        ]
-        advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW.restype = ctypes.wintypes.BOOL
-
-        kernel32.LocalFree.argtypes = [ctypes.c_void_p]
-        kernel32.LocalFree.restype = ctypes.c_void_p
-
-        hashline_edit = tools[0]["hashline_edit"]
-        f = tmp_path / "f.txt"
-        f.write_text("aaa\nbbb\n")
-
-        def _read_dacl_sddl(path):
-            sd = ctypes.c_void_p()
-            dacl = ctypes.c_void_p()
-            rc = advapi32.GetNamedSecurityInfoW(
-                str(path),
-                SE_FILE_OBJECT,
-                DACL_SECURITY_INFORMATION,
-                None,
-                None,
-                ctypes.byref(dacl),
-                None,
-                ctypes.byref(sd),
-            )
-            assert rc == 0, f"GetNamedSecurityInfoW failed: {rc}"
-            sddl = ctypes.c_wchar_p()
-            assert advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW(
-                sd,
-                1,
-                DACL_SECURITY_INFORMATION,
-                ctypes.byref(sddl),
-                None,
-            )
-            value = sddl.value
-            kernel32.LocalFree(sddl)
-            kernel32.LocalFree(sd)
-            return value
-
-        acl_before = _read_dacl_sddl(f)
-
-        edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
-        hashline_edit(path="f.txt", edits=edits)
-
-        acl_after = _read_dacl_sddl(f)
-
acl_after, f"ACL changed after edit: {acl_before} -> {acl_after}" - - @pytest.mark.skipif(sys.platform != "win32", reason="Windows-only ACL test") - def test_edit_succeeds_when_dacl_unavailable_windows(self, tools, tmp_path): - """Edit still works on volumes without ACL support (e.g. FAT32).""" - from aden_tools import _win32_atomic - - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\n") - - with patch.object(_win32_atomic, "snapshot_dacl", return_value=None): - edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}]) - hashline_edit(path="f.txt", edits=edits) - - assert f.read_text().splitlines()[0].endswith("AAA") - - def test_preserves_trailing_newline(self, tools, tmp_path): - """Files with trailing newline keep it after edit.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\n") - - edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}]) - hashline_edit(path="f.txt", edits=edits) - assert f.read_text().endswith("\n") - - def test_unknown_op(self, tools, tmp_path): - """Unknown op returns error.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\n") - - edits = json.dumps([{"op": "delete_line", "anchor": "1:abcd"}]) - result = hashline_edit(path="f.txt", edits=edits) - assert "Error" in result - assert "unknown op" in result - - def test_crlf_replace_op_no_double_conversion(self, tools, tmp_path): - """Replace op on a CRLF file should not corrupt \\r\\n in new_content.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_bytes(b"aaa\r\nbbb\r\nccc\r\n") - - edits = json.dumps([{"op": "replace", "old_content": "aaa", "new_content": "x\r\ny"}]) - result = hashline_edit(path="f.txt", edits=edits) - assert "Error" not in result - - raw = f.read_bytes() - assert b"\r\r\n" not in raw - assert raw == b"x\r\ny\r\nbbb\r\nccc\r\n" - - -class TestHashlineEditResponseFormat: - def test_shows_updated_content(self, tools, tmp_path): - """Response includes updated hashline content.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - f.write_text("aaa\nbbb\nccc\n") - - edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}]) - result = hashline_edit(path="f.txt", edits=edits) - # Should show updated content in hashline format - h_new = compute_line_hash("BBB") - assert f"2:{h_new}|BBB" in result - - def test_pagination_hint_for_large_files(self, tools, tmp_path): - """Response includes pagination hint when file > 200 lines.""" - hashline_edit = tools[0]["hashline_edit"] - f = tmp_path / "f.txt" - lines = [f"line{i}" for i in range(300)] - f.write_text("\n".join(lines) + "\n") - - edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "line0"), "content": "FIRST"}]) - result = hashline_edit(path="f.txt", edits=edits) - assert "Showing first 200" in result - assert "300 lines" in result