feat: refactor edit file and update default tools

This commit is contained in:
Richard Tang
2026-05-01 12:40:53 -07:00
parent 574c4bbe33
commit 35bd497750
7 changed files with 1112 additions and 1392 deletions
+6 -3
View File
@@ -2,9 +2,12 @@
"""
File Tools MCP Server
Minimal FastMCP server exposing 5 file tools (read_file, write_file, edit_file,
hashline_edit, search_files) with no path sandboxing. ``search_files`` is
Minimal FastMCP server exposing 4 file tools (read_file, write_file,
search_files, edit_file) with no path sandboxing. ``search_files`` is
unified covers grep, find, and ls via target='content' / target='files'.
``edit_file`` is unified covers single-file fuzzy find/replace
(mode='replace') and multi-file structured patches with two-phase apply
(mode='patch').
Usage:
# Run with STDIO transport (for agent integration)
@@ -82,7 +85,7 @@ def main() -> None:
args = parser.parse_args()
if not args.stdio:
logger.info("Registered 5 file tools: read_file, write_file, edit_file, hashline_edit, search_files")
logger.info("Registered 4 file tools: read_file, write_file, search_files, edit_file")
if args.stdio:
mcp.run(transport="stdio")
File diff suppressed because it is too large Load Diff
+152 -52
View File
@@ -1,11 +1,9 @@
"""Tests for aden_tools.file_ops — the unified file-tool surface.
Covers the path policy (home anchoring, deny lists, write_safe_root),
plus the six file tools: read_file, write_file, edit_file, hashline_edit,
search_files, apply_patch.
plus the four file tools: read_file, write_file, search_files, patch.
"""
import json
import os
from unittest.mock import patch
@@ -261,70 +259,172 @@ class TestPathPolicyWriteSafeRoot:
assert "Error" in result
class TestApplyPatchTool:
"""apply_patch — diff_match_patch text → file."""
class TestPatchToolReplaceMode:
"""patch(mode='replace') — single-file fuzzy find/replace."""
def test_apply_patch_modifies_file(self, file_ops_mcp, tmp_path):
"""A valid patch applies and rewrites the file."""
import diff_match_patch as dmp_module
target = tmp_path / "patch_me.txt"
def test_replace_basic(self, file_ops_mcp, tmp_path):
"""Exact match path: simple substitution."""
target = tmp_path / "a.txt"
target.write_text("Hello World", encoding="utf-8")
dmp = dmp_module.diff_match_patch()
patches = dmp.patch_make("Hello World", "Hello Universe")
patch_text = dmp.patch_toText(patches)
apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
result = apply_fn(path="patch_me.txt", patch_text=patch_text)
assert "Error" not in result
assert "Applied" in result
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="replace", path="a.txt", old_string="World", new_string="Universe")
assert "Replaced 1" in result
assert target.read_text() == "Hello Universe"
def test_apply_patch_missing_file(self, file_ops_mcp):
"""Patching a non-existent file returns an error string."""
apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
result = apply_fn(path="nope.txt", patch_text="garbage")
def test_replace_unicode_normalized(self, file_ops_mcp, tmp_path):
"""Smart quotes in old_string match plain quotes on disk."""
target = tmp_path / "b.py"
target.write_text('print("hi")\n', encoding="utf-8")
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
# old_string uses smart quotes; file has straight quotes
result = edit_fn(
mode="replace",
path="b.py",
old_string='print(“hi”)',
new_string='print("HELLO")',
)
assert "Error" not in result
assert target.read_text() == 'print("HELLO")\n'
def test_replace_escape_normalized(self, file_ops_mcp, tmp_path):
"""Literal '\\n' in old_string matches actual newline on disk."""
target = tmp_path / "c.txt"
target.write_text("alpha\nbeta\n", encoding="utf-8")
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(
mode="replace",
path="c.txt",
old_string="alpha\\nbeta",
new_string="X",
)
assert "Error" not in result
assert target.read_text() == "X\n"
def test_replace_missing_file(self, file_ops_mcp):
"""Replacing in a non-existent file returns an error."""
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="replace", path="nope.txt", old_string="x", new_string="y")
assert "Error" in result
assert "not found" in result.lower()
def test_apply_patch_garbage_text(self, file_ops_mcp, tmp_path):
"""Patch text that produces no patches is rejected without writing."""
target = tmp_path / "f.txt"
target.write_text("original", encoding="utf-8")
apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
result = apply_fn(path="f.txt", patch_text="not a patch")
def test_replace_no_match(self, file_ops_mcp, tmp_path):
"""Failure path includes a hint pointing at read_file/search_files."""
target = tmp_path / "d.txt"
target.write_text("alpha\n", encoding="utf-8")
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="replace", path="d.txt", old_string="zzz", new_string="y")
assert "Error" in result
assert target.read_text() == "original"
assert "read_file" in result or "search_files" in result
def test_apply_patch_write_denied_for_system_path(self, file_ops_mcp):
"""The deny list applies to apply_patch just like write_file."""
apply_fn = _get_tool_fn(file_ops_mcp, "apply_patch")
result = apply_fn(path="/etc/passwd", patch_text="x")
def test_replace_denied_for_system_path(self, file_ops_mcp):
"""The deny list applies to replace mode."""
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="replace", path="/etc/passwd", old_string="a", new_string="b")
assert "Error" in result
assert "denied" in result.lower()
class TestHashlineEditViaPolicy:
"""hashline_edit honors the same path policy as the rest."""
class TestPatchToolPatchMode:
"""patch(mode='patch') — multi-file structured patch."""
def test_hashline_edit_relative_path(self, file_ops_mcp, tmp_path):
"""hashline_edit on a relative path lands in home."""
from aden_tools.hashline import compute_line_hash
def test_patch_update_single_file(self, file_ops_mcp, tmp_path):
"""A V4A Update hunk replaces matched lines and writes."""
target = tmp_path / "u.py"
target.write_text("def f():\n return 1\n", encoding="utf-8")
body = (
"*** Begin Patch\n"
"*** Update File: u.py\n"
" def f():\n"
"- return 1\n"
"+ return 42\n"
"*** End Patch\n"
)
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" not in result
assert "Modified" in result
assert target.read_text() == "def f():\n return 42\n"
target = tmp_path / "hl.txt"
target.write_text("aaa\nbbb\nccc\n", encoding="utf-8")
def test_patch_add_file(self, file_ops_mcp, tmp_path):
"""Add File: creates a new file from + lines."""
body = (
"*** Begin Patch\n"
"*** Add File: new.py\n"
"+# new\n"
"+x = 1\n"
"*** End Patch\n"
)
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" not in result
assert "Created" in result
created = tmp_path / "new.py"
assert created.exists()
assert created.read_text() == "# new\nx = 1\n"
edits = json.dumps([{"op": "set_line", "anchor": f"2:{compute_line_hash('bbb')}", "content": "BBB"}])
hashline_fn = _get_tool_fn(file_ops_mcp, "hashline_edit")
result = hashline_fn(path="hl.txt", edits=edits)
assert "Applied" in result
assert target.read_text() == "aaa\nBBB\nccc\n"
def test_patch_delete_file(self, file_ops_mcp, tmp_path):
"""Delete File: removes the file."""
target = tmp_path / "doomed.py"
target.write_text("bye\n", encoding="utf-8")
body = "*** Begin Patch\n*** Delete File: doomed.py\n*** End Patch\n"
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" not in result
assert "Deleted" in result
assert not target.exists()
def test_hashline_edit_denied_for_system_path(self, file_ops_mcp):
"""The deny list also covers hashline_edit."""
hashline_fn = _get_tool_fn(file_ops_mcp, "hashline_edit")
result = hashline_fn(path="/etc/passwd", edits="[]")
# Either deny-list error or empty-edits error — both before the write.
def test_patch_move_file(self, file_ops_mcp, tmp_path):
"""Move File: renames source to destination."""
src = tmp_path / "src.py"
src.write_text("kept\n", encoding="utf-8")
body = "*** Begin Patch\n*** Move File: src.py -> dst.py\n*** End Patch\n"
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" not in result
dst = tmp_path / "dst.py"
assert not src.exists()
assert dst.exists()
assert dst.read_text() == "kept\n"
def test_patch_atomic_failure(self, file_ops_mcp, tmp_path):
"""One failing op aborts the whole batch with no writes."""
target = tmp_path / "real.py"
target.write_text("original\n", encoding="utf-8")
body = (
"*** Begin Patch\n"
"*** Update File: real.py\n"
"-original\n"
"+changed\n"
"*** Update File: missing.py\n"
"-foo\n"
"+bar\n"
"*** End Patch\n"
)
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" in result
# First file must be untouched — atomic semantics.
assert target.read_text() == "original\n"
def test_patch_lenient_missing_markers(self, file_ops_mcp, tmp_path):
"""Begin/End markers are optional."""
target = tmp_path / "loose.py"
target.write_text("a\nb\nc\n", encoding="utf-8")
body = "*** Update File: loose.py\n-b\n+B\n"
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text=body)
assert "Error" not in result
assert target.read_text() == "a\nB\nc\n"
def test_patch_empty_body_rejected(self, file_ops_mcp):
"""Empty patch_text returns an error before any work."""
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="patch", patch_text="")
assert "Error" in result
def test_patch_unknown_mode(self, file_ops_mcp):
"""A typo in mode is surfaced as an error string."""
edit_fn = _get_tool_fn(file_ops_mcp, "edit_file")
result = edit_fn(mode="bogus")
assert "Error" in result
assert "unknown mode" in result.lower()
-563
View File
@@ -1,563 +0,0 @@
"""Tests for hashline support in file_ops (coder tools)."""
import json
import os
import sys
from unittest.mock import patch
import pytest
from fastmcp import FastMCP
from aden_tools.hashline import compute_line_hash
@pytest.fixture(autouse=True)
def _bypass_stale_edit_guard():
"""These tests exercise edit logic directly without a prior read_file,
so the Gap 4 stale-edit guard would reject every call. Force
check_fresh to always return FRESH here; the cache itself is
covered by ``tools/tests/test_file_state_cache.py``.
"""
from aden_tools.file_state_cache import Freshness, FreshResult
with patch(
"aden_tools.file_ops.check_fresh",
return_value=FreshResult(Freshness.FRESH),
):
yield
def _anchor(line_num, line_text):
"""Build an anchor string N:hhhh."""
return f"{line_num}:{compute_line_hash(line_text)}"
@pytest.fixture
def tools(tmp_path):
"""Register file_ops tools with tmp_path as project root."""
from aden_tools.file_ops import register_file_tools
mcp = FastMCP("test-server")
write_calls = []
def _before_write():
write_calls.append(1)
register_file_tools(
mcp,
home=str(tmp_path),
before_write=_before_write,
)
tool_map = {name: t.fn for name, t in mcp._tool_manager._tools.items()}
return tool_map, write_calls
# ── read_file hashline ────────────────────────────────────────────────────
class TestReadFileHashline:
def test_hashline_format(self, tools, tmp_path):
"""hashline=True returns N:hhhh|content format."""
read_file = tools[0]["read_file"]
(tmp_path / "f.txt").write_text("hello\nworld\n")
result = read_file(path="f.txt", hashline=True)
lines = result.strip().split("\n")
# First two lines should be hashline formatted
h1 = compute_line_hash("hello")
h2 = compute_line_hash("world")
assert lines[0] == f"1:{h1}|hello"
assert lines[1] == f"2:{h2}|world"
def test_hashline_false_unchanged(self, tools, tmp_path):
"""Default (hashline=False) returns standard line-number format."""
read_file = tools[0]["read_file"]
(tmp_path / "f.txt").write_text("hello\n")
result = read_file(path="f.txt", hashline=False)
# Standard format uses tab-separated line numbers
assert "\t" in result
assert "hello" in result
def test_hashline_offset_limit(self, tools, tmp_path):
"""offset and limit work in hashline mode."""
read_file = tools[0]["read_file"]
lines = [f"line{i}" for i in range(1, 11)]
(tmp_path / "f.txt").write_text("\n".join(lines) + "\n")
result = read_file(path="f.txt", offset=3, limit=2, hashline=True)
output_lines = [ln for ln in result.split("\n") if ln and not ln.startswith("(")]
assert len(output_lines) == 2
h3 = compute_line_hash("line3")
assert output_lines[0] == f"3:{h3}|line3"
def test_hashline_no_line_truncation(self, tools, tmp_path):
"""hashline mode doesn't truncate long lines (would corrupt hashes)."""
read_file = tools[0]["read_file"]
long_line = "x" * 3000
(tmp_path / "f.txt").write_text(long_line + "\n")
result = read_file(path="f.txt", hashline=True)
h = compute_line_hash(long_line)
assert f"1:{h}|{long_line}" in result
# ── search_files hashline ─────────────────────────────────────────────────
class TestSearchFilesHashline:
def test_hashline_in_results(self, tools, tmp_path):
"""hashline=True adds hash anchors to search results."""
search_files = tools[0]["search_files"]
(tmp_path / "f.py").write_text("def foo():\n pass\n")
result = search_files(pattern="def foo", path=".", hashline=True)
# Result should contain hash anchor
h = compute_line_hash("def foo():")
assert h in result
assert f":{h}|" in result
def test_hashline_false_unchanged(self, tools, tmp_path):
"""Default search has no hash anchors."""
search_files = tools[0]["search_files"]
(tmp_path / "f.py").write_text("def foo():\n pass\n")
result = search_files(pattern="def foo", path=".", hashline=False)
h = compute_line_hash("def foo():")
assert f":{h}|" not in result
# ── hashline_edit ─────────────────────────────────────────────────────────
class TestHashlineEditBasic:
def test_returns_string(self, tools, tmp_path):
"""hashline_edit returns a string, not a dict."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert isinstance(result, str)
assert "Applied" in result
def test_calls_before_write(self, tools, tmp_path):
"""hashline_edit calls the before_write hook."""
hashline_edit = tools[0]["hashline_edit"]
write_calls = tools[1]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
hashline_edit(path="f.txt", edits=edits)
assert len(write_calls) == 1
def test_invalid_json(self, tools, tmp_path):
"""Invalid JSON returns error string."""
hashline_edit = tools[0]["hashline_edit"]
(tmp_path / "f.txt").write_text("aaa\n")
result = hashline_edit(path="f.txt", edits="not json")
assert "Error" in result
assert "Invalid JSON" in result
def test_empty_edits(self, tools, tmp_path):
"""Empty edits array returns error."""
hashline_edit = tools[0]["hashline_edit"]
(tmp_path / "f.txt").write_text("aaa\n")
result = hashline_edit(path="f.txt", edits="[]")
assert "Error" in result
assert "empty" in result
def test_file_not_found(self, tools, tmp_path):
"""Missing file returns error."""
hashline_edit = tools[0]["hashline_edit"]
edits = json.dumps([{"op": "set_line", "anchor": "1:abcd", "content": "x"}])
result = hashline_edit(path="nope.txt", edits=edits)
assert "Error" in result
assert "not found" in result
class TestHashlineEditSetLine:
def test_set_line(self, tools, tmp_path):
"""set_line replaces a single line."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nBBB\nccc\n"
def test_set_line_hash_mismatch(self, tools, tmp_path):
"""set_line with wrong hash returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": "2:ffff", "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "mismatch" in result.lower()
def test_set_line_delete(self, tools, tmp_path):
"""set_line with empty content deletes the line."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": ""}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nccc\n"
class TestHashlineEditReplaceLines:
def test_replace_lines(self, tools, tmp_path):
"""replace_lines replaces a range."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
edits = json.dumps(
[
{
"op": "replace_lines",
"start_anchor": _anchor(2, "bbb"),
"end_anchor": _anchor(3, "ccc"),
"content": "XXX\nYYY\nZZZ",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nXXX\nYYY\nZZZ\nddd\n"
class TestHashlineEditInsert:
def test_insert_after(self, tools, tmp_path):
"""insert_after adds lines after the anchor."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "insert_after",
"anchor": _anchor(1, "aaa"),
"content": "NEW",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"
def test_insert_before(self, tools, tmp_path):
"""insert_before adds lines before the anchor."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "insert_before",
"anchor": _anchor(2, "bbb"),
"content": "NEW",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"
class TestHashlineEditReplace:
def test_replace(self, tools, tmp_path):
"""replace does string replacement."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "replace",
"old_content": "bbb",
"new_content": "BBB",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nBBB\nccc\n"
def test_replace_not_found(self, tools, tmp_path):
"""replace with missing old_content returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "replace",
"old_content": "zzz",
"new_content": "ZZZ",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "not found" in result
class TestHashlineEditAppend:
def test_append(self, tools, tmp_path):
"""append adds content at end of file."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
edits = json.dumps([{"op": "append", "content": "ccc\nddd"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nbbb\nccc\nddd\n"
class TestHashlineEditOverlap:
def test_overlapping_edits_rejected(self, tools, tmp_path):
"""Overlapping splice ranges are rejected."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
edits = json.dumps(
[
{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"},
{
"op": "replace_lines",
"start_anchor": _anchor(1, "aaa"),
"end_anchor": _anchor(3, "ccc"),
"content": "XXX",
},
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "Overlapping" in result
class TestHashlineEditAutoCleanup:
def test_strips_hashline_prefix_multiline(self, tools, tmp_path):
"""auto_cleanup strips N:hhhh| prefixes from multi-line content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
h_bbb = compute_line_hash("bbb")
h_ccc = compute_line_hash("ccc")
# LLM echoes hashline prefixes in replace_lines content
edits = json.dumps(
[
{
"op": "replace_lines",
"start_anchor": _anchor(2, "bbb"),
"end_anchor": _anchor(3, "ccc"),
"content": f"2:{h_bbb}|BBB\n3:{h_ccc}|CCC",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
# Should have stripped the prefixes
assert f.read_text() == "aaa\nBBB\nCCC\nddd\n"
assert "cleanup" in result.lower()
def test_no_cleanup_when_disabled(self, tools, tmp_path):
"""auto_cleanup=False writes content as-is."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
h = compute_line_hash("bbb")
raw_content = f"2:{h}|BBB"
edits = json.dumps(
[
{
"op": "set_line",
"anchor": _anchor(2, "bbb"),
"content": raw_content,
}
]
)
result = hashline_edit(path="f.txt", edits=edits, auto_cleanup=False)
assert "Applied 1 edit" in result
assert f.read_text() == f"aaa\n{raw_content}\nccc\n"
class TestHashlineEditAtomicWrite:
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX permissions not supported on Windows")
def test_preserves_permissions(self, tools, tmp_path):
"""Atomic write preserves original file permissions."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
os.chmod(f, 0o755)
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
assert os.stat(f).st_mode & 0o777 == 0o755
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-only ACL test")
def test_acl_preserved_after_edit_windows(self, tools, tmp_path):
"""Atomic replace preserves the target file's DACL on Windows."""
import ctypes
advapi32 = ctypes.windll.advapi32
kernel32 = ctypes.windll.kernel32
SE_FILE_OBJECT = 1
DACL_SECURITY_INFORMATION = 0x00000004
advapi32.GetNamedSecurityInfoW.argtypes = [
ctypes.wintypes.LPCWSTR, # pObjectName
ctypes.c_uint, # ObjectType (SE_OBJECT_TYPE enum)
ctypes.wintypes.DWORD, # SecurityInfo
ctypes.c_void_p, # ppsidOwner
ctypes.c_void_p, # ppsidGroup
ctypes.c_void_p, # ppDacl
ctypes.c_void_p, # ppSacl
ctypes.c_void_p, # ppSecurityDescriptor
]
advapi32.GetNamedSecurityInfoW.restype = ctypes.wintypes.DWORD
advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW.argtypes = [
ctypes.c_void_p, # SecurityDescriptor
ctypes.wintypes.DWORD, # RequestedStringSDRevision
ctypes.wintypes.DWORD, # SecurityInformation
ctypes.c_void_p, # StringSecurityDescriptor (out)
ctypes.c_void_p, # StringSecurityDescriptorLen (out, optional)
]
advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW.restype = ctypes.wintypes.BOOL
kernel32.LocalFree.argtypes = [ctypes.c_void_p]
kernel32.LocalFree.restype = ctypes.c_void_p
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
def _read_dacl_sddl(path):
sd = ctypes.c_void_p()
dacl = ctypes.c_void_p()
rc = advapi32.GetNamedSecurityInfoW(
str(path),
SE_FILE_OBJECT,
DACL_SECURITY_INFORMATION,
None,
None,
ctypes.byref(dacl),
None,
ctypes.byref(sd),
)
assert rc == 0, f"GetNamedSecurityInfoW failed: {rc}"
sddl = ctypes.c_wchar_p()
assert advapi32.ConvertSecurityDescriptorToStringSecurityDescriptorW(
sd,
1,
DACL_SECURITY_INFORMATION,
ctypes.byref(sddl),
None,
)
value = sddl.value
kernel32.LocalFree(sddl)
kernel32.LocalFree(sd)
return value
acl_before = _read_dacl_sddl(f)
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
acl_after = _read_dacl_sddl(f)
assert acl_before == acl_after, f"ACL changed after edit: {acl_before} -> {acl_after}"
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-only ACL test")
def test_edit_succeeds_when_dacl_unavailable_windows(self, tools, tmp_path):
"""Edit still works on volumes without ACL support (e.g. FAT32)."""
from aden_tools import _win32_atomic
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
with patch.object(_win32_atomic, "snapshot_dacl", return_value=None):
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
assert f.read_text().splitlines()[0].endswith("AAA")
def test_preserves_trailing_newline(self, tools, tmp_path):
"""Files with trailing newline keep it after edit."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
assert f.read_text().endswith("\n")
def test_unknown_op(self, tools, tmp_path):
"""Unknown op returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\n")
edits = json.dumps([{"op": "delete_line", "anchor": "1:abcd"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "unknown op" in result
def test_crlf_replace_op_no_double_conversion(self, tools, tmp_path):
"""Replace op on a CRLF file should not corrupt \\r\\n in new_content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_bytes(b"aaa\r\nbbb\r\nccc\r\n")
edits = json.dumps([{"op": "replace", "old_content": "aaa", "new_content": "x\r\ny"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" not in result
raw = f.read_bytes()
assert b"\r\r\n" not in raw
assert raw == b"x\r\ny\r\nbbb\r\nccc\r\n"
class TestHashlineEditResponseFormat:
def test_shows_updated_content(self, tools, tmp_path):
"""Response includes updated hashline content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
# Should show updated content in hashline format
h_new = compute_line_hash("BBB")
assert f"2:{h_new}|BBB" in result
def test_pagination_hint_for_large_files(self, tools, tmp_path):
"""Response includes pagination hint when file > 200 lines."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
lines = [f"line{i}" for i in range(300)]
f.write_text("\n".join(lines) + "\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "line0"), "content": "FIRST"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Showing first 200" in result
assert "300 lines" in result