Merge pull request #5786 from osb910/fix/charmap-decode-error

fix(core): add utf-8 encoding to backend open calls (micro-fix)
This commit is contained in:
Timothy @aden
2026-03-04 08:39:10 -08:00
committed by GitHub
39 changed files with 221 additions and 139 deletions
+1
View File
@@ -79,3 +79,4 @@ core/tests/*dumps/*
screenshots/* screenshots/*
.gemini/*
+1 -1
View File
@@ -10,7 +10,7 @@ def _load_preferred_model() -> str:
config_path = Path.home() / ".hive" / "configuration.json" config_path = Path.home() / ".hive" / "configuration.json"
if config_path.exists(): if config_path.exists():
try: try:
with open(config_path) as f: with open(config_path, encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
llm = config.get("llm", {}) llm = config.get("llm", {})
if llm.get("provider") and llm.get("model"): if llm.get("provider") and llm.get("model"):
@@ -7,11 +7,11 @@ from framework.graph import NodeSpec
# Load reference docs at import time so they're always in the system prompt. # Load reference docs at import time so they're always in the system prompt.
# No voluntary read_file() calls needed — the LLM gets everything upfront. # No voluntary read_file() calls needed — the LLM gets everything upfront.
_ref_dir = Path(__file__).parent.parent / "reference" _ref_dir = Path(__file__).parent.parent / "reference"
_framework_guide = (_ref_dir / "framework_guide.md").read_text() _framework_guide = (_ref_dir / "framework_guide.md").read_text(encoding="utf-8")
_file_templates = (_ref_dir / "file_templates.md").read_text() _file_templates = (_ref_dir / "file_templates.md").read_text(encoding="utf-8")
_anti_patterns = (_ref_dir / "anti_patterns.md").read_text() _anti_patterns = (_ref_dir / "anti_patterns.md").read_text(encoding="utf-8")
_gcu_guide_path = _ref_dir / "gcu_guide.md" _gcu_guide_path = _ref_dir / "gcu_guide.md"
_gcu_guide = _gcu_guide_path.read_text() if _gcu_guide_path.exists() else "" _gcu_guide = _gcu_guide_path.read_text(encoding="utf-8") if _gcu_guide_path.exists() else ""
def _is_gcu_enabled() -> bool: def _is_gcu_enabled() -> bool:
+2 -2
View File
@@ -660,7 +660,7 @@ class GraphBuilder:
# Generate Python code # Generate Python code
code = self._generate_code(graph) code = self._generate_code(graph)
Path(path).write_text(code) Path(path).write_text(code, encoding="utf-8")
self.session.phase = BuildPhase.EXPORTED self.session.phase = BuildPhase.EXPORTED
self._save_session() self._save_session()
@@ -754,7 +754,7 @@ class GraphBuilder:
"""Save session to disk.""" """Save session to disk."""
self.session.updated_at = datetime.now() self.session.updated_at = datetime.now()
path = self.storage_path / f"{self.session.id}.json" path = self.storage_path / f"{self.session.id}.json"
path.write_text(self.session.model_dump_json(indent=2)) path.write_text(self.session.model_dump_json(indent=2), encoding="utf-8")
def _load_session(self, session_id: str) -> BuildSession: def _load_session(self, session_id: str) -> BuildSession:
"""Load session from disk.""" """Load session from disk."""
+1 -1
View File
@@ -69,7 +69,7 @@ def save_credential_key(key: str) -> Path:
# Restrict the secrets directory itself # Restrict the secrets directory itself
path.parent.chmod(stat.S_IRWXU) # 0o700 path.parent.chmod(stat.S_IRWXU) # 0o700
path.write_text(key) path.write_text(key, encoding="utf-8")
path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600 path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600
os.environ[CREDENTIAL_KEY_ENV_VAR] = key os.environ[CREDENTIAL_KEY_ENV_VAR] = key
+1 -1
View File
@@ -568,7 +568,7 @@ def _load_nodes_from_python_agent(agent_path: Path) -> list:
def _load_nodes_from_json_agent(agent_json: Path) -> list: def _load_nodes_from_json_agent(agent_json: Path) -> list:
"""Load nodes from a JSON-based agent.""" """Load nodes from a JSON-based agent."""
try: try:
with open(agent_json) as f: with open(agent_json, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
from framework.graph import NodeSpec from framework.graph import NodeSpec
+3 -3
View File
@@ -227,7 +227,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json" index_path = self.base_path / "metadata" / "index.json"
if not index_path.exists(): if not index_path.exists():
return [] return []
with open(index_path) as f: with open(index_path, encoding="utf-8") as f:
index = json.load(f) index = json.load(f)
return list(index.get("credentials", {}).keys()) return list(index.get("credentials", {}).keys())
@@ -268,7 +268,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json" index_path = self.base_path / "metadata" / "index.json"
if index_path.exists(): if index_path.exists():
with open(index_path) as f: with open(index_path, encoding="utf-8") as f:
index = json.load(f) index = json.load(f)
else: else:
index = {"credentials": {}, "version": "1.0"} index = {"credentials": {}, "version": "1.0"}
@@ -283,7 +283,7 @@ class EncryptedFileStorage(CredentialStorage):
index["last_modified"] = datetime.now(UTC).isoformat() index["last_modified"] = datetime.now(UTC).isoformat()
with open(index_path, "w") as f: with open(index_path, "w", encoding="utf-8") as f:
json.dump(index, f, indent=2) json.dump(index, f, indent=2)
+1 -1
View File
@@ -170,7 +170,7 @@ def _dump_failed_request(
"temperature": kwargs.get("temperature"), "temperature": kwargs.get("temperature"),
} }
with open(filepath, "w") as f: with open(filepath, "w", encoding="utf-8") as f:
json.dump(dump_data, f, indent=2, default=str) json.dump(dump_data, f, indent=2, default=str)
return str(filepath) return str(filepath)
+7 -5
View File
@@ -162,7 +162,7 @@ def _load_session(session_id: str) -> BuildSession:
if not session_file.exists(): if not session_file.exists():
raise ValueError(f"Session '{session_id}' not found") raise ValueError(f"Session '{session_id}' not found")
with open(session_file) as f: with open(session_file, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
return BuildSession.from_dict(data) return BuildSession.from_dict(data)
@@ -174,7 +174,7 @@ def _load_active_session() -> BuildSession | None:
return None return None
try: try:
with open(ACTIVE_SESSION_FILE) as f: with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
session_id = f.read().strip() session_id = f.read().strip()
if session_id: if session_id:
@@ -228,7 +228,7 @@ def list_sessions() -> str:
if SESSIONS_DIR.exists(): if SESSIONS_DIR.exists():
for session_file in SESSIONS_DIR.glob("*.json"): for session_file in SESSIONS_DIR.glob("*.json"):
try: try:
with open(session_file) as f: with open(session_file, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
sessions.append( sessions.append(
{ {
@@ -248,7 +248,7 @@ def list_sessions() -> str:
active_id = None active_id = None
if ACTIVE_SESSION_FILE.exists(): if ACTIVE_SESSION_FILE.exists():
try: try:
with open(ACTIVE_SESSION_FILE) as f: with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
active_id = f.read().strip() active_id = f.read().strip()
except Exception: except Exception:
pass pass
@@ -310,7 +310,7 @@ def delete_session(session_id: Annotated[str, "ID of the session to delete"]) ->
_session = None _session = None
if ACTIVE_SESSION_FILE.exists(): if ACTIVE_SESSION_FILE.exists():
with open(ACTIVE_SESSION_FILE) as f: with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
active_id = f.read().strip() active_id = f.read().strip()
if active_id == session_id: if active_id == session_id:
ACTIVE_SESSION_FILE.unlink() ACTIVE_SESSION_FILE.unlink()
@@ -2894,6 +2894,7 @@ def run_tests(
try: try:
result = subprocess.run( result = subprocess.run(
cmd, cmd,
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=600, # 10 minute timeout timeout=600, # 10 minute timeout
@@ -3085,6 +3086,7 @@ def debug_test(
try: try:
result = subprocess.run( result = subprocess.run(
cmd, cmd,
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=120, # 2 minute timeout for single test timeout=120, # 2 minute timeout for single test
+15 -5
View File
@@ -517,7 +517,7 @@ def cmd_run(args: argparse.Namespace) -> int:
return 1 return 1
elif args.input_file: elif args.input_file:
try: try:
with open(args.input_file) as f: with open(args.input_file, encoding="utf-8") as f:
context = json.load(f) context = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e: except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Error reading input file: {e}", file=sys.stderr) print(f"Error reading input file: {e}", file=sys.stderr)
@@ -659,7 +659,7 @@ def cmd_run(args: argparse.Namespace) -> int:
# Output results # Output results
if args.output: if args.output:
with open(args.output, "w") as f: with open(args.output, "w", encoding="utf-8") as f:
json.dump(output, f, indent=2, default=str) json.dump(output, f, indent=2, default=str)
if not args.quiet: if not args.quiet:
print(f"Results written to {args.output}") print(f"Results written to {args.output}")
@@ -1517,7 +1517,7 @@ def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
return fallback_name, fallback_desc return fallback_name, fallback_desc
try: try:
with open(config_path) as f: with open(config_path, encoding="utf-8") as f:
tree = ast.parse(f.read()) tree = ast.parse(f.read())
# Find AgentMetadata class definition # Find AgentMetadata class definition
@@ -1932,10 +1932,18 @@ def _open_browser(url: str) -> None:
try: try:
if sys.platform == "darwin": if sys.platform == "darwin":
subprocess.Popen(["open", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.Popen(
["open", url],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding="utf-8",
)
elif sys.platform == "linux": elif sys.platform == "linux":
subprocess.Popen( subprocess.Popen(
["xdg-open", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ["xdg-open", url],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding="utf-8",
) )
except Exception: except Exception:
pass # Best-effort — don't crash if browser can't open pass # Best-effort — don't crash if browser can't open
@@ -1980,12 +1988,14 @@ def _build_frontend() -> bool:
# Ensure deps are installed # Ensure deps are installed
subprocess.run( subprocess.run(
["npm", "install", "--no-fund", "--no-audit"], ["npm", "install", "--no-fund", "--no-audit"],
encoding="utf-8",
cwd=frontend_dir, cwd=frontend_dir,
check=True, check=True,
capture_output=True, capture_output=True,
) )
subprocess.run( subprocess.run(
["npm", "run", "build"], ["npm", "run", "build"],
encoding="utf-8",
cwd=frontend_dir, cwd=frontend_dir,
check=True, check=True,
capture_output=True, capture_output=True,
+7 -7
View File
@@ -96,7 +96,7 @@ def _save_refreshed_credentials(token_data: dict) -> None:
return return
try: try:
with open(CLAUDE_CREDENTIALS_FILE) as f: with open(CLAUDE_CREDENTIALS_FILE, encoding="utf-8") as f:
creds = json.load(f) creds = json.load(f)
oauth = creds.get("claudeAiOauth", {}) oauth = creds.get("claudeAiOauth", {})
@@ -107,7 +107,7 @@ def _save_refreshed_credentials(token_data: dict) -> None:
oauth["expiresAt"] = int((time.time() + token_data["expires_in"]) * 1000) oauth["expiresAt"] = int((time.time() + token_data["expires_in"]) * 1000)
creds["claudeAiOauth"] = oauth creds["claudeAiOauth"] = oauth
with open(CLAUDE_CREDENTIALS_FILE, "w") as f: with open(CLAUDE_CREDENTIALS_FILE, "w", encoding="utf-8") as f:
json.dump(creds, f, indent=2) json.dump(creds, f, indent=2)
logger.debug("Claude Code credentials refreshed successfully") logger.debug("Claude Code credentials refreshed successfully")
except (json.JSONDecodeError, OSError, KeyError) as exc: except (json.JSONDecodeError, OSError, KeyError) as exc:
@@ -132,7 +132,7 @@ def get_claude_code_token() -> str | None:
return None return None
try: try:
with open(CLAUDE_CREDENTIALS_FILE) as f: with open(CLAUDE_CREDENTIALS_FILE, encoding="utf-8") as f:
creds = json.load(f) creds = json.load(f)
except (json.JSONDecodeError, OSError): except (json.JSONDecodeError, OSError):
return None return None
@@ -212,7 +212,7 @@ def _read_codex_keychain() -> dict | None:
"-w", "-w",
], ],
capture_output=True, capture_output=True,
text=True, encoding="utf-8",
timeout=5, timeout=5,
) )
if result.returncode != 0: if result.returncode != 0:
@@ -231,7 +231,7 @@ def _read_codex_auth_file() -> dict | None:
if not CODEX_AUTH_FILE.exists(): if not CODEX_AUTH_FILE.exists():
return None return None
try: try:
with open(CODEX_AUTH_FILE) as f: with open(CODEX_AUTH_FILE, encoding="utf-8") as f:
return json.load(f) return json.load(f)
except (json.JSONDecodeError, OSError): except (json.JSONDecodeError, OSError):
return None return None
@@ -324,7 +324,7 @@ def _save_refreshed_codex_credentials(auth_data: dict, token_data: dict) -> None
CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True, mode=0o700) CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
fd = os.open(CODEX_AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) fd = os.open(CODEX_AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w") as f: with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(auth_data, f, indent=2) json.dump(auth_data, f, indent=2)
logger.debug("Codex credentials refreshed successfully") logger.debug("Codex credentials refreshed successfully")
except (OSError, KeyError) as exc: except (OSError, KeyError) as exc:
@@ -869,7 +869,7 @@ class AgentRunner:
if not agent_json_path.exists(): if not agent_json_path.exists():
raise FileNotFoundError(f"No agent.py or agent.json found in {agent_path}") raise FileNotFoundError(f"No agent.py or agent.json found in {agent_path}")
with open(agent_json_path) as f: with open(agent_json_path, encoding="utf-8") as f:
graph, goal = load_agent_export(f.read()) graph, goal = load_agent_export(f.read())
return cls( return cls(
+1 -1
View File
@@ -340,7 +340,7 @@ class ToolRegistry:
self._mcp_config_path = Path(config_path) self._mcp_config_path = Path(config_path)
try: try:
with open(config_path) as f: with open(config_path, encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
except Exception as e: except Exception as e:
logger.warning(f"Failed to load MCP config from {config_path}: {e}") logger.warning(f"Failed to load MCP config from {config_path}: {e}")
+2 -2
View File
@@ -270,10 +270,10 @@ def _edit_test_code(code: str) -> str:
try: try:
# Open editor # Open editor
subprocess.run([editor, temp_path], check=True) subprocess.run([editor, temp_path], check=True, encoding="utf-8")
# Read edited code # Read edited code
with open(temp_path) as f: with open(temp_path, encoding="utf-8") as f:
return f.read() return f.read()
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
print("Editor failed, keeping original code") print("Editor failed, keeping original code")
+2
View File
@@ -190,6 +190,7 @@ def cmd_test_run(args: argparse.Namespace) -> int:
try: try:
result = subprocess.run( result = subprocess.run(
cmd, cmd,
encoding="utf-8",
env=env, env=env,
timeout=600, # 10 minute timeout timeout=600, # 10 minute timeout
) )
@@ -248,6 +249,7 @@ def cmd_test_debug(args: argparse.Namespace) -> int:
try: try:
result = subprocess.run( result = subprocess.run(
cmd, cmd,
encoding="utf-8",
env=env, env=env,
timeout=120, # 2 minute timeout for single test timeout=120, # 2 minute timeout for single test
) )
+1 -1
View File
@@ -256,7 +256,7 @@ class AdenTUI(App):
"""Override to use native `open` for file:// URLs on macOS.""" """Override to use native `open` for file:// URLs on macOS."""
if url.startswith("file://") and platform.system() == "Darwin": if url.startswith("file://") and platform.system() == "Darwin":
path = url.removeprefix("file://") path = url.removeprefix("file://")
subprocess.Popen(["open", path]) subprocess.Popen(["open", path], encoding="utf-8")
else: else:
super().open_url(url, new_tab=new_tab) super().open_url(url, new_tab=new_tab)
+6 -6
View File
@@ -488,7 +488,7 @@ class ChatRepl(Vertical):
if not state_file.exists(): if not state_file.exists():
continue continue
with open(state_file) as f: with open(state_file, encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
status = state.get("status", "").lower() status = state.get("status", "").lower()
@@ -547,7 +547,7 @@ class ChatRepl(Vertical):
# Read session state # Read session state
try: try:
with open(state_file) as f: with open(state_file, encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
# Track this session for /resume <number> lookup # Track this session for /resume <number> lookup
@@ -599,7 +599,7 @@ class ChatRepl(Vertical):
try: try:
import json import json
with open(state_file) as f: with open(state_file, encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
# Basic info # Basic info
@@ -640,7 +640,7 @@ class ChatRepl(Vertical):
# Load and show checkpoints # Load and show checkpoints
for i, cp_file in enumerate(checkpoint_files[-5:], 1): # Last 5 for i, cp_file in enumerate(checkpoint_files[-5:], 1): # Last 5
try: try:
with open(cp_file) as f: with open(cp_file, encoding="utf-8") as f:
cp_data = json.load(f) cp_data = json.load(f)
cp_id = cp_data.get("checkpoint_id", cp_file.stem) cp_id = cp_data.get("checkpoint_id", cp_file.stem)
@@ -687,7 +687,7 @@ class ChatRepl(Vertical):
import json import json
with open(state_file) as f: with open(state_file, encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
# Resume from session state (not checkpoint) # Resume from session state (not checkpoint)
@@ -1102,7 +1102,7 @@ class ChatRepl(Vertical):
continue continue
try: try:
with open(state_file) as f: with open(state_file, encoding="utf-8") as f:
state = json.load(f) state = json.load(f)
status = state.get("status", "").lower() status = state.get("status", "").lower()
@@ -38,6 +38,7 @@ def _linux_file_dialog() -> subprocess.CompletedProcess | None:
"--title=Select a PDF file", "--title=Select a PDF file",
"--file-filter=PDF files (*.pdf)|*.pdf", "--file-filter=PDF files (*.pdf)|*.pdf",
], ],
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=300, timeout=300,
@@ -54,6 +55,7 @@ def _linux_file_dialog() -> subprocess.CompletedProcess | None:
".", ".",
"PDF files (*.pdf)", "PDF files (*.pdf)",
], ],
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=300, timeout=300,
@@ -79,6 +81,7 @@ def _pick_pdf_subprocess() -> Path | None:
'POSIX path of (choose file of type {"com.adobe.pdf"} ' 'POSIX path of (choose file of type {"com.adobe.pdf"} '
'with prompt "Select a PDF file")', 'with prompt "Select a PDF file")',
], ],
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=300, timeout=300,
@@ -93,6 +96,7 @@ def _pick_pdf_subprocess() -> Path | None:
) )
result = subprocess.run( result = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_script], ["powershell", "-NoProfile", "-Command", ps_script],
encoding="utf-8",
capture_output=True, capture_output=True,
text=True, text=True,
timeout=300, timeout=300,
@@ -199,10 +199,11 @@ def _copy_to_clipboard(text: str) -> None:
"""Copy text to system clipboard using platform-native tools.""" """Copy text to system clipboard using platform-native tools."""
try: try:
if sys.platform == "darwin": if sys.platform == "darwin":
subprocess.run(["pbcopy"], input=text.encode(), check=True, timeout=5) subprocess.run(["pbcopy"], encoding="utf-8", input=text.encode(), check=True, timeout=5)
elif sys.platform == "win32": elif sys.platform == "win32":
subprocess.run( subprocess.run(
["clip.exe"], ["clip.exe"],
encoding="utf-8",
input=text.encode("utf-16le"), input=text.encode("utf-16le"),
check=True, check=True,
timeout=5, timeout=5,
@@ -211,6 +212,7 @@ def _copy_to_clipboard(text: str) -> None:
try: try:
subprocess.run( subprocess.run(
["xclip", "-selection", "clipboard"], ["xclip", "-selection", "clipboard"],
encoding="utf-8",
input=text.encode(), input=text.encode(),
check=True, check=True,
timeout=5, timeout=5,
@@ -218,6 +220,7 @@ def _copy_to_clipboard(text: str) -> None:
except (subprocess.SubprocessError, FileNotFoundError): except (subprocess.SubprocessError, FileNotFoundError):
subprocess.run( subprocess.run(
["xsel", "--clipboard", "--input"], ["xsel", "--clipboard", "--input"],
encoding="utf-8",
input=text.encode(), input=text.encode(),
check=True, check=True,
timeout=5, timeout=5,
+10 -3
View File
@@ -53,7 +53,13 @@ def log_error(message: str):
def run_command(cmd: list, error_msg: str) -> bool: def run_command(cmd: list, error_msg: str) -> bool:
"""Run a command and return success status.""" """Run a command and return success status."""
try: try:
subprocess.run(cmd, check=True, capture_output=True, text=True) subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
encoding="utf-8",
)
return True return True
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
log_error(error_msg) log_error(error_msg)
@@ -97,7 +103,7 @@ def main():
if mcp_config_path.exists(): if mcp_config_path.exists():
log_success("MCP configuration found at .mcp.json") log_success("MCP configuration found at .mcp.json")
logger.info("Configuration:") logger.info("Configuration:")
with open(mcp_config_path) as f: with open(mcp_config_path, encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
logger.info(json.dumps(config, indent=2)) logger.info(json.dumps(config, indent=2))
else: else:
@@ -114,7 +120,7 @@ def main():
} }
} }
with open(mcp_config_path, "w") as f: with open(mcp_config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2) json.dump(config, f, indent=2)
log_success("Created .mcp.json") log_success("Created .mcp.json")
@@ -129,6 +135,7 @@ def main():
check=True, check=True,
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
log_success("MCP server module verified") log_success("MCP server module verified")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
+5
View File
@@ -68,6 +68,7 @@ class TestFrameworkModule:
[sys.executable, "-m", "framework", "--help"], [sys.executable, "-m", "framework", "--help"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
cwd=str(project_root / "core"), cwd=str(project_root / "core"),
) )
assert result.returncode == 0 assert result.returncode == 0
@@ -79,6 +80,7 @@ class TestFrameworkModule:
[sys.executable, "-m", "framework", "list", "--help"], [sys.executable, "-m", "framework", "list", "--help"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
cwd=str(project_root / "core"), cwd=str(project_root / "core"),
) )
assert result.returncode == 0 assert result.returncode == 0
@@ -104,6 +106,7 @@ class TestHiveEntryPoint:
["hive", "--help"], ["hive", "--help"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
assert result.returncode == 0 assert result.returncode == 0
assert "run" in result.stdout.lower() assert "run" in result.stdout.lower()
@@ -115,6 +118,7 @@ class TestHiveEntryPoint:
["hive", "list", "--help"], ["hive", "list", "--help"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
assert result.returncode == 0 assert result.returncode == 0
@@ -124,5 +128,6 @@ class TestHiveEntryPoint:
["hive", "run", "nonexistent_agent_xyz"], ["hive", "run", "nonexistent_agent_xyz"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
assert result.returncode != 0 assert result.returncode != 0
+2 -2
View File
@@ -232,7 +232,7 @@ async def test_shared_session_reuses_directory_and_memory(tmp_path):
# Verify primary session's state.json exists and has the primary entry_point # Verify primary session's state.json exists and has the primary entry_point
primary_state_path = tmp_path / "sessions" / primary_exec_id / "state.json" primary_state_path = tmp_path / "sessions" / primary_exec_id / "state.json"
assert primary_state_path.exists() assert primary_state_path.exists()
primary_state = json.loads(primary_state_path.read_text()) primary_state = json.loads(primary_state_path.read_text(encoding="utf-8"))
assert primary_state["entry_point"] == "primary" assert primary_state["entry_point"] == "primary"
# Async stream — simulates a webhook entry point sharing the session # Async stream — simulates a webhook entry point sharing the session
@@ -275,7 +275,7 @@ async def test_shared_session_reuses_directory_and_memory(tmp_path):
# State.json should NOT have been overwritten by the async execution # State.json should NOT have been overwritten by the async execution
# (it should still show the primary entry point) # (it should still show the primary entry point)
final_state = json.loads(primary_state_path.read_text()) final_state = json.loads(primary_state_path.read_text(encoding="utf-8"))
assert final_state["entry_point"] == "primary" assert final_state["entry_point"] == "primary"
# Verify only ONE session directory exists (not two) # Verify only ONE session directory exists (not two)
+2 -2
View File
@@ -184,7 +184,7 @@ class TestPathTraversalWithActualFiles:
# Create a secret file outside storage # Create a secret file outside storage
secret_file = tmpdir_path / "secret.txt" secret_file = tmpdir_path / "secret.txt"
secret_file.write_text("SENSITIVE_DATA") secret_file.write_text("SENSITIVE_DATA", encoding="utf-8")
storage = FileStorage(storage_dir) storage = FileStorage(storage_dir)
@@ -193,7 +193,7 @@ class TestPathTraversalWithActualFiles:
storage.get_runs_by_goal("../secret") storage.get_runs_by_goal("../secret")
# Verify the secret file was not accessed (still contains original data) # Verify the secret file was not accessed (still contains original data)
assert secret_file.read_text() == "SENSITIVE_DATA" assert secret_file.read_text(encoding="utf-8") == "SENSITIVE_DATA"
def test_cannot_write_outside_storage(self): def test_cannot_write_outside_storage(self):
"""Verify that we can't write files outside storage directory.""" """Verify that we can't write files outside storage directory."""
+5 -2
View File
@@ -353,7 +353,9 @@ class TestRuntimeLogger:
# Verify the file exists and has one line # Verify the file exists and has one line
jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "tool_logs.jsonl" jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "tool_logs.jsonl"
assert jsonl_path.exists() assert jsonl_path.exists()
lines = [line for line in jsonl_path.read_text().strip().split("\n") if line] lines = [
line for line in jsonl_path.read_text(encoding="utf-8").strip().split("\n") if line
]
assert len(lines) == 1 assert len(lines) == 1
data = json.loads(lines[0]) data = json.loads(lines[0])
@@ -376,7 +378,8 @@ class TestRuntimeLogger:
jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "details.jsonl" jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "details.jsonl"
assert jsonl_path.exists() assert jsonl_path.exists()
lines = [line for line in jsonl_path.read_text().strip().split("\n") if line] content = jsonl_path.read_text(encoding="utf-8").strip()
lines = [line for line in content.split("\n") if line]
assert len(lines) == 1 assert len(lines) == 1
data = json.loads(lines[0]) data = json.loads(lines[0])
+1 -1
View File
@@ -98,7 +98,7 @@ class TestFileStorageRunOperations:
assert run_file.exists() assert run_file.exists()
# Verify it's valid JSON # Verify it's valid JSON
with open(run_file) as f: with open(run_file, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
assert data["id"] == "my_run" assert data["id"] == "my_run"
+14 -3
View File
@@ -71,6 +71,7 @@ def main():
capture_output=True, capture_output=True,
text=True, text=True,
check=True, check=True,
encoding="utf-8",
) )
framework_path = result.stdout.strip() framework_path = result.stdout.strip()
success(f"installed at {framework_path}") success(f"installed at {framework_path}")
@@ -84,7 +85,12 @@ def main():
missing_deps = [] missing_deps = []
for dep in ["mcp", "fastmcp"]: for dep in ["mcp", "fastmcp"]:
try: try:
subprocess.run([sys.executable, "-c", f"import {dep}"], capture_output=True, check=True) subprocess.run(
[sys.executable, "-c", f"import {dep}"],
capture_output=True,
check=True,
encoding="utf-8",
)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
missing_deps.append(dep) missing_deps.append(dep)
@@ -103,6 +109,7 @@ def main():
capture_output=True, capture_output=True,
text=True, text=True,
check=True, check=True,
encoding="utf-8",
) )
success("loads successfully") success("loads successfully")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
@@ -115,7 +122,7 @@ def main():
mcp_config = script_dir / ".mcp.json" mcp_config = script_dir / ".mcp.json"
if mcp_config.exists(): if mcp_config.exists():
try: try:
with open(mcp_config) as f: with open(mcp_config, encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
if "mcpServers" in config and "agent-builder" in config["mcpServers"]: if "mcpServers" in config and "agent-builder" in config["mcpServers"]:
@@ -149,7 +156,10 @@ def main():
for module in modules_to_check: for module in modules_to_check:
try: try:
subprocess.run( subprocess.run(
[sys.executable, "-c", f"import {module}"], capture_output=True, check=True [sys.executable, "-c", f"import {module}"],
capture_output=True,
check=True,
encoding="utf-8",
) )
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
failed_modules.append(module) failed_modules.append(module)
@@ -174,6 +184,7 @@ def main():
text=True, text=True,
check=True, check=True,
timeout=5, timeout=5,
encoding="utf-8",
) )
if "OK" in result.stdout: if "OK" in result.stdout:
success("server can start") success("server can start")
+2
View File
@@ -20,6 +20,7 @@ def test_check_requirements():
[sys.executable, "scripts/check_requirements.py", "json", "sys", "os"], [sys.executable, "scripts/check_requirements.py", "json", "sys", "os"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
print(f"Exit code: {result.returncode}") print(f"Exit code: {result.returncode}")
print(f"Output:\n{result.stdout}") print(f"Output:\n{result.stdout}")
@@ -39,6 +40,7 @@ def test_check_requirements():
[sys.executable, "scripts/check_requirements.py", "json", "nonexistent_module"], [sys.executable, "scripts/check_requirements.py", "json", "nonexistent_module"],
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
) )
print(f"Exit code: {result.returncode}") print(f"Exit code: {result.returncode}")
print(f"Output:\n{result.stdout}") print(f"Output:\n{result.stdout}")
+11 -1
View File
@@ -90,7 +90,13 @@ def _resolve_path(path: str) -> str:
def _snapshot_git(*args: str) -> str: def _snapshot_git(*args: str) -> str:
"""Run a git command with the snapshot GIT_DIR and PROJECT_ROOT worktree.""" """Run a git command with the snapshot GIT_DIR and PROJECT_ROOT worktree."""
cmd = ["git", "--git-dir", SNAPSHOT_DIR, "--work-tree", PROJECT_ROOT, *args] cmd = ["git", "--git-dir", SNAPSHOT_DIR, "--work-tree", PROJECT_ROOT, *args]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
)
return result.stdout.strip() return result.stdout.strip()
@@ -104,6 +110,7 @@ def _ensure_snapshot_repo():
["git", "init", "--bare", SNAPSHOT_DIR], ["git", "init", "--bare", SNAPSHOT_DIR],
capture_output=True, capture_output=True,
timeout=10, timeout=10,
encoding="utf-8",
) )
_snapshot_git("config", "core.autocrlf", "false") _snapshot_git("config", "core.autocrlf", "false")
@@ -152,6 +159,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
capture_output=True, capture_output=True,
text=True, text=True,
timeout=timeout, timeout=timeout,
encoding="utf-8",
env={ env={
**os.environ, **os.environ,
"PYTHONPATH": ( "PYTHONPATH": (
@@ -228,6 +236,7 @@ def undo_changes(path: str = "") -> str:
capture_output=True, capture_output=True,
text=True, text=True,
timeout=10, timeout=10,
encoding="utf-8",
) )
return f"Restored: {path}" return f"Restored: {path}"
else: else:
@@ -1021,6 +1030,7 @@ def run_agent_tests(
text=True, text=True,
timeout=120, timeout=120,
env=env, env=env,
encoding="utf-8",
) )
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return json.dumps( return json.dumps(
@@ -40,6 +40,7 @@ def open_browser(url: str) -> tuple[bool, str]:
["open", url], ["open", url],
check=True, check=True,
capture_output=True, capture_output=True,
encoding="utf-8",
) )
return True, "Opened in browser" return True, "Opened in browser"
@@ -50,6 +51,7 @@ def open_browser(url: str) -> tuple[bool, str]:
["xdg-open", url], ["xdg-open", url],
check=True, check=True,
capture_output=True, capture_output=True,
encoding="utf-8",
) )
return True, "Opened in browser" return True, "Opened in browser"
except FileNotFoundError: except FileNotFoundError:
@@ -84,7 +84,7 @@ def check_env_var_in_shell_config(
if not config_path.exists(): if not config_path.exists():
return False, None return False, None
content = config_path.read_text() content = config_path.read_text(encoding="utf-8")
# Look for export ENV_VAR=value or export ENV_VAR="value" # Look for export ENV_VAR=value or export ENV_VAR="value"
pattern = rf"^export\s+{re.escape(env_var)}=(.+)$" pattern = rf"^export\s+{re.escape(env_var)}=(.+)$"
@@ -130,7 +130,7 @@ def add_env_var_to_shell_config(
try: try:
if config_path.exists(): if config_path.exists():
content = config_path.read_text() content = config_path.read_text(encoding="utf-8")
# Check if already exists # Check if already exists
pattern = rf"^export\s+{re.escape(env_var)}=.*$" pattern = rf"^export\s+{re.escape(env_var)}=.*$"
@@ -142,11 +142,11 @@ def add_env_var_to_shell_config(
content, content,
flags=re.MULTILINE, flags=re.MULTILINE,
) )
config_path.write_text(new_content) config_path.write_text(new_content, encoding="utf-8")
return True, str(config_path) return True, str(config_path)
# Append to file # Append to file
with open(config_path, "a") as f: with open(config_path, "a", encoding="utf-8") as f:
f.write(f"\n# {comment}\n") f.write(f"\n# {comment}\n")
f.write(f"{export_line}\n") f.write(f"{export_line}\n")
@@ -178,7 +178,7 @@ def remove_env_var_from_shell_config(
return True, "Config file does not exist" return True, "Config file does not exist"
try: try:
content = config_path.read_text() content = config_path.read_text(encoding="utf-8")
lines = content.split("\n") lines = content.split("\n")
new_lines = [] new_lines = []
@@ -206,7 +206,7 @@ def remove_env_var_from_shell_config(
new_lines.append(line) new_lines.append(line)
config_path.write_text("\n".join(new_lines)) config_path.write_text("\n".join(new_lines), encoding="utf-8")
return True, str(config_path) return True, str(config_path)
except PermissionError: except PermissionError:
+7 -1
View File
@@ -501,7 +501,13 @@ def register_file_tools(
cmd.extend(["--glob", include]) cmd.extend(["--glob", include])
cmd.append(resolved) cmd.append(resolved)
rg_result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) rg_result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
)
if rg_result.returncode <= 1: if rg_result.returncode <= 1:
output = rg_result.stdout.strip() output = rg_result.stdout.strip()
if not output: if not output:
@@ -48,7 +48,13 @@ def register_tools(mcp: FastMCP) -> None:
secure_cwd = session_root secure_cwd = session_root
result = subprocess.run( result = subprocess.run(
command, shell=True, cwd=secure_cwd, capture_output=True, text=True, timeout=60 command,
shell=True,
cwd=secure_cwd,
capture_output=True,
text=True,
timeout=60,
encoding="utf-8",
) )
return { return {
+2 -2
View File
@@ -70,7 +70,7 @@ def allocate_port(profile: str, storage_path: Path | None = None) -> int:
# Check for stored port # Check for stored port
if port_file and port_file.exists(): if port_file and port_file.exists():
try: try:
stored_port = int(port_file.read_text().strip()) stored_port = int(port_file.read_text(encoding="utf-8").strip())
if CDP_PORT_MIN <= stored_port <= CDP_PORT_MAX: if CDP_PORT_MIN <= stored_port <= CDP_PORT_MAX:
if _is_port_available(stored_port): if _is_port_available(stored_port):
_allocated_ports.add(stored_port) _allocated_ports.add(stored_port)
@@ -87,7 +87,7 @@ def allocate_port(profile: str, storage_path: Path | None = None) -> int:
# Persist port assignment # Persist port assignment
if port_file: if port_file:
try: try:
port_file.write_text(str(port)) port_file.write_text(str(port), encoding="utf-8")
except OSError as e: except OSError as e:
logger.warning(f"Failed to save port to file: {e}") logger.warning(f"Failed to save port to file: {e}")
return port return port
+2 -2
View File
@@ -53,7 +53,7 @@ def _discover_tool_modules() -> list[tuple[str, str]]:
continue continue
if item.is_dir() and (item / "__init__.py").exists(): if item.is_dir() and (item / "__init__.py").exists():
init_text = (item / "__init__.py").read_text() init_text = (item / "__init__.py").read_text(encoding="utf-8")
if "register_tools" in init_text: if "register_tools" in init_text:
# Direct tool package (e.g., web_search_tool, email_tool) # Direct tool package (e.g., web_search_tool, email_tool)
@@ -64,7 +64,7 @@ def _discover_tool_modules() -> list[tuple[str, str]]:
if sub.name.startswith("_") or sub.name == "__pycache__": if sub.name.startswith("_") or sub.name == "__pycache__":
continue continue
if sub.is_dir() and (sub / "__init__.py").exists(): if sub.is_dir() and (sub / "__init__.py").exists():
sub_init_text = (sub / "__init__.py").read_text() sub_init_text = (sub / "__init__.py").read_text(encoding="utf-8")
if "register_tools" in sub_init_text: if "register_tools" in sub_init_text:
modules.append( modules.append(
( (
+19 -14
View File
@@ -49,7 +49,10 @@ def session_dir(tmp_path: Path) -> Path:
def basic_csv(session_dir: Path) -> Path: def basic_csv(session_dir: Path) -> Path:
"""Create a basic CSV file for testing.""" """Create a basic CSV file for testing."""
csv_file = session_dir / "basic.csv" csv_file = session_dir / "basic.csv"
csv_file.write_text("name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,Chicago\n") csv_file.write_text(
"name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,Chicago\n",
encoding="utf-8",
)
return csv_file return csv_file
@@ -60,7 +63,7 @@ def large_csv(session_dir: Path) -> Path:
lines = ["id,value"] lines = ["id,value"]
for i in range(100): for i in range(100):
lines.append(f"{i},{i * 10}") lines.append(f"{i},{i * 10}")
csv_file.write_text("\n".join(lines) + "\n") csv_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
return csv_file return csv_file
@@ -68,7 +71,7 @@ def large_csv(session_dir: Path) -> Path:
def empty_csv(session_dir: Path) -> Path: def empty_csv(session_dir: Path) -> Path:
"""Create an empty CSV file (no content).""" """Create an empty CSV file (no content)."""
csv_file = session_dir / "empty.csv" csv_file = session_dir / "empty.csv"
csv_file.write_text("") csv_file.write_text("", encoding="utf-8")
return csv_file return csv_file
@@ -76,7 +79,7 @@ def empty_csv(session_dir: Path) -> Path:
def headers_only_csv(session_dir: Path) -> Path: def headers_only_csv(session_dir: Path) -> Path:
"""Create a CSV file with only headers.""" """Create a CSV file with only headers."""
csv_file = session_dir / "headers_only.csv" csv_file = session_dir / "headers_only.csv"
csv_file.write_text("name,age,city\n") csv_file.write_text("name,age,city\n", encoding="utf-8")
return csv_file return csv_file
@@ -217,7 +220,7 @@ class TestCsvRead:
"""Return error for non-CSV file extension.""" """Return error for non-CSV file extension."""
# Create a text file # Create a text file
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name,age\nAlice,30\n") txt_file.write_text("name,age\nAlice,30\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tool_fn( result = csv_tool_fn(
@@ -317,7 +320,8 @@ class TestCsvRead:
"""Read CSV with quoted fields containing commas.""" """Read CSV with quoted fields containing commas."""
csv_file = session_dir / "quoted.csv" csv_file = session_dir / "quoted.csv"
csv_file.write_text( csv_file.write_text(
'name,address,note\n"Smith, John","123 Main St, Apt 4","Hello, world"\n' 'name,address,note\n"Smith, John","123 Main St, Apt 4","Hello, world"\n',
encoding="utf-8",
) )
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
@@ -385,7 +389,7 @@ class TestCsvWrite:
assert result["rows_written"] == 2 assert result["rows_written"] == 2
# Verify file content # Verify file content
content = (session_dir / "output.csv").read_text() content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "name,age,city" in content assert "name,age,city" in content
assert "Alice,30,NYC" in content assert "Alice,30,NYC" in content
assert "Bob,25,LA" in content assert "Bob,25,LA" in content
@@ -449,7 +453,7 @@ class TestCsvWrite:
assert result["success"] is True assert result["success"] is True
content = (session_dir / "output.csv").read_text() content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "extra" not in content assert "extra" not in content
assert "ignored" not in content assert "ignored" not in content
@@ -468,7 +472,7 @@ class TestCsvWrite:
assert result["success"] is True assert result["success"] is True
assert result["rows_written"] == 0 assert result["rows_written"] == 0
content = (session_dir / "output.csv").read_text() content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "name,age" in content assert "name,age" in content
def test_write_unicode_content(self, csv_tools, session_dir, tmp_path): def test_write_unicode_content(self, csv_tools, session_dir, tmp_path):
@@ -511,7 +515,7 @@ class TestCsvWrite:
csv_file = session_dir / "data.csv" csv_file = session_dir / "data.csv"
assert csv_file.exists() assert csv_file.exists()
content = csv_file.read_text() content = csv_file.read_text(encoding="utf-8")
assert "id,value" in content assert "id,value" in content
assert "1,test1" in content assert "1,test1" in content
assert "2,test2" in content assert "2,test2" in content
@@ -579,7 +583,7 @@ class TestCsvAppend:
assert result["success"] is True assert result["success"] is True
content = (session_dir / "basic.csv").read_text() content = (session_dir / "basic.csv").read_text(encoding="utf-8")
assert "extra" not in content assert "extra" not in content
assert "ignored" not in content assert "ignored" not in content
assert "David" in content assert "David" in content
@@ -587,7 +591,7 @@ class TestCsvAppend:
def test_append_non_csv_extension_error(self, csv_tools, session_dir, tmp_path): def test_append_non_csv_extension_error(self, csv_tools, session_dir, tmp_path):
"""Return error for non-CSV file extension.""" """Return error for non-CSV file extension."""
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n") txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tools["csv_append"]( result = csv_tools["csv_append"](
@@ -679,7 +683,7 @@ class TestCsvInfo:
def test_get_info_non_csv_extension_error(self, csv_tools, session_dir, tmp_path): def test_get_info_non_csv_extension_error(self, csv_tools, session_dir, tmp_path):
"""Return error for non-CSV file extension.""" """Return error for non-CSV file extension."""
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n") txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tools["csv_info"]( result = csv_tools["csv_info"](
@@ -707,7 +711,8 @@ class TestCsvSql:
"2,MacBook,Electronics,1999,30\n" "2,MacBook,Electronics,1999,30\n"
"3,Coffee Mug,Kitchen,15,200\n" "3,Coffee Mug,Kitchen,15,200\n"
"4,Headphones,Electronics,299,75\n" "4,Headphones,Electronics,299,75\n"
"5,Water Bottle,Kitchen,25,150\n" "5,Water Bottle,Kitchen,25,150\n",
encoding="utf-8",
) )
return csv_file return csv_file
+4 -4
View File
@@ -280,7 +280,7 @@ class TestExcelRead:
"""Return error for non-Excel file extension.""" """Return error for non-Excel file extension."""
# Create a text file # Create a text file
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name,age\nAlice,30\n") txt_file.write_text("name,age\nAlice,30\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_read_fn( result = excel_read_fn(
@@ -602,7 +602,7 @@ class TestExcelAppend:
def test_append_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path): def test_append_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension.""" """Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n") txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_append"]( result = excel_tools["excel_append"](
@@ -672,7 +672,7 @@ class TestExcelInfo:
def test_get_info_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path): def test_get_info_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension.""" """Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n") txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_info"]( result = excel_tools["excel_info"](
@@ -735,7 +735,7 @@ class TestExcelSheetList:
def test_list_sheets_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path): def test_list_sheets_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension.""" """Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt" txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n") txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)): with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_sheet_list"]( result = excel_tools["excel_sheet_list"](
+50 -49
View File
@@ -86,7 +86,7 @@ class TestViewFileTool:
def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path): def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an existing file returns content and metadata.""" """Viewing an existing file returns content and metadata."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
test_file.write_text("Hello, World!") test_file.write_text("Hello, World!", encoding="utf-8")
result = view_file_fn(path="test.txt", **mock_workspace) result = view_file_fn(path="test.txt", **mock_workspace)
@@ -106,7 +106,7 @@ class TestViewFileTool:
"""Viewing a multiline file returns correct line count.""" """Viewing a multiline file returns correct line count."""
test_file = tmp_path / "multiline.txt" test_file = tmp_path / "multiline.txt"
content = "Line 1\nLine 2\nLine 3\nLine 4\n" content = "Line 1\nLine 2\nLine 3\nLine 4\n"
test_file.write_text(content) test_file.write_text(content, encoding="utf-8")
result = view_file_fn(path="multiline.txt", **mock_workspace) result = view_file_fn(path="multiline.txt", **mock_workspace)
@@ -117,7 +117,7 @@ class TestViewFileTool:
def test_view_empty_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path): def test_view_empty_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an empty file returns empty content.""" """Viewing an empty file returns empty content."""
test_file = tmp_path / "empty.txt" test_file = tmp_path / "empty.txt"
test_file.write_text("") test_file.write_text("", encoding="utf-8")
result = view_file_fn(path="empty.txt", **mock_workspace) result = view_file_fn(path="empty.txt", **mock_workspace)
@@ -143,7 +143,7 @@ class TestViewFileTool:
nested = tmp_path / "nested" / "dir" nested = tmp_path / "nested" / "dir"
nested.mkdir(parents=True) nested.mkdir(parents=True)
test_file = nested / "file.txt" test_file = nested / "file.txt"
test_file.write_text("nested content") test_file.write_text("nested content", encoding="utf-8")
result = view_file_fn(path="nested/dir/file.txt", **mock_workspace) result = view_file_fn(path="nested/dir/file.txt", **mock_workspace)
@@ -156,7 +156,7 @@ class TestViewFileTool:
"""Viewing a file with max_size truncates content when exceeding limit.""" """Viewing a file with max_size truncates content when exceeding limit."""
test_file = tmp_path / "large.txt" test_file = tmp_path / "large.txt"
content = "x" * 1000 content = "x" * 1000
test_file.write_text(content) test_file.write_text(content, encoding="utf-8")
result = view_file_fn(path="large.txt", max_size=100, **mock_workspace) result = view_file_fn(path="large.txt", max_size=100, **mock_workspace)
@@ -171,7 +171,7 @@ class TestViewFileTool:
): ):
"""Viewing a file with negative max_size returns error.""" """Viewing a file with negative max_size returns error."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
test_file.write_text("content") test_file.write_text("content", encoding="utf-8")
result = view_file_fn(path="test.txt", max_size=-1, **mock_workspace) result = view_file_fn(path="test.txt", max_size=-1, **mock_workspace)
@@ -196,7 +196,7 @@ class TestViewFileTool:
): ):
"""Viewing a file with invalid encoding returns error.""" """Viewing a file with invalid encoding returns error."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
test_file.write_text("content") test_file.write_text("content", encoding="utf-8")
result = view_file_fn(path="test.txt", encoding="invalid-encoding", **mock_workspace) result = view_file_fn(path="test.txt", encoding="invalid-encoding", **mock_workspace)
@@ -225,12 +225,12 @@ class TestWriteToFileTool:
# Verify file was created # Verify file was created
created_file = tmp_path / "new_file.txt" created_file = tmp_path / "new_file.txt"
assert created_file.exists() assert created_file.exists()
assert created_file.read_text() == "Test content" assert created_file.read_text(encoding="utf-8") == "Test content"
def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path): def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing with append=True appends to existing file.""" """Writing with append=True appends to existing file."""
test_file = tmp_path / "append_test.txt" test_file = tmp_path / "append_test.txt"
test_file.write_text("Line 1\n") test_file.write_text("Line 1\n", encoding="utf-8")
result = write_to_file_fn( result = write_to_file_fn(
path="append_test.txt", content="Line 2\n", append=True, **mock_workspace path="append_test.txt", content="Line 2\n", append=True, **mock_workspace
@@ -238,20 +238,20 @@ class TestWriteToFileTool:
assert result["success"] is True assert result["success"] is True
assert result["mode"] == "appended" assert result["mode"] == "appended"
assert test_file.read_text() == "Line 1\nLine 2\n" assert test_file.read_text(encoding="utf-8") == "Line 1\nLine 2\n"
def test_write_overwrite_existing( def test_write_overwrite_existing(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
): ):
"""Writing to existing file overwrites it by default.""" """Writing to existing file overwrites it by default."""
test_file = tmp_path / "overwrite.txt" test_file = tmp_path / "overwrite.txt"
test_file.write_text("Original content") test_file.write_text("Original content", encoding="utf-8")
result = write_to_file_fn(path="overwrite.txt", content="New content", **mock_workspace) result = write_to_file_fn(path="overwrite.txt", content="New content", **mock_workspace)
assert result["success"] is True assert result["success"] is True
assert result["mode"] == "written" assert result["mode"] == "written"
assert test_file.read_text() == "New content" assert test_file.read_text(encoding="utf-8") == "New content"
def test_write_creates_parent_directories( def test_write_creates_parent_directories(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
@@ -262,7 +262,7 @@ class TestWriteToFileTool:
assert result["success"] is True assert result["success"] is True
created_file = tmp_path / "nested" / "dir" / "file.txt" created_file = tmp_path / "nested" / "dir" / "file.txt"
assert created_file.exists() assert created_file.exists()
assert created_file.read_text() == "Test" assert created_file.read_text(encoding="utf-8") == "Test"
def test_write_empty_content( def test_write_empty_content(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
@@ -274,7 +274,7 @@ class TestWriteToFileTool:
assert result["bytes_written"] == 0 assert result["bytes_written"] == 0
created_file = tmp_path / "empty.txt" created_file = tmp_path / "empty.txt"
assert created_file.exists() assert created_file.exists()
assert created_file.read_text() == "" assert created_file.read_text(encoding="utf-8") == ""
class TestListDirTool: class TestListDirTool:
@@ -290,8 +290,8 @@ class TestListDirTool:
def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path): def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns all entries.""" """Listing a directory returns all entries."""
# Create test files and directories # Create test files and directories
(tmp_path / "file1.txt").write_text("content") (tmp_path / "file1.txt").write_text("content", encoding="utf-8")
(tmp_path / "file2.txt").write_text("content") (tmp_path / "file2.txt").write_text("content", encoding="utf-8")
(tmp_path / "subdir").mkdir() (tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace) result = list_dir_fn(path=".", **mock_workspace)
@@ -328,8 +328,8 @@ class TestListDirTool:
self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path
): ):
"""Listing a directory returns file sizes for files.""" """Listing a directory returns file sizes for files."""
(tmp_path / "small.txt").write_text("hi") (tmp_path / "small.txt").write_text("hi", encoding="utf-8")
(tmp_path / "larger.txt").write_text("hello world") (tmp_path / "larger.txt").write_text("hello world", encoding="utf-8")
(tmp_path / "subdir").mkdir() (tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace) result = list_dir_fn(path=".", **mock_workspace)
@@ -366,7 +366,7 @@ class TestReplaceFileContentTool:
): ):
"""Replacing content in a file works correctly.""" """Replacing content in a file works correctly."""
test_file = tmp_path / "replace_test.txt" test_file = tmp_path / "replace_test.txt"
test_file.write_text("Hello World! Hello again!") test_file.write_text("Hello World! Hello again!", encoding="utf-8")
result = replace_file_content_fn( result = replace_file_content_fn(
path="replace_test.txt", target="Hello", replacement="Hi", **mock_workspace path="replace_test.txt", target="Hello", replacement="Hi", **mock_workspace
@@ -374,14 +374,14 @@ class TestReplaceFileContentTool:
assert result["success"] is True assert result["success"] is True
assert result["occurrences_replaced"] == 2 assert result["occurrences_replaced"] == 2
assert test_file.read_text() == "Hi World! Hi again!" assert test_file.read_text(encoding="utf-8") == "Hi World! Hi again!"
def test_replace_target_not_found( def test_replace_target_not_found(
self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path
): ):
"""Replacing non-existent target returns error.""" """Replacing non-existent target returns error."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
test_file.write_text("Hello World") test_file.write_text("Hello World", encoding="utf-8")
result = replace_file_content_fn( result = replace_file_content_fn(
path="test.txt", target="nonexistent", replacement="new", **mock_workspace path="test.txt", target="nonexistent", replacement="new", **mock_workspace
@@ -406,7 +406,7 @@ class TestReplaceFileContentTool:
): ):
"""Replacing content with single occurrence works correctly.""" """Replacing content with single occurrence works correctly."""
test_file = tmp_path / "single.txt" test_file = tmp_path / "single.txt"
test_file.write_text("Hello World") test_file.write_text("Hello World", encoding="utf-8")
result = replace_file_content_fn( result = replace_file_content_fn(
path="single.txt", target="Hello", replacement="Hi", **mock_workspace path="single.txt", target="Hello", replacement="Hi", **mock_workspace
@@ -414,14 +414,14 @@ class TestReplaceFileContentTool:
assert result["success"] is True assert result["success"] is True
assert result["occurrences_replaced"] == 1 assert result["occurrences_replaced"] == 1
assert test_file.read_text() == "Hi World" assert test_file.read_text(encoding="utf-8") == "Hi World"
def test_replace_multiline_content( def test_replace_multiline_content(
self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path
): ):
"""Replacing content across multiple lines works correctly.""" """Replacing content across multiple lines works correctly."""
test_file = tmp_path / "multiline.txt" test_file = tmp_path / "multiline.txt"
test_file.write_text("Line 1\nTODO: fix this\nLine 3\nTODO: add tests\n") test_file.write_text("Line 1\nTODO: fix this\nLine 3\nTODO: add tests\n", encoding="utf-8")
result = replace_file_content_fn( result = replace_file_content_fn(
path="multiline.txt", target="TODO:", replacement="DONE:", **mock_workspace path="multiline.txt", target="TODO:", replacement="DONE:", **mock_workspace
@@ -429,7 +429,8 @@ class TestReplaceFileContentTool:
assert result["success"] is True assert result["success"] is True
assert result["occurrences_replaced"] == 2 assert result["occurrences_replaced"] == 2
assert test_file.read_text() == "Line 1\nDONE: fix this\nLine 3\nDONE: add tests\n" expected = "Line 1\nDONE: fix this\nLine 3\nDONE: add tests\n"
assert test_file.read_text(encoding="utf-8") == expected
class TestGrepSearchTool: class TestGrepSearchTool:
@@ -447,7 +448,7 @@ class TestGrepSearchTool:
): ):
"""Searching a single file returns matches.""" """Searching a single file returns matches."""
test_file = tmp_path / "search_test.txt" test_file = tmp_path / "search_test.txt"
test_file.write_text("Line 1\nLine 2 with pattern\nLine 3") test_file.write_text("Line 1\nLine 2 with pattern\nLine 3", encoding="utf-8")
result = grep_search_fn(path="search_test.txt", pattern="pattern", **mock_workspace) result = grep_search_fn(path="search_test.txt", pattern="pattern", **mock_workspace)
@@ -462,7 +463,7 @@ class TestGrepSearchTool:
): ):
"""Searching with no matches returns empty list.""" """Searching with no matches returns empty list."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
test_file.write_text("Hello World") test_file.write_text("Hello World", encoding="utf-8")
result = grep_search_fn(path="test.txt", pattern="nonexistent", **mock_workspace) result = grep_search_fn(path="test.txt", pattern="nonexistent", **mock_workspace)
@@ -475,13 +476,13 @@ class TestGrepSearchTool:
): ):
"""Searching directory non-recursively only searches immediate files.""" """Searching directory non-recursively only searches immediate files."""
# Create files in root # Create files in root
(tmp_path / "file1.txt").write_text("pattern here") (tmp_path / "file1.txt").write_text("pattern here", encoding="utf-8")
(tmp_path / "file2.txt").write_text("no match here") (tmp_path / "file2.txt").write_text("no match here", encoding="utf-8")
# Create nested directory with file # Create nested directory with file
nested = tmp_path / "nested" nested = tmp_path / "nested"
nested.mkdir() nested.mkdir()
(nested / "nested_file.txt").write_text("pattern in nested") (nested / "nested_file.txt").write_text("pattern in nested", encoding="utf-8")
result = grep_search_fn(path=".", pattern="pattern", recursive=False, **mock_workspace) result = grep_search_fn(path=".", pattern="pattern", recursive=False, **mock_workspace)
@@ -494,12 +495,12 @@ class TestGrepSearchTool:
): ):
"""Searching directory recursively finds matches in subdirectories.""" """Searching directory recursively finds matches in subdirectories."""
# Create files in root # Create files in root
(tmp_path / "file1.txt").write_text("pattern here") (tmp_path / "file1.txt").write_text("pattern here", encoding="utf-8")
# Create nested directory with file # Create nested directory with file
nested = tmp_path / "nested" nested = tmp_path / "nested"
nested.mkdir() nested.mkdir()
(nested / "nested_file.txt").write_text("pattern in nested") (nested / "nested_file.txt").write_text("pattern in nested", encoding="utf-8")
result = grep_search_fn(path=".", pattern="pattern", recursive=True, **mock_workspace) result = grep_search_fn(path=".", pattern="pattern", recursive=True, **mock_workspace)
@@ -512,7 +513,7 @@ class TestGrepSearchTool:
): ):
"""Searching with regex pattern finds complex matches.""" """Searching with regex pattern finds complex matches."""
test_file = tmp_path / "regex_test.txt" test_file = tmp_path / "regex_test.txt"
test_file.write_text("foo123bar\nfoo456bar\nbaz789baz\n") test_file.write_text("foo123bar\nfoo456bar\nbaz789baz\n", encoding="utf-8")
result = grep_search_fn(path="regex_test.txt", pattern=r"foo\d+bar", **mock_workspace) result = grep_search_fn(path="regex_test.txt", pattern=r"foo\d+bar", **mock_workspace)
@@ -526,7 +527,7 @@ class TestGrepSearchTool:
): ):
"""Searching returns one match per line even with multiple occurrences.""" """Searching returns one match per line even with multiple occurrences."""
test_file = tmp_path / "multi_match.txt" test_file = tmp_path / "multi_match.txt"
test_file.write_text("hello hello hello\nworld\nhello again") test_file.write_text("hello hello hello\nworld\nhello again", encoding="utf-8")
result = grep_search_fn(path="multi_match.txt", pattern="hello", **mock_workspace) result = grep_search_fn(path="multi_match.txt", pattern="hello", **mock_workspace)
@@ -573,7 +574,7 @@ class TestExecuteCommandTool:
): ):
"""Executing ls command lists files.""" """Executing ls command lists files."""
# Create a test file # Create a test file
(tmp_path / "testfile.txt").write_text("content") (tmp_path / "testfile.txt").write_text("content", encoding="utf-8")
result = execute_command_fn(command=f"ls {tmp_path}", **mock_workspace) result = execute_command_fn(command=f"ls {tmp_path}", **mock_workspace)
@@ -610,7 +611,7 @@ class TestApplyDiffTool:
def test_apply_diff_successful(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path): def test_apply_diff_successful(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying a valid diff successfully modifies the file.""" """Applying a valid diff successfully modifies the file."""
test_file = tmp_path / "diff_test.txt" test_file = tmp_path / "diff_test.txt"
test_file.write_text("Hello World") test_file.write_text("Hello World", encoding="utf-8")
# Create a simple diff using diff_match_patch format # Create a simple diff using diff_match_patch format
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
@@ -624,13 +625,13 @@ class TestApplyDiffTool:
assert result["success"] is True assert result["success"] is True
assert result["all_successful"] is True assert result["all_successful"] is True
assert result["patches_applied"] > 0 assert result["patches_applied"] > 0
assert test_file.read_text() == "Hello Universe" assert test_file.read_text(encoding="utf-8") == "Hello Universe"
def test_apply_diff_multiline(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path): def test_apply_diff_multiline(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying diff to multiline content works correctly.""" """Applying diff to multiline content works correctly."""
test_file = tmp_path / "multiline.txt" test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n" original = "Line 1\nLine 2\nLine 3\n"
test_file.write_text(original) test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
@@ -643,7 +644,7 @@ class TestApplyDiffTool:
assert result["success"] is True assert result["success"] is True
assert result["all_successful"] is True assert result["all_successful"] is True
assert test_file.read_text() == modified assert test_file.read_text(encoding="utf-8") == modified
def test_apply_diff_invalid_patch( def test_apply_diff_invalid_patch(
self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path
@@ -651,7 +652,7 @@ class TestApplyDiffTool:
"""Applying an invalid diff handles gracefully.""" """Applying an invalid diff handles gracefully."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
original_content = "Original content" original_content = "Original content"
test_file.write_text(original_content) test_file.write_text(original_content, encoding="utf-8")
# Invalid diff text # Invalid diff text
result = apply_diff_fn(path="test.txt", diff_text="invalid diff format", **mock_workspace) result = apply_diff_fn(path="test.txt", diff_text="invalid diff format", **mock_workspace)
@@ -660,7 +661,7 @@ class TestApplyDiffTool:
if "error" not in result: if "error" not in result:
assert result.get("patches_applied", 0) == 0 assert result.get("patches_applied", 0) == 0
# File should remain unchanged # File should remain unchanged
assert test_file.read_text() == original_content assert test_file.read_text(encoding="utf-8") == original_content
class TestApplyPatchTool: class TestApplyPatchTool:
@@ -685,7 +686,7 @@ class TestApplyPatchTool:
): ):
"""Applying a valid patch successfully modifies the file.""" """Applying a valid patch successfully modifies the file."""
test_file = tmp_path / "patch_test.txt" test_file = tmp_path / "patch_test.txt"
test_file.write_text("Hello World") test_file.write_text("Hello World", encoding="utf-8")
# Create a simple patch using diff_match_patch format # Create a simple patch using diff_match_patch format
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
@@ -699,7 +700,7 @@ class TestApplyPatchTool:
assert result["success"] is True assert result["success"] is True
assert result["all_successful"] is True assert result["all_successful"] is True
assert result["patches_applied"] > 0 assert result["patches_applied"] > 0
assert test_file.read_text() == "Hello Python" assert test_file.read_text(encoding="utf-8") == "Hello Python"
def test_apply_patch_multiline( def test_apply_patch_multiline(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -707,7 +708,7 @@ class TestApplyPatchTool:
"""Applying patch to multiline content works correctly.""" """Applying patch to multiline content works correctly."""
test_file = tmp_path / "multiline.txt" test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n" original = "Line 1\nLine 2\nLine 3\n"
test_file.write_text(original) test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
@@ -720,7 +721,7 @@ class TestApplyPatchTool:
assert result["success"] is True assert result["success"] is True
assert result["all_successful"] is True assert result["all_successful"] is True
assert test_file.read_text() == modified assert test_file.read_text(encoding="utf-8") == modified
def test_apply_patch_invalid_patch( def test_apply_patch_invalid_patch(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -728,7 +729,7 @@ class TestApplyPatchTool:
"""Applying an invalid patch handles gracefully.""" """Applying an invalid patch handles gracefully."""
test_file = tmp_path / "test.txt" test_file = tmp_path / "test.txt"
original_content = "Original content" original_content = "Original content"
test_file.write_text(original_content) test_file.write_text(original_content, encoding="utf-8")
# Invalid patch text # Invalid patch text
result = apply_patch_fn( result = apply_patch_fn(
@@ -739,7 +740,7 @@ class TestApplyPatchTool:
if "error" not in result: if "error" not in result:
assert result.get("patches_applied", 0) == 0 assert result.get("patches_applied", 0) == 0
# File should remain unchanged # File should remain unchanged
assert test_file.read_text() == original_content assert test_file.read_text(encoding="utf-8") == original_content
def test_apply_patch_multiple_changes( def test_apply_patch_multiple_changes(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -747,7 +748,7 @@ class TestApplyPatchTool:
"""Applying patch with multiple changes works correctly.""" """Applying patch with multiple changes works correctly."""
test_file = tmp_path / "complex.txt" test_file = tmp_path / "complex.txt"
original = "Function foo() {\n return 42;\n}\n" original = "Function foo() {\n return 42;\n}\n"
test_file.write_text(original) test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
@@ -760,4 +761,4 @@ class TestApplyPatchTool:
assert result["success"] is True assert result["success"] is True
assert result["all_successful"] is True assert result["all_successful"] is True
assert test_file.read_text() == modified assert test_file.read_text(encoding="utf-8") == modified
+1 -1
View File
@@ -28,7 +28,7 @@ class TestPdfReadTool:
def test_read_pdf_invalid_extension(self, pdf_read_fn, tmp_path: Path): def test_read_pdf_invalid_extension(self, pdf_read_fn, tmp_path: Path):
"""Reading non-PDF file returns error.""" """Reading non-PDF file returns error."""
txt_file = tmp_path / "test.txt" txt_file = tmp_path / "test.txt"
txt_file.write_text("not a pdf") txt_file.write_text("not a pdf", encoding="utf-8")
result = pdf_read_fn(file_path=str(txt_file)) result = pdf_read_fn(file_path=str(txt_file))
+4 -2
View File
@@ -47,7 +47,8 @@ def runtime_logs_dir(tmp_path: Path) -> Path:
"duration_ms": 3000, "duration_ms": 3000,
"execution_quality": "clean", "execution_quality": "clean",
} }
) ),
encoding="utf-8",
) )
_write_jsonl( _write_jsonl(
run1_dir / "details.jsonl", run1_dir / "details.jsonl",
@@ -143,7 +144,8 @@ def runtime_logs_dir(tmp_path: Path) -> Path:
"duration_ms": 60000, "duration_ms": 60000,
"execution_quality": "failed", "execution_quality": "failed",
} }
) ),
encoding="utf-8",
) )
_write_jsonl( _write_jsonl(
run2_dir / "details.jsonl", run2_dir / "details.jsonl",
+2 -2
View File
@@ -238,7 +238,7 @@ class TestGetSecurePath:
# Create a target file and a symlink to it # Create a target file and a symlink to it
target_file = session_dir / "target.txt" target_file = session_dir / "target.txt"
target_file.write_text("content") target_file.write_text("content", encoding="utf-8")
symlink_path = session_dir / "link_to_target" symlink_path = session_dir / "link_to_target"
symlink_path.symlink_to(target_file) symlink_path.symlink_to(target_file)
@@ -263,7 +263,7 @@ class TestGetSecurePath:
# Create a symlink inside session pointing outside # Create a symlink inside session pointing outside
outside_target = self.workspaces_dir / "outside_file.txt" outside_target = self.workspaces_dir / "outside_file.txt"
outside_target.write_text("sensitive data") outside_target.write_text("sensitive data", encoding="utf-8")
symlink_path = session_dir / "escape_link" symlink_path = session_dir / "escape_link"
symlink_path.symlink_to(outside_target) symlink_path.symlink_to(outside_target)