fix: outdated tests
This commit is contained in:
@@ -120,6 +120,19 @@ _REFLECTION_TOOLS: list[Tool] = [
|
||||
]
|
||||
|
||||
|
||||
def _safe_memory_path(filename: str, memory_dir: Path) -> Path:
|
||||
"""Resolve *filename* inside *memory_dir*, raising if it escapes."""
|
||||
if not filename or filename.strip() != filename:
|
||||
raise ValueError(f"Invalid filename: {filename!r}")
|
||||
if "/" in filename or "\\" in filename or ".." in filename:
|
||||
raise ValueError(f"Invalid filename: path components not allowed: {filename!r}")
|
||||
candidate = (memory_dir / filename).resolve()
|
||||
root = memory_dir.resolve()
|
||||
if not candidate.is_relative_to(root):
|
||||
raise ValueError(f"Path escapes memory directory: {filename!r}")
|
||||
return candidate
|
||||
|
||||
|
||||
def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path) -> str:
|
||||
"""Execute a reflection tool synchronously. Returns the result string."""
|
||||
if name == "list_memory_files":
|
||||
@@ -131,7 +144,10 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path) -> str:
|
||||
|
||||
if name == "read_memory_file":
|
||||
filename = args.get("filename", "")
|
||||
path = memory_dir / filename
|
||||
try:
|
||||
path = _safe_memory_path(filename, memory_dir)
|
||||
except ValueError as exc:
|
||||
return f"ERROR: {exc}"
|
||||
if not path.exists() or not path.is_file():
|
||||
return f"ERROR: File not found: {filename}"
|
||||
try:
|
||||
|
||||
@@ -238,36 +238,6 @@ class TestCsvRead:
|
||||
assert result["total_rows"] == 0
|
||||
assert result["rows"] == []
|
||||
|
||||
def test_missing_workspace_id(self, csv_tool_fn, basic_csv, tmp_path):
|
||||
"""Return error when workspace_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.AGENT_SANDBOXES_DIR", str(tmp_path)):
|
||||
result = csv_tool_fn(
|
||||
path="basic.csv",
|
||||
agent_id=TEST_AGENT_ID,
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_missing_agent_id(self, csv_tool_fn, basic_csv, tmp_path):
|
||||
"""Return error when agent_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.AGENT_SANDBOXES_DIR", str(tmp_path)):
|
||||
result = csv_tool_fn(
|
||||
path="basic.csv",
|
||||
agent_id="",
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_missing_session_id(self, csv_tool_fn, basic_csv, tmp_path):
|
||||
"""Return error when session_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.AGENT_SANDBOXES_DIR", str(tmp_path)):
|
||||
result = csv_tool_fn(
|
||||
path="basic.csv",
|
||||
agent_id=TEST_AGENT_ID,
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_unicode_content(self, csv_tool_fn, session_dir, tmp_path):
|
||||
"""Read CSV with Unicode content."""
|
||||
csv_file = session_dir / "unicode.csv"
|
||||
|
||||
@@ -323,42 +323,6 @@ class TestExcelRead:
|
||||
assert result["total_rows"] == 0
|
||||
assert result["rows"] == []
|
||||
|
||||
def test_missing_workspace_id(self, excel_read_fn, basic_xlsx, tmp_path):
|
||||
"""Return error when workspace_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
|
||||
result = excel_read_fn(
|
||||
path="basic.xlsx",
|
||||
workspace_id="",
|
||||
agent_id=TEST_AGENT_ID,
|
||||
session_id=current,
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_missing_agent_id(self, excel_read_fn, basic_xlsx, tmp_path):
|
||||
"""Return error when agent_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
|
||||
result = excel_read_fn(
|
||||
path="basic.xlsx",
|
||||
workspace_id=TEST_AGENT_ID,
|
||||
agent_id="",
|
||||
session_id=current,
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_missing_session_id(self, excel_read_fn, basic_xlsx, tmp_path):
|
||||
"""Return error when session_id is missing."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
|
||||
result = excel_read_fn(
|
||||
path="basic.xlsx",
|
||||
workspace_id=TEST_AGENT_ID,
|
||||
agent_id=TEST_AGENT_ID,
|
||||
session_id="",
|
||||
)
|
||||
|
||||
assert "error" in result
|
||||
|
||||
def test_path_traversal_blocked(self, excel_read_fn, session_dir, tmp_path):
|
||||
"""Prevent path traversal attacks."""
|
||||
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
|
||||
|
||||
Reference in New Issue
Block a user