Compare commits
37 Commits
- 41388efc31
- fab5ce6fd0
- 207d6baee5
- fec72bb2b6
- c4c4c24c59
- 2005ba2dca
- 557d5fd6e5
- 79d2a15f95
- ab32e44128
- 047059f85f
- e8364f616d
- 9098c9b6c6
- 84fd9ebac8
- 23d5d76d56
- b0c86588b6
- 5aff1f9489
- 199cb3d8cc
- a98a4ca0b6
- c4f49aadfa
- ca5ac389cf
- 7a658f7953
- e05fc99da7
- 787090667e
- 80b36b4052
- 1c58ccb0c1
- 79b80fe817
- c0f3841af7
- 2b7d9bc471
- 98dc493a39
- cfaa57b28d
- 704a0fd63a
- bf4101ac38
- bc349e8fde
- 8470c6a980
- c313ea7ee2
- 99d41d8cc6
- e1db3a4af9
```diff
@@ -62,8 +62,11 @@ jobs:
         uv run pytest tests/ -v

   test-tools:
-    name: Test Tools
-    runs-on: ubuntu-latest
+    name: Test Tools (${{ matrix.os }})
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        os: [ubuntu-latest, windows-latest]
     steps:
       - uses: actions/checkout@v4

```
```diff
@@ -20,8 +20,20 @@ check: ## Run all checks without modifying files (CI-safe)
 	cd core && ruff format --check .
 	cd tools && ruff format --check .

-test: ## Run all tests
+test: ## Run all tests (core + tools, excludes live)
 	cd core && uv run python -m pytest tests/ -v
+	cd tools && uv run python -m pytest -v
+
+test-tools: ## Run tool tests only (mocked, no credentials needed)
+	cd tools && uv run python -m pytest -v
+
+test-live: ## Run live integration tests (requires real API credentials)
+	cd tools && uv run python -m pytest -m live -s -o "addopts=" --log-cli-level=INFO
+
+test-all: ## Run everything including live tests
+	cd core && uv run python -m pytest tests/ -v
+	cd tools && uv run python -m pytest -v
+	cd tools && uv run python -m pytest -m live -s -o "addopts=" --log-cli-level=INFO

 install-hooks: ## Install pre-commit hooks
 	uv pip install pre-commit
```
```diff
@@ -82,6 +82,7 @@ Use Hive when you need:

 - Python 3.11+ for agent development
 - An LLM provider that powers the agents
+- **ripgrep (optional, recommended on Windows):** The `search_files` tool uses ripgrep for faster file search. If not installed, a Python fallback is used. On Windows: `winget install BurntSushi.ripgrep` or `scoop install ripgrep`

 > **Note for Windows Users:** It is strongly recommended to use **WSL (Windows Subsystem for Linux)** or **Git Bash** to run this framework. Some core automation scripts may not execute correctly in standard Command Prompt or PowerShell.
```
```diff
@@ -46,6 +46,7 @@ _SHARED_TOOLS = [
     "read_file",
     "write_file",
     "edit_file",
+    "hashline_edit",
     "list_directory",
     "search_files",
     "run_command",
```
```diff
@@ -131,12 +132,23 @@ errors yourself. Don't declare success until validation passes.

 # Tools

+## Paths (MANDATORY)
+**Always use RELATIVE paths**
+(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
+**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` — they fail.
+The project root is implicit.
+
 ## File I/O
-- read_file(path, offset?, limit?) — read with line numbers
+- read_file(path, offset?, limit?, hashline?) — read with line numbers; \
+  hashline=True for N:hhhh|content anchors (use with hashline_edit)
 - write_file(path, content) — create/overwrite, auto-mkdir
 - edit_file(path, old_text, new_text, replace_all?) — fuzzy-match edit
+- hashline_edit(path, edits, auto_cleanup?, encoding?) — anchor-based \
+  editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
+  replace_lines, insert_after, insert_before, replace, append
 - list_directory(path, recursive?) — list contents
-- search_files(pattern, path?, include?) — regex search
+- search_files(pattern, path?, include?, hashline?) — regex search; \
+  hashline=True for anchors in results
 - run_command(command, cwd?, timeout?) — shell execution
 - undo_changes(path?) — restore from git snapshot
```
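For illustration, one anchor round-trip with these tools. The edit payload shape below is an assumption for readability, not a confirmed framework API; only the op names come from the docs above, and the hash values are invented:

```python
# Hypothetical round-trip (anchors are invented, not real hashes):
# read_file("exports/agent_name/config.py", hashline=True) returns lines like
#   7:4c2a|TIMEOUT = 30
# hashline_edit then targets that anchor with one of the listed ops:
hashline_edit(
    path="exports/agent_name/config.py",
    edits=[{"op": "set_line", "anchor": "7:4c2a", "content": "TIMEOUT = 60"}],
)
```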
```diff
@@ -431,8 +431,7 @@ class GraphSpec(BaseModel):
     max_tokens: int = Field(default=None)  # resolved by _resolve_max_tokens validator

     # Cleanup LLM for JSON extraction fallback (fast/cheap model preferred)
-    # If not set, uses CEREBRAS_API_KEY -> cerebras/llama-3.3-70b or
-    # ANTHROPIC_API_KEY -> claude-haiku-4-5 as fallback
+    # If not set, uses CEREBRAS_API_KEY -> cerebras/llama-3.3-70b
    cleanup_llm_model: str | None = None

    # Execution limits
```
```diff
@@ -4058,329 +4058,3 @@ class EventLoopNode(NodeProtocol):
            content=json.dumps(result_json, indent=2),
            is_error=True,
        )

    # -------------------------------------------------------------------
    # Subagent Execution
    # -------------------------------------------------------------------

    async def _execute_subagent(
        self,
        ctx: NodeContext,
        agent_id: str,
        task: str,
        *,
        accumulator: OutputAccumulator | None = None,
    ) -> ToolResult:
        """Execute a subagent and return the result as a ToolResult.

        The subagent:
        - Gets a fresh conversation with just the task
        - Has read-only access to the parent's readable memory
        - Cannot delegate to its own subagents (prevents recursion)
        - Returns its output in structured JSON format

        Args:
            ctx: Parent node's context (for memory, tools, LLM access).
            agent_id: The node ID of the subagent to invoke.
            task: The task description to give the subagent.
            accumulator: Parent's OutputAccumulator — provides outputs that
                have been set via ``set_output`` but not yet written to
                shared memory (which only happens after the node completes).

        Returns:
            ToolResult with structured JSON output containing:
            - message: Human-readable summary
            - data: Subagent's output (free-form JSON)
            - metadata: Execution metadata (success, tokens, latency)
        """
        from framework.graph.node import NodeContext, SharedMemory

        # Log subagent invocation start
        logger.info(
            "\n" + "=" * 60 + "\n"
            "🤖 SUBAGENT INVOCATION\n"
            "=" * 60 + "\n"
            "Parent Node: %s\n"
            "Subagent ID: %s\n"
            "Task: %s\n" + "=" * 60,
            ctx.node_id,
            agent_id,
            task[:500] + "..." if len(task) > 500 else task,
        )

        # 1. Validate agent exists in registry
        if agent_id not in ctx.node_registry:
            return ToolResult(
                tool_use_id="",
                content=json.dumps(
                    {
                        "message": f"Sub-agent '{agent_id}' not found in registry",
                        "data": None,
                        "metadata": {"agent_id": agent_id, "success": False, "error": "not_found"},
                    }
                ),
                is_error=True,
            )

        subagent_spec = ctx.node_registry[agent_id]

        # 2. Create read-only memory snapshot
        # Start with everything the parent can read from shared memory.
        parent_data = ctx.memory.read_all()

        # Merge in-flight outputs from the parent's accumulator.
        # set_output() writes to the accumulator but shared memory is only
        # updated after the parent node completes — so the subagent would
        # otherwise miss any keys the parent set before delegating.
        if accumulator:
            for key, value in accumulator.to_dict().items():
                if key not in parent_data:
                    parent_data[key] = value

        subagent_memory = SharedMemory()
        for key, value in parent_data.items():
            subagent_memory.write(key, value, validate=False)

        # Allow reads for parent data AND the subagent's declared input_keys
        # (input_keys may reference keys that exist but weren't in read_all,
        # or keys that were just written by the accumulator).
        read_keys = set(parent_data.keys()) | set(subagent_spec.input_keys or [])
        scoped_memory = subagent_memory.with_permissions(
            read_keys=list(read_keys),
            write_keys=[],  # Read-only!
        )

        # 2b. Set up report callback (one-way channel to parent / event bus)
        subagent_reports: list[dict] = []

        async def _report_callback(
            message: str,
            data: dict | None = None,
            *,
            wait_for_response: bool = False,
        ) -> str | None:
            subagent_reports.append({"message": message, "data": data, "timestamp": time.time()})
            if self._event_bus:
                await self._event_bus.emit_subagent_report(
                    stream_id=ctx.node_id,
                    node_id=f"{ctx.node_id}:subagent:{agent_id}",
                    subagent_id=agent_id,
                    message=message,
                    data=data,
                    execution_id=ctx.execution_id,
                )

            if not wait_for_response:
                return None

            if not self._event_bus:
                logger.warning(
                    "Subagent '%s' requested user response but no event_bus available",
                    agent_id,
                )
                return None

            # Create isolated receiver and register for input routing
            import uuid

            escalation_id = f"{ctx.node_id}:escalation:{uuid.uuid4().hex[:8]}"
            receiver = _EscalationReceiver()
            registry = ctx.shared_node_registry

            registry[escalation_id] = receiver
            try:
                # Stream message to user (parent's node_id so TUI shows parent talking)
                await self._event_bus.emit_client_output_delta(
                    stream_id=ctx.node_id,
                    node_id=ctx.node_id,
                    content=message,
                    snapshot=message,
                    execution_id=ctx.execution_id,
                )
                # Request input (escalation_id for routing response back)
                await self._event_bus.emit_client_input_requested(
                    stream_id=ctx.node_id,
                    node_id=escalation_id,
                    prompt=message,
                    execution_id=ctx.execution_id,
                )
                # Block until user responds
                return await receiver.wait()
            finally:
                registry.pop(escalation_id, None)

        # 3. Filter tools for subagent
        # Use the full tool catalog (ctx.all_tools) so subagents can access tools
        # that aren't in the parent node's filtered set (e.g. browser tools for a
        # GCU subagent when the parent only has web_scrape/save_data).
        # Falls back to ctx.available_tools if all_tools is empty (e.g. in tests).
        subagent_tool_names = set(subagent_spec.tools or [])
        tool_source = ctx.all_tools if ctx.all_tools else ctx.available_tools

        subagent_tools = [
            t
            for t in tool_source
            if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
        ]

        missing = subagent_tool_names - {t.name for t in subagent_tools}
        if missing:
            logger.warning(
                "Subagent '%s' requested tools not found in catalog: %s",
                agent_id,
                sorted(missing),
            )

        logger.info(
            "📦 Subagent '%s' configuration:\n"
            "  - System prompt: %s\n"
            "  - Tools available (%d): %s\n"
            "  - Memory keys inherited: %s",
            agent_id,
            (subagent_spec.system_prompt[:200] + "...")
            if subagent_spec.system_prompt and len(subagent_spec.system_prompt) > 200
            else subagent_spec.system_prompt,
            len(subagent_tools),
            [t.name for t in subagent_tools],
            list(parent_data.keys()),
        )

        # 4. Build subagent context
        max_iter = min(self._config.max_iterations, 10)
        subagent_ctx = NodeContext(
            runtime=ctx.runtime,
            node_id=f"{ctx.node_id}:subagent:{agent_id}",
            node_spec=subagent_spec,
            memory=scoped_memory,
            input_data={"task": task, **parent_data},
            llm=ctx.llm,
            available_tools=subagent_tools,
            goal_context=(
                f"Your specific task: {task}\n\n"
                f"COMPLETION REQUIREMENTS:\n"
                f"When your task is done, you MUST call set_output() "
                f"for each required key: {subagent_spec.output_keys}\n"
                f"Alternatively, call report_to_parent(mark_complete=true) "
                f"with your findings in message/data.\n"
                f"You have a maximum of {max_iter} turns to complete this task."
            ),
            goal=ctx.goal,
            max_tokens=ctx.max_tokens,
            runtime_logger=ctx.runtime_logger,
            is_subagent_mode=True,  # Prevents nested delegation
            report_callback=_report_callback,
            node_registry={},  # Empty - no nested subagents
            shared_node_registry=ctx.shared_node_registry,  # For escalation routing
        )

        # 5. Create and execute subagent EventLoopNode
        # Derive a conversation store for the subagent from the parent's store.
        # Each invocation gets a unique path so that repeated delegate calls
        # (e.g. one per profile) don't restore a stale completed conversation.
        self._subagent_instance_counter.setdefault(agent_id, 0)
        self._subagent_instance_counter[agent_id] += 1
        subagent_instance = str(self._subagent_instance_counter[agent_id])

        subagent_conv_store = None
        if self._conversation_store is not None:
            from framework.storage.conversation_store import FileConversationStore

            parent_base = getattr(self._conversation_store, "_base", None)
            if parent_base is not None:
                # Store subagent conversations parallel to the parent node,
                # not nested inside it. e.g. conversations/{node}:subagent:{agent_id}:{instance}/
                conversations_dir = parent_base.parent  # e.g. conversations/
                subagent_dir_name = f"{agent_id}-{subagent_instance}"
                subagent_store_path = conversations_dir / subagent_dir_name
                subagent_conv_store = FileConversationStore(base_path=subagent_store_path)

        # Derive a subagent-scoped spillover dir so large tool results
        # (e.g. browser_snapshot) get written to disk instead of being
        # silently truncated. Each instance gets its own directory to
        # avoid file collisions between concurrent subagents.
        subagent_spillover = None
        if self._config.spillover_dir:
            subagent_spillover = str(
                Path(self._config.spillover_dir) / agent_id / subagent_instance
            )

        subagent_node = EventLoopNode(
            event_bus=None,  # Subagents don't emit events to parent's bus
            judge=SubagentJudge(task=task, max_iterations=max_iter),
            config=LoopConfig(
                max_iterations=max_iter,  # Tighter budget
                max_tool_calls_per_turn=self._config.max_tool_calls_per_turn,
                tool_call_overflow_margin=self._config.tool_call_overflow_margin,
                max_history_tokens=self._config.max_history_tokens,
                stall_detection_threshold=self._config.stall_detection_threshold,
                max_tool_result_chars=self._config.max_tool_result_chars,
                spillover_dir=subagent_spillover,
            ),
            tool_executor=self._tool_executor,
            conversation_store=subagent_conv_store,
        )

        try:
            logger.info("🚀 Starting subagent '%s' execution...", agent_id)
            start_time = time.time()
            result = await subagent_node.execute(subagent_ctx)
            latency_ms = int((time.time() - start_time) * 1000)

            logger.info(
                "\n" + "-" * 60 + "\n"
                "✅ SUBAGENT '%s' COMPLETED\n"
                "-" * 60 + "\n"
                "Success: %s\n"
                "Latency: %dms\n"
                "Tokens used: %s\n"
                "Output keys: %s\n" + "-" * 60,
                agent_id,
                result.success,
                latency_ms,
                result.tokens_used,
                list(result.output.keys()) if result.output else [],
            )

            result_json = {
                "message": (
                    f"Sub-agent '{agent_id}' completed successfully"
                    if result.success
                    else f"Sub-agent '{agent_id}' failed: {result.error}"
                ),
                "data": result.output,
                "reports": subagent_reports if subagent_reports else None,
                "metadata": {
                    "agent_id": agent_id,
                    "success": result.success,
                    "tokens_used": result.tokens_used,
                    "latency_ms": latency_ms,
                    "report_count": len(subagent_reports),
                },
            }

            return ToolResult(
                tool_use_id="",
                content=json.dumps(result_json, indent=2, default=str),
                is_error=not result.success,
            )

        except Exception as e:
            logger.exception(
                "\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
                agent_id,
                str(e),
            )
            result_json = {
                "message": f"Sub-agent '{agent_id}' raised exception: {e}",
                "data": None,
                "metadata": {
                    "agent_id": agent_id,
                    "success": False,
                    "error": str(e),
                },
            }
            return ToolResult(
                tool_use_id="",
                content=json.dumps(result_json, indent=2),
                is_error=True,
            )
```
```diff
@@ -183,11 +183,12 @@ class GraphExecutor:
         self.tool_provider_map = tool_provider_map
         self.dynamic_tools_provider = dynamic_tools_provider

-        # Initialize output cleaner
+        # Initialize output cleaner — uses its own dedicated fast model (CEREBRAS_API_KEY),
+        # never the main agent LLM. Passing the main LLM here would cause expensive
+        # Anthropic calls for output cleaning whenever ANTHROPIC_API_KEY is set.
         self.cleansing_config = cleansing_config or CleansingConfig()
         self.output_cleaner = OutputCleaner(
             config=self.cleansing_config,
             llm_provider=llm,
         )

         # Parallel execution settings
```
```diff
@@ -154,69 +154,17 @@ class HITLProtocol:
         """
         Parse human's raw input into structured response.

-        Uses Haiku to intelligently extract answers for each question.
+        Maps the raw input to the first question. For multi-question HITL,
+        the caller should present one question at a time.
         """
-        import os
-
         response = HITLResponse(request_id=request.request_id, raw_input=raw_input)

         # If no questions, just return raw input
         if not request.questions:
             return response

-        # Try to use Haiku for intelligent parsing
-        api_key = os.environ.get("ANTHROPIC_API_KEY")
-        if not use_haiku or not api_key:
-            # Simple fallback: treat as answer to first question
-            if request.questions:
-                response.answers[request.questions[0].id] = raw_input
-            return response
-
-        # Use Haiku to extract answers
-        try:
-            import json
-
-            import anthropic
-
-            questions_str = "\n".join(
-                [f"{i + 1}. {q.question} (id: {q.id})" for i, q in enumerate(request.questions)]
-            )
-
-            prompt = f"""Parse the user's response and extract answers for each question.
-
-Questions asked:
-{questions_str}
-
-User's response:
-{raw_input}
-
-Extract the answer for each question. Output JSON with question IDs as keys.
-
-Example format:
-{{"question-1": "answer here", "question-2": "answer here"}}"""
-
-            client = anthropic.Anthropic(api_key=api_key)
-            message = client.messages.create(
-                model="claude-haiku-4-5-20251001",
-                max_tokens=500,
-                messages=[{"role": "user", "content": prompt}],
-            )
-
-            # Parse Haiku's response
-            import re
-
-            response_text = message.content[0].text.strip()
-            json_match = re.search(r"\{[^{}]*\}", response_text, re.DOTALL)
-
-            if json_match:
-                parsed = json.loads(json_match.group())
-                response.answers = parsed
-
-        except Exception:
-            # Fallback: use raw input for first question
-            if request.questions:
-                response.answers[request.questions[0].id] = raw_input
-
+        # Map raw input to first question
+        response.answers[request.questions[0].id] = raw_input
         return response

     @staticmethod
```
```diff
@@ -585,7 +585,6 @@ class NodeResult:
         Generate a human-readable summary of this node's execution and output.

         This is like toString() - it describes what the node produced in its current state.
-        Uses Haiku to intelligently summarize complex outputs.
         """
         if not self.success:
             return f"❌ Failed: {self.error}"
@@ -593,59 +592,13 @@ class NodeResult:
         if not self.output:
             return "✓ Completed (no output)"

-        # Use Haiku to generate intelligent summary
-        import os
-
-        api_key = os.environ.get("ANTHROPIC_API_KEY")
-
-        if not api_key:
-            # Fallback: simple key-value listing
-            parts = [f"✓ Completed with {len(self.output)} outputs:"]
-            for key, value in list(self.output.items())[:5]:  # Limit to 5 keys
-                value_str = str(value)[:100]
-                if len(str(value)) > 100:
-                    value_str += "..."
-                parts.append(f" • {key}: {value_str}")
-            return "\n".join(parts)
-
-        # Use Haiku to generate intelligent summary
-        try:
-            import json
-
-            import anthropic
-
-            node_context = ""
-            if node_spec:
-                node_context = f"\nNode: {node_spec.name}\nPurpose: {node_spec.description}"
-
-            output_json = json.dumps(self.output, indent=2, default=str)[:2000]
-            prompt = (
-                f"Generate a 1-2 sentence human-readable summary of "
-                f"what this node produced.{node_context}\n\n"
-                f"Node output:\n{output_json}\n\n"
-                "Provide a concise, clear summary that a human can quickly "
-                "understand. Focus on the key information produced."
-            )
-
-            client = anthropic.Anthropic(api_key=api_key)
-            message = client.messages.create(
-                model="claude-haiku-4-5-20251001",
-                max_tokens=200,
-                messages=[{"role": "user", "content": prompt}],
-            )
-
-            summary = message.content[0].text.strip()
-            return f"✓ {summary}"
-
-        except Exception:
-            # Fallback on error
-            parts = [f"✓ Completed with {len(self.output)} outputs:"]
-            for key, value in list(self.output.items())[:3]:
-                value_str = str(value)[:80]
-                if len(str(value)) > 80:
-                    value_str += "..."
-                parts.append(f" • {key}: {value_str}")
-            return "\n".join(parts)
+        parts = [f"✓ Completed with {len(self.output)} outputs:"]
+        for key, value in list(self.output.items())[:5]:  # Limit to 5 keys
+            value_str = str(value)[:100]
+            if len(str(value)) > 100:
+                value_str += "..."
+            parts.append(f" • {key}: {value_str}")
+        return "\n".join(parts)


 class NodeProtocol(ABC):
```
```diff
@@ -2899,6 +2899,7 @@ def run_tests(
             text=True,
             timeout=600,  # 10 minute timeout
             env=env,
+            stdin=subprocess.DEVNULL,
         )
     except subprocess.TimeoutExpired:
         return json.dumps(
@@ -3091,6 +3092,7 @@ def debug_test(
             text=True,
             timeout=120,  # 2 minute timeout for single test
             env=env,
+            stdin=subprocess.DEVNULL,
         )
     except subprocess.TimeoutExpired:
         return json.dumps(
```
```diff
@@ -1090,62 +1090,19 @@ def _interactive_approval(request):
 def _format_natural_language_to_json(
     user_input: str, input_keys: list[str], agent_description: str, session_context: dict = None
 ) -> dict:
-    """Use Haiku to convert natural language input to JSON based on agent's input schema."""
-    import os
+    """Convert natural language input to JSON based on agent's input schema.

-    import anthropic
+    Maps user input to the primary input field. For follow-up inputs,
+    appends to the existing value.
+    """
+    main_field = input_keys[0] if input_keys else "objective"

-    client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
-
-    # Build prompt for Haiku
-    session_info = ""
     if session_context:
-        # Extract the main field (usually 'objective') that we'll append to
-        main_field = input_keys[0] if input_keys else "objective"
         existing_value = session_context.get(main_field, "")
         if existing_value:
-            session_info = (
-                f'\n\nExisting {main_field}: "{existing_value}"\n\n'
-                f"The user is providing ADDITIONAL information. Append this new "
-                f"information to the existing {main_field} to create an enriched, "
-                "more detailed version."
-            )
-
-    prompt = f"""You are formatting user input for an agent that requires specific input fields.
-
-Agent: {agent_description}
-
-Required input fields: {", ".join(input_keys)}{session_info}
-
-User input: {user_input}
-
-{"If this is a follow-up, APPEND new info to the existing field value." if session_context else ""}
-
-Output ONLY valid JSON, no explanation:"""
-
-    try:
-        message = client.messages.create(
-            model="claude-haiku-4-5-20251001",  # Fast and cheap
-            max_tokens=500,
-            messages=[{"role": "user", "content": prompt}],
-        )
-
-        json_str = message.content[0].text.strip()
-        # Remove markdown code blocks if present
-        if json_str.startswith("```"):
-            json_str = json_str.split("```")[1]
-            if json_str.startswith("json"):
-                json_str = json_str[4:]
-        json_str = json_str.strip()
-
-        return json.loads(json_str)
-    except Exception:
-        # Fallback: try to infer the main field
-        if len(input_keys) == 1:
-            return {input_keys[0]: user_input}
-        else:
-            # Put it in the first field as fallback
-            return {input_keys[0]: user_input}
+            return {main_field: f"{existing_value}\n\n{user_input}"}
+
+    return {main_field: user_input}


 def cmd_shell(args: argparse.Namespace) -> int:
```
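The simplified mapping is easy to sanity-check; a sketch of the expected behavior (calls are illustrative):

```python
# First input: routed straight to the primary field.
_format_natural_language_to_json(
    "Scrape top HN posts", ["objective"], "A research agent"
)
# -> {"objective": "Scrape top HN posts"}

# Follow-up with session context: appended to the existing value.
_format_natural_language_to_json(
    "Only posts from today", ["objective"], "A research agent",
    session_context={"objective": "Scrape top HN posts"},
)
# -> {"objective": "Scrape top HN posts\n\nOnly posts from today"}
```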
```diff
@@ -7,6 +7,8 @@ Supports both STDIO and HTTP transports using the official MCP Python SDK.
 import asyncio
 import logging
 import os
+import sys
+import threading
 from dataclasses import dataclass, field
 from typing import Any, Literal

@@ -73,6 +75,8 @@ class MCPClient:
         # Background event loop for persistent STDIO connection
         self._loop = None
         self._loop_thread = None
+        # Serialize STDIO tool calls (avoids races, helps on Windows)
+        self._stdio_call_lock = threading.Lock()

     def _run_async(self, coro):
         """
@@ -156,11 +160,19 @@ class MCPClient:
         # Create server parameters
         # Always inherit parent environment and merge with any custom env vars
         merged_env = {**os.environ, **(self.config.env or {})}
+        # On Windows, passing cwd can cause WinError 267 ("invalid directory name").
+        # tool_registry passes cwd=None and uses absolute script paths when applicable.
+        cwd = self.config.cwd
+        if os.name == "nt" and cwd is not None:
+            # Avoid passing cwd on Windows; tool_registry should have set cwd=None
+            # and absolute script paths for tools-dir servers. If cwd is still set,
+            # pass None to prevent WinError 267 (caller should use absolute paths).
+            cwd = None
         server_params = StdioServerParameters(
             command=self.config.command,
             args=self.config.args,
             env=merged_env,
-            cwd=self.config.cwd,
+            cwd=cwd,
         )

         # Store for later use
@@ -184,10 +196,12 @@ class MCPClient:
         from mcp.client.stdio import stdio_client

         # Create persistent stdio client context.
-        # Redirect server stderr to devnull to prevent raw
-        # output from leaking behind the TUI.
-        devnull = open(os.devnull, "w")  # noqa: SIM115
-        self._stdio_context = stdio_client(server_params, errlog=devnull)
+        # On Windows, use stderr so subprocess startup errors are visible.
+        if os.name == "nt":
+            errlog = sys.stderr
+        else:
+            errlog = open(os.devnull, "w")  # noqa: SIM115
+        self._stdio_context = stdio_client(server_params, errlog=errlog)
         (
             self._read_stream,
             self._write_stream,
@@ -353,7 +367,8 @@ class MCPClient:
             raise ValueError(f"Unknown tool: {tool_name}")

         if self.config.transport == "stdio":
-            return self._run_async(self._call_tool_stdio_async(tool_name, arguments))
+            with self._stdio_call_lock:
+                return self._run_async(self._call_tool_stdio_async(tool_name, arguments))
         else:
             return self._call_tool_http(tool_name, arguments)

@@ -448,11 +463,15 @@ class MCPClient:
             if self._stdio_context:
                 await self._stdio_context.__aexit__(None, None, None)
         except asyncio.CancelledError:
-            logger.warning(
+            logger.debug(
                 "STDIO context cleanup was cancelled; proceeding with best-effort shutdown"
             )
         except Exception as e:
-            logger.warning(f"Error closing STDIO context: {e}")
+            msg = str(e).lower()
+            if "cancel scope" in msg or "different task" in msg:
+                logger.debug("STDIO context teardown (known anyio quirk): %s", e)
+            else:
+                logger.warning(f"Error closing STDIO context: {e}")
         finally:
             self._stdio_context = None
```
```diff
@@ -326,6 +326,103 @@ class ToolRegistry:
         """Restore execution context to its previous state."""
         _execution_context.reset(token)

+    @staticmethod
+    def resolve_mcp_stdio_config(server_config: dict[str, Any], base_dir: Path) -> dict[str, Any]:
+        """Resolve cwd and script paths for MCP stdio config (Windows compatibility).
+
+        Use this when building MCPServerConfig from a config file (e.g. in
+        list_agent_tools, discover_mcp_tools) so hive-tools and other servers
+        work on Windows. Call with base_dir = directory containing the config.
+        """
+        registry = ToolRegistry()
+        return registry._resolve_mcp_server_config(server_config, base_dir)
+
+    def _resolve_mcp_server_config(
+        self, server_config: dict[str, Any], base_dir: Path
+    ) -> dict[str, Any]:
+        """Resolve cwd and script paths for MCP stdio servers (Windows compatibility).
+
+        On Windows, passing cwd to subprocess can cause WinError 267. We use cwd=None
+        and absolute script paths when the server runs a .py script from the tools dir.
+        If the resolved cwd doesn't exist (e.g. config from ~/.hive/agents/), fall back
+        to Path.cwd() / "tools".
+        """
+        config = dict(server_config)
+        if config.get("transport") != "stdio":
+            return config
+
+        cwd = config.get("cwd")
+        args = list(config.get("args", []))
+        if not cwd and not args:
+            return config
+
+        # Resolve cwd relative to base_dir
+        resolved_cwd: Path | None = None
+        if cwd:
+            if Path(cwd).is_absolute():
+                resolved_cwd = Path(cwd)
+            else:
+                resolved_cwd = (base_dir / cwd).resolve()
+
+        # Find .py script in args (e.g. coder_tools_server.py, files_server.py)
+        script_name = None
+        for i, arg in enumerate(args):
+            if isinstance(arg, str) and arg.endswith(".py"):
+                script_name = arg
+                script_idx = i
+                break
+
+        if resolved_cwd is None:
+            return config
+
+        # If resolved cwd doesn't exist or (when we have a script) doesn't contain it,
+        # try fallback
+        tools_fallback = Path.cwd() / "tools"
+        need_fallback = not resolved_cwd.is_dir()
+        if script_name and not need_fallback:
+            need_fallback = not (resolved_cwd / script_name).exists()
+        if need_fallback:
+            fallback_ok = tools_fallback.is_dir()
+            if script_name:
+                fallback_ok = fallback_ok and (tools_fallback / script_name).exists()
+            else:
+                # No script (e.g. GCU); just need tools dir to exist
+                pass
+            if fallback_ok:
+                resolved_cwd = tools_fallback
+                logger.debug(
+                    "MCP server '%s': using fallback tools dir %s",
+                    config.get("name", "?"),
+                    resolved_cwd,
+                )
+            else:
+                config["cwd"] = str(resolved_cwd)
+                return config
+
+        if not script_name:
+            # No .py script (e.g. GCU uses -m gcu.server); just set cwd
+            config["cwd"] = str(resolved_cwd)
+            return config
+
+        # For coder_tools_server, inject --project-root so writes go to the expected workspace
+        if script_name and "coder_tools" in script_name:
+            project_root = str(resolved_cwd.parent.resolve())
+            args = list(args)
+            if "--project-root" not in args:
+                args.extend(["--project-root", project_root])
+            config["args"] = args
+
+        if os.name == "nt":
+            # Windows: cwd=None avoids WinError 267; use absolute script path
+            config["cwd"] = None
+            abs_script = str((resolved_cwd / script_name).resolve())
+            args = list(config["args"])
+            args[script_idx] = abs_script
+            config["args"] = args
+        else:
+            config["cwd"] = str(resolved_cwd)
+        return config
+
     def load_mcp_config(self, config_path: Path) -> None:
         """
         Load and register MCP servers from a config file.
@@ -357,9 +454,7 @@ class ToolRegistry:
         server_list = [{"name": name, **cfg} for name, cfg in config.items()]

         for server_config in server_list:
-            cwd = server_config.get("cwd")
-            if cwd and not Path(cwd).is_absolute():
-                server_config["cwd"] = str((base_dir / cwd).resolve())
+            server_config = self._resolve_mcp_server_config(server_config, base_dir)
             try:
                 self.register_mcp_server(server_config)
             except Exception as e:
@@ -480,6 +575,11 @@ class ToolRegistry:

         except Exception as e:
             logger.error(f"Failed to register MCP server: {e}")
+            if "Connection closed" in str(e) and os.name == "nt":
+                logger.debug(
+                    "On Windows, check that the MCP subprocess starts (e.g. uv in PATH, "
+                    "script path correct). Worker config uses base_dir = mcp_servers.json parent."
+                )
             return 0

     def _convert_mcp_tool_to_framework_tool(self, mcp_tool: Any) -> Tool:
```
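A minimal usage sketch of the new static helper, assuming a hypothetical stdio entry from an mcp_servers.json:

```python
from pathlib import Path

from framework.runner.tool_registry import ToolRegistry

# Hypothetical server entry — field values are illustrative only.
server = {
    "name": "hive-tools",
    "transport": "stdio",
    "command": "uv",
    "args": ["run", "files_server.py"],
    "cwd": "../tools",
}

resolved = ToolRegistry.resolve_mcp_stdio_config(server, Path("exports/my_agent"))
# On Windows: resolved["cwd"] is None and args carry an absolute script path.
# Elsewhere: resolved["cwd"] is the resolved tools directory.
```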
+5 -5

```diff
@@ -145,7 +145,7 @@ Implement the core execution engine where every Agent operates as an isolated, a
 - [x] SharedState manager (runtime/shared_state.py)
 - [x] Session-based storage (storage/session_store.py)
 - [x] Isolation levels: ISOLATED, SHARED, SYNCHRONIZED
-- [ ] **Default Monitoring Hooks**
+- [x] **Default Monitoring Hooks**
   - [ ] Performance metrics collection
   - [ ] Resource usage tracking
   - [ ] Health check endpoints
@@ -590,7 +590,7 @@ Write the Quick Start guide, detailed tool usage documentation, and set up the M
 - [x] README with examples
 - [x] Contributing guidelines
 - [x] GitHub Page setup
-- [ ] **Tool Usage Documentation**
+- [x] **Tool Usage Documentation**
   - [ ] Comprehensive tool documentation
   - [ ] Tool integration examples
   - [ ] Best practices guide
@@ -643,7 +643,7 @@ Expose basic REST/WebSocket endpoints for external control (Start, Stop, Pause,
 - [x] Load/unload/start/restart in AgentRuntime
 - [x] State persistence
 - [x] Recovery mechanisms
-- [ ] **REST API Endpoints**
+- [x] **REST API Endpoints**
   - [ ] Start endpoint for agent execution
   - [ ] Stop endpoint for graceful shutdown
   - [ ] Pause endpoint for execution suspension
@@ -661,7 +661,7 @@ Implement automated test execution, agent version control, and mandatory test-pa
 - [x] Test framework with pytest integration (testing/)
 - [x] Test result reporting
 - [x] Test CLI commands (test-run, test-debug, etc.)
-- [ ] **Automated Testing Pipeline**
+- [x] **Automated Testing Pipeline**
   - [ ] CI integration (GitHub Actions, etc.)
   - [ ] Mandatory test-passing gates
   - [ ] Coverage reporting
@@ -873,7 +873,7 @@ Build native frontend configurations to easily connect Open Hive's backend to lo
 - [ ] Node.js runtime support
 - [ ] Browser runtime support
 - [ ] **Platform Compatibility**
-  - [ ] Windows support improvements
+  - [x] Windows support improvements
   - [ ] macOS optimization
   - [ ] Linux distribution support
```
```diff
@@ -78,5 +78,6 @@ if (-not $env:HIVE_CREDENTIAL_KEY) {
 }

 # ── Run the Hive CLI ────────────────────────────────────────────────
+# PYTHONUTF8=1: use UTF-8 for default encoding (fixes charmap decode errors on Windows)
+$env:PYTHONUTF8 = "1"
 & uv run hive @args
```
+15 -6

```diff
@@ -130,8 +130,8 @@ function Test-DefenderExclusions {

 # Normalize and filter null/empty values
 $safePrefixes = $safePrefixes | Where-Object { $_ } | ForEach-Object {
-    [System.IO.Path]::GetFullPath($_)
-}
+    try { [System.IO.Path]::GetFullPath($_) } catch { $null }
+} | Where-Object { $_ }

 try {
     # Check if Defender cmdlets are available (may not exist on older Windows)
@@ -157,15 +157,20 @@ function Test-DefenderExclusions {
     $existing = $prefs.ExclusionPath
     if (-not $existing) { $existing = @() }

-    # Normalize existing paths for comparison
+    # Normalize existing paths for comparison (some may contain wildcards
+    # or env vars that GetFullPath rejects — skip those gracefully)
     $existing = $existing | Where-Object { $_ } | ForEach-Object {
-        [System.IO.Path]::GetFullPath($_)
+        try { [System.IO.Path]::GetFullPath($_) } catch { $_ }
     }

     # Normalize paths and find missing exclusions
     $missing = @()
     foreach ($path in $Paths) {
-        $normalized = [System.IO.Path]::GetFullPath($path)
+        try {
+            $normalized = [System.IO.Path]::GetFullPath($path)
+        } catch {
+            continue  # Skip paths with unsupported format
+        }

         # Security: Ensure path is within safe boundaries
         $isSafe = $false
@@ -250,7 +255,11 @@ function Add-DefenderExclusions {

     foreach ($path in $Paths) {
         try {
-            $normalized = [System.IO.Path]::GetFullPath($path)
+            try {
+                $normalized = [System.IO.Path]::GetFullPath($path)
+            } catch {
+                $normalized = $path  # Use raw path if normalization fails
+            }
             Add-MpPreference -ExclusionPath $normalized -ErrorAction Stop
             $added += $normalized
         } catch {
```
+12 -8

```diff
@@ -72,6 +72,7 @@ python mcp_server.py
 | `apply_diff` | Apply diff patches to files |
 | `apply_patch` | Apply unified patches to files |
 | `grep_search` | Search file contents with regex |
+| `hashline_edit` | Anchor-based file editing with hash-validated line references |
 | `execute_command_tool` | Execute shell commands |
 | `save_data` / `load_data` | Persist and retrieve structured data across steps |
 | `serve_file_to_user` | Serve a file for the user to download |
@@ -175,14 +176,17 @@ tools/
 │   └── tools/                    # Tool implementations
 │       ├── example_tool/
 │       ├── file_system_toolkits/ # File operation tools
-│       │   ├── view_file.py
-│       │   ├── write_to_file.py
-│       │   ├── list_dir.py
-│       │   ├── replace_file_content.py
-│       │   ├── apply_diff.py
-│       │   ├── apply_patch.py
-│       │   ├── grep_search.py
-│       │   └── execute_command_tool.py
+│       │   ├── security.py
+│       │   ├── hashline.py
+│       │   ├── view_file/
+│       │   ├── write_to_file/
+│       │   ├── list_dir/
+│       │   ├── replace_file_content/
+│       │   ├── apply_diff/
+│       │   ├── apply_patch/
+│       │   ├── grep_search/
+│       │   ├── hashline_edit/
+│       │   └── execute_command_tool/
 │       ├── web_search_tool/
 │       ├── web_scrape_tool/
 │       ├── pdf_read_tool/
```
+124 -37

```diff
@@ -71,8 +71,49 @@ def _find_project_root() -> str:

 def _resolve_path(path: str) -> str:
     """Resolve path relative to PROJECT_ROOT. Raises ValueError if outside."""
     # Normalize slashes for cross-platform (e.g. exports/hi_agent from LLM)
     path = path.replace("/", os.sep)
     if os.path.isabs(path):
         resolved = os.path.abspath(path)
         try:
             common = os.path.commonpath([resolved, PROJECT_ROOT])
         except ValueError:
             common = ""
         if common != PROJECT_ROOT:
             # LLM may emit wrong-root paths (/mnt/data, /workspace, etc.).
             # Strip known prefixes and treat the remainder as relative to PROJECT_ROOT.
             path_norm = path.replace("\\", "/")
             for prefix in (
                 "/mnt/data/",
                 "/mnt/data",
                 "/workspace/",
                 "/workspace",
                 "/repo/",
                 "/repo",
             ):
                 p = prefix.rstrip("/") + "/"
                 prefix_stripped = prefix.rstrip("/")
                 if path_norm.startswith(p) or (
                     path_norm.startswith(prefix_stripped) and len(path_norm) > len(prefix)
                 ):
                     suffix = path_norm[len(prefix_stripped) :].lstrip("/")
                     if suffix:
                         path = suffix.replace("/", os.sep)
                         resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
                         break
             else:
                 # Try extracting exports/ or core/ subpath from the absolute path
                 parts = path.split(os.sep)
                 if "exports" in parts:
                     idx = parts.index("exports")
                     path = os.sep.join(parts[idx:])
                     resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
                 elif "core" in parts:
                     idx = parts.index("core")
                     path = os.sep.join(parts[idx:])
                     resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
                 else:
                     raise ValueError(f"Access denied: '{path}' is outside the project root.")
     else:
         resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
     try:
@@ -91,11 +132,7 @@ def _snapshot_git(*args: str) -> str:
     """Run a git command with the snapshot GIT_DIR and PROJECT_ROOT worktree."""
     cmd = ["git", "--git-dir", SNAPSHOT_DIR, "--work-tree", PROJECT_ROOT, *args]
     result = subprocess.run(
-        cmd,
-        capture_output=True,
-        text=True,
-        timeout=30,
-        encoding="utf-8",
+        cmd, capture_output=True, text=True, timeout=30, encoding="utf-8", stdin=subprocess.DEVNULL
     )
     return result.stdout.strip()

@@ -110,6 +147,7 @@ def _ensure_snapshot_repo():
         ["git", "init", "--bare", SNAPSHOT_DIR],
         capture_output=True,
         timeout=10,
+        stdin=subprocess.DEVNULL,
         encoding="utf-8",
     )
     _snapshot_git("config", "core.autocrlf", "false")
@@ -132,6 +170,37 @@ def _take_snapshot() -> str:
 MAX_COMMAND_OUTPUT = 30_000  # chars before truncation


+def _translate_command_for_windows(command: str) -> str:
+    """Translate common Unix commands to Windows equivalents."""
+    if os.name != "nt":
+        return command
+    cmd = command.strip()
+
+    # mkdir -p: Unix creates parents; Windows mkdir already does; -p becomes a dir name
+    if cmd.startswith("mkdir -p ") or cmd.startswith("mkdir -p\t"):
+        rest = cmd[9:].lstrip().replace("/", os.sep)
+        return "mkdir " + rest
+
+    # ls / pwd: cmd.exe uses dir and cd
+    # Order matters: replace longer patterns first
+    for unix, win in [
+        ("ls -la", "dir /a"),
+        ("ls -al", "dir /a"),
+        ("ls -l", "dir"),
+        ("ls -a", "dir /a"),
+        ("ls ", "dir "),
+        ("pwd", "cd"),
+    ]:
+        cmd = cmd.replace(unix, win)
+    # Standalone "ls" at end (e.g. "cd x && ls")
+    if cmd.endswith(" ls"):
+        cmd = cmd[:-3] + " dir"
+    elif cmd == "ls":
+        cmd = "dir"
+
+    return cmd
+
+
 @mcp.tool()
 def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
     """Execute a shell command in the project context.
```
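A quick sanity-check of the translation above; the expected outputs follow directly from the replacement table and only apply when `os.name == "nt"`:

```python
# Expected on Windows (os.name == "nt"); illustrative assertions.
assert _translate_command_for_windows("mkdir -p exports/new_agent") == r"mkdir exports\new_agent"
assert _translate_command_for_windows("ls -la") == "dir /a"
assert _translate_command_for_windows("cd core && ls") == "cd core && dir"
```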
```diff
@@ -151,6 +220,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
     work_dir = _resolve_path(cwd) if cwd else PROJECT_ROOT

     try:
+        command = _translate_command_for_windows(command)
         start = time.monotonic()
         result = subprocess.run(
             command,
@@ -159,12 +229,16 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
             capture_output=True,
             text=True,
             timeout=timeout,
+            stdin=subprocess.DEVNULL,
             encoding="utf-8",
             env={
                 **os.environ,
-                "PYTHONPATH": (
-                    f"{PROJECT_ROOT}/core:{PROJECT_ROOT}/exports"
-                    f":{PROJECT_ROOT}/core/framework/agents"
+                "PYTHONPATH": os.pathsep.join(
+                    [
+                        os.path.join(PROJECT_ROOT, "core"),
+                        os.path.join(PROJECT_ROOT, "exports"),
+                        os.path.join(PROJECT_ROOT, "core", "framework", "agents"),
+                    ]
                 ),
             },
         )
@@ -236,6 +310,7 @@ def undo_changes(path: str = "") -> str:
             capture_output=True,
             text=True,
             timeout=10,
+            stdin=subprocess.DEVNULL,
             encoding="utf-8",
         )
         return f"Restored: {path}"
@@ -309,28 +384,31 @@ def list_agent_tools(
         return json.dumps({"error": f"Failed to read config: {e}"})

     try:
+        from pathlib import Path
+
         from framework.runner.mcp_client import MCPClient, MCPServerConfig
+        from framework.runner.tool_registry import ToolRegistry
     except ImportError:
         return json.dumps({"error": "Cannot import MCPClient"})

     all_tools: list[dict] = []
     errors = []
-    config_dir = os.path.dirname(config_path)
+    config_dir = Path(config_path).parent

     for server_name, server_conf in servers_config.items():
-        cwd = server_conf.get("cwd", "")
-        if cwd and not os.path.isabs(cwd):
-            cwd = os.path.abspath(os.path.join(config_dir, cwd))
+        resolved = ToolRegistry.resolve_mcp_stdio_config(
+            {"name": server_name, **server_conf}, config_dir
+        )
         try:
             config = MCPServerConfig(
                 name=server_name,
-                transport=server_conf.get("transport", "stdio"),
-                command=server_conf.get("command"),
-                args=server_conf.get("args", []),
-                env=server_conf.get("env", {}),
-                cwd=cwd or None,
-                url=server_conf.get("url"),
-                headers=server_conf.get("headers", {}),
+                transport=resolved.get("transport", "stdio"),
+                command=resolved.get("command"),
+                args=resolved.get("args", []),
+                env=resolved.get("env", {}),
+                cwd=resolved.get("cwd"),
+                url=resolved.get("url"),
+                headers=resolved.get("headers", {}),
             )
             client = MCPClient(config)
             client.connect()
@@ -419,19 +497,24 @@ def validate_agent_tools(agent_path: str) -> str:
     if not os.path.isdir(resolved):
         return json.dumps({"error": f"Agent directory not found: {agent_path}"})

+    agent_dir = resolved  # Keep path; 'resolved' is reused for MCP config in loop
+
     # --- Discover available tools from agent's MCP servers ---
-    mcp_config_path = os.path.join(resolved, "mcp_servers.json")
+    mcp_config_path = os.path.join(agent_dir, "mcp_servers.json")
     if not os.path.isfile(mcp_config_path):
         return json.dumps({"error": f"No mcp_servers.json found in {agent_path}"})

     try:
+        from pathlib import Path
+
         from framework.runner.mcp_client import MCPClient, MCPServerConfig
+        from framework.runner.tool_registry import ToolRegistry
     except ImportError:
         return json.dumps({"error": "Cannot import MCPClient"})

     available_tools: set[str] = set()
     discovery_errors = []
-    config_dir = os.path.dirname(mcp_config_path)
+    config_dir = Path(mcp_config_path).parent

     try:
         with open(mcp_config_path, encoding="utf-8") as f:
@@ -440,19 +523,19 @@ def validate_agent_tools(agent_path: str) -> str:
         return json.dumps({"error": f"Failed to read mcp_servers.json: {e}"})

     for server_name, server_conf in servers_config.items():
-        cwd = server_conf.get("cwd", "")
-        if cwd and not os.path.isabs(cwd):
-            cwd = os.path.abspath(os.path.join(config_dir, cwd))
+        resolved = ToolRegistry.resolve_mcp_stdio_config(
+            {"name": server_name, **server_conf}, config_dir
+        )
         try:
             config = MCPServerConfig(
                 name=server_name,
-                transport=server_conf.get("transport", "stdio"),
-                command=server_conf.get("command"),
-                args=server_conf.get("args", []),
-                env=server_conf.get("env", {}),
-                cwd=cwd or None,
-                url=server_conf.get("url"),
-                headers=server_conf.get("headers", {}),
+                transport=resolved.get("transport", "stdio"),
+                command=resolved.get("command"),
+                args=resolved.get("args", []),
+                env=resolved.get("env", {}),
+                cwd=resolved.get("cwd"),
+                url=resolved.get("url"),
+                headers=resolved.get("headers", {}),
             )
             client = MCPClient(config)
             client.connect()
@@ -463,7 +546,7 @@ def validate_agent_tools(agent_path: str) -> str:
         discovery_errors.append({"server": server_name, "error": str(e)})

     # --- Load agent nodes and extract declared tools ---
-    agent_py = os.path.join(resolved, "agent.py")
+    agent_py = os.path.join(agent_dir, "agent.py")
     if not os.path.isfile(agent_py):
         return json.dumps({"error": f"No agent.py found in {agent_path}"})

@@ -471,8 +554,8 @@ def validate_agent_tools(agent_path: str) -> str:
     import importlib.util
     import sys

-    package_name = os.path.basename(resolved)
-    parent_dir = os.path.dirname(os.path.abspath(resolved))
+    package_name = os.path.basename(agent_dir)
+    parent_dir = os.path.dirname(os.path.abspath(agent_dir))
     if parent_dir not in sys.path:
         sys.path.insert(0, parent_dir)

@@ -1015,13 +1098,16 @@ def run_agent_tests(
         cmd.append("-x")
     cmd.append("--tb=short")

-    # Set PYTHONPATH
+    # Set PYTHONPATH (use pathsep for Windows)
     env = os.environ.copy()
     pythonpath = env.get("PYTHONPATH", "")
     core_path = os.path.join(PROJECT_ROOT, "core")
     exports_path = os.path.join(PROJECT_ROOT, "exports")
     fw_agents_path = os.path.join(PROJECT_ROOT, "core", "framework", "agents")
-    env["PYTHONPATH"] = f"{core_path}:{exports_path}:{fw_agents_path}:{PROJECT_ROOT}:{pythonpath}"
+    path_parts = [core_path, exports_path, fw_agents_path, PROJECT_ROOT]
+    if pythonpath:
+        path_parts.append(pythonpath)
+    env["PYTHONPATH"] = os.pathsep.join(path_parts)

     try:
         result = subprocess.run(
@@ -1030,6 +1116,7 @@ def run_agent_tests(
             text=True,
             timeout=120,
             env=env,
+            stdin=subprocess.DEVNULL,
             encoding="utf-8",
         )
     except subprocess.TimeoutExpired:
@@ -1154,7 +1241,7 @@ def main() -> None:
     register_file_tools(
         mcp,
         resolve_path=_resolve_path,
-        before_write=_take_snapshot,
+        before_write=None,  # Git snapshot causes stdio deadlock on Windows; undo_changes limited
         project_root=PROJECT_ROOT,
     )
```
```diff
@@ -114,6 +114,10 @@ lint.isort.section-order = [
 [tool.pytest.ini_options]
 testpaths = ["tests"]
 asyncio_mode = "auto"
+addopts = "-m 'not live'"
+markers = [
+    "live: Tests that call real external APIs (require credentials, never run in CI)",
+]

 [dependency-groups]
 dev = [
```
```diff
@@ -33,7 +33,6 @@ Usage:
     })

 Credential categories:
-- llm.py: LLM provider credentials (anthropic, openai, etc.)
 - search.py: Search tool credentials (brave_search, google_search, etc.)
 - email.py: Email provider credentials (resend, google/gmail)
 - apollo.py: Apollo.io API credentials
@@ -97,7 +96,6 @@ from .jira import JIRA_CREDENTIALS
 from .kafka import KAFKA_CREDENTIALS
 from .langfuse import LANGFUSE_CREDENTIALS
 from .linear import LINEAR_CREDENTIALS
-from .llm import LLM_CREDENTIALS
 from .lusha import LUSHA_CREDENTIALS
 from .microsoft_graph import MICROSOFT_GRAPH_CREDENTIALS
 from .mongodb import MONGODB_CREDENTIALS
@@ -148,7 +146,6 @@ from .zoom import ZOOM_CREDENTIALS
 # Merged registry of all credentials
 CREDENTIAL_SPECS = {
     **AIRTABLE_CREDENTIALS,
-    **LLM_CREDENTIALS,
     **NEWS_CREDENTIALS,
     **SEARCH_CREDENTIALS,
     **EMAIL_CREDENTIALS,
@@ -248,7 +245,6 @@ __all__ = [
     "CREDENTIAL_SPECS",
     # Category registries (for direct access if needed)
     "AIRTABLE_CREDENTIALS",
-    "LLM_CREDENTIALS",
     "NEWS_CREDENTIALS",
     "SEARCH_CREDENTIALS",
     "EMAIL_CREDENTIALS",
```
```diff
@@ -690,83 +690,6 @@ class CalendlyHealthChecker:
         )


-class AnthropicHealthChecker:
-    """Health checker for Anthropic API credentials."""
-
-    ENDPOINT = "https://api.anthropic.com/v1/messages"
-    TIMEOUT = 10.0
-
-    def check(self, api_key: str) -> HealthCheckResult:
-        """
-        Validate Anthropic API key without consuming tokens.
-
-        Sends a deliberately invalid request (empty messages) to the messages endpoint.
-        A 401 means invalid key; 400 (bad request) means the key authenticated
-        but the payload was rejected — confirming the key is valid without
-        generating any tokens. 429 (rate limited) also indicates a valid key.
-        """
-        try:
-            with httpx.Client(timeout=self.TIMEOUT) as client:
-                response = client.post(
-                    self.ENDPOINT,
-                    headers={
-                        "x-api-key": api_key,
-                        "anthropic-version": "2023-06-01",
-                        "Content-Type": "application/json",
-                    },
-                    # Empty messages triggers 400 (not 200), so no tokens are consumed.
-                    json={
-                        "model": "claude-sonnet-4-20250514",
-                        "max_tokens": 1,
-                        "messages": [],
-                    },
-                )
-
-            if response.status_code == 200:
-                return HealthCheckResult(
-                    valid=True,
-                    message="Anthropic API key valid",
-                )
-            elif response.status_code == 401:
-                return HealthCheckResult(
-                    valid=False,
-                    message="Anthropic API key is invalid",
-                    details={"status_code": 401},
-                )
-            elif response.status_code == 429:
-                # Rate limited but key is valid
-                return HealthCheckResult(
-                    valid=True,
-                    message="Anthropic API key valid (rate limited)",
-                    details={"status_code": 429, "rate_limited": True},
-                )
-            elif response.status_code == 400:
-                # Bad request but key authenticated - key is valid
-                return HealthCheckResult(
-                    valid=True,
-                    message="Anthropic API key valid",
-                    details={"status_code": 400},
-                )
-            else:
-                return HealthCheckResult(
-                    valid=False,
-                    message=f"Anthropic API returned status {response.status_code}",
-                    details={"status_code": response.status_code},
-                )
-        except httpx.TimeoutException:
-            return HealthCheckResult(
-                valid=False,
-                message="Anthropic API request timed out",
-                details={"error": "timeout"},
-            )
-        except httpx.RequestError as e:
-            return HealthCheckResult(
-                valid=False,
-                message=f"Failed to connect to Anthropic API: {e}",
-                details={"error": str(e)},
-            )
-
-
 class GitHubHealthChecker:
     """Health checker for GitHub Personal Access Token."""

@@ -1376,7 +1299,6 @@ class YouTubeHealthChecker(BaseHttpHealthChecker):

 # Registry of health checkers
 HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
-    "anthropic": AnthropicHealthChecker(),
     "apify": ApifyHealthChecker(),
     "apollo": ApolloHealthChecker(),
     "asana": AsanaHealthChecker(),
```
```diff
@@ -1,44 +0,0 @@
-"""
-LLM provider credentials.
-
-Contains credentials for language model providers like Anthropic, OpenAI, etc.
-"""
-
-from .base import CredentialSpec
-
-LLM_CREDENTIALS = {
-    "anthropic": CredentialSpec(
-        env_var="ANTHROPIC_API_KEY",
-        tools=[],
-        node_types=["event_loop"],
-        required=False,  # Not required - agents can use other providers via LiteLLM
-        startup_required=False,  # MCP server doesn't need LLM credentials
-        help_url="https://console.anthropic.com/settings/keys",
-        description="API key for Anthropic Claude models",
-        # Auth method support
-        direct_api_key_supported=True,
-        api_key_instructions="""To get an Anthropic API key:
-1. Go to https://console.anthropic.com/settings/keys
-2. Sign in or create an Anthropic account
-3. Click "Create Key"
-4. Give your key a descriptive name (e.g., "Hive Agent")
-5. Copy the API key (starts with sk-ant-)
-6. Store it securely - you won't be able to see the full key again!""",
-        # Health check configuration
-        health_check_endpoint="https://api.anthropic.com/v1/messages",
-        health_check_method="POST",
-        # Credential store mapping
-        credential_id="anthropic",
-        credential_key="api_key",
-    ),
-    # Future LLM providers:
-    # "openai": CredentialSpec(
-    #     env_var="OPENAI_API_KEY",
-    #     tools=[],
-    #     node_types=["openai_generate"],
-    #     required=False,
-    #     startup_required=False,
-    #     help_url="https://platform.openai.com/api-keys",
-    #     description="API key for OpenAI models",
-    # ),
-}
```
@@ -1,9 +1,10 @@
"""
Shared file operation tools for MCP servers.

Provides 6 tools (read_file, write_file, edit_file, list_directory, search_files,
run_command) plus supporting helpers. Used by both files_server.py (unsandboxed)
and coder_tools_server.py (project-root sandboxed with git snapshots).
Provides 7 tools (read_file, write_file, edit_file, hashline_edit,
list_directory, search_files, run_command) plus supporting helpers.
Used by both files_server.py (unsandboxed) and coder_tools_server.py
(project-root sandboxed with git snapshots).

Usage:
    from aden_tools.file_ops import register_file_tools
@@ -15,16 +16,31 @@ Usage:

from __future__ import annotations

import contextlib
import difflib
import fnmatch
import json
import os
import re
import subprocess
import tempfile
from collections.abc import Callable
from pathlib import Path

from fastmcp import FastMCP

from aden_tools.hashline import (
    HASHLINE_MAX_FILE_BYTES,
    compute_line_hash,
    format_hashlines,
    maybe_strip,
    parse_anchor,
    strip_boundary_echo,
    strip_content_prefixes,
    strip_insert_echo,
    validate_anchor,
)

# ── Constants ─────────────────────────────────────────────────────────────

MAX_READ_LINES = 2000
@@ -245,16 +261,21 @@ def register_file_tools(
    _resolve = resolve_path or _default_resolve_path

    @mcp.tool()
    def read_file(path: str, offset: int = 1, limit: int = 0) -> str:
    def read_file(path: str, offset: int = 1, limit: int = 0, hashline: bool = False) -> str:
        """Read file contents with line numbers and byte-budget truncation.

        Binary files are detected and rejected. Large files are automatically
        truncated at 2000 lines or 50KB. Use offset and limit to paginate.

        Set hashline=True to get N:hhhh|content format with content-hash
        anchors for use with hashline_edit. Line truncation is disabled in
        hashline mode to preserve hash integrity.

        Args:
            path: Absolute file path to read.
            offset: Starting line number, 1-indexed (default: 1).
            limit: Max lines to return, 0 = up to 2000 (default: 0).
            hashline: If True, return N:hhhh|content anchors (default: False).
        """
        resolved = _resolve(path)

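        # Example hashline-mode output for the first lines of a file
        # (hash values illustrative):
        #   1:a3b1|def main():
        #   2:f1c2|    print("hello")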
@@ -276,8 +297,10 @@ def register_file_tools(

        try:
            with open(resolved, encoding="utf-8", errors="replace") as f:
                all_lines = f.readlines()
                content = f.read()

            # Use splitlines() for consistent line splitting with hashline module
            all_lines = content.splitlines()
            total_lines = len(all_lines)
            start_idx = max(0, offset - 1)
            effective_limit = limit if limit > 0 else MAX_READ_LINES
@@ -287,10 +310,15 @@ def register_file_tools(
            byte_count = 0
            truncated_by_bytes = False
            for i in range(start_idx, end_idx):
                line = all_lines[i].rstrip("\n\r")
                if len(line) > MAX_LINE_LENGTH:
                    line = line[:MAX_LINE_LENGTH] + "..."
                formatted = f"{i + 1:>6}\t{line}"
                line = all_lines[i]
                if hashline:
                    # No line truncation in hashline mode (would corrupt hashes)
                    h = compute_line_hash(line)
                    formatted = f"{i + 1}:{h}|{line}"
                else:
                    if len(line) > MAX_LINE_LENGTH:
                        line = line[:MAX_LINE_LENGTH] + "..."
                    formatted = f"{i + 1:>6}\t{line}"
                line_bytes = len(formatted.encode("utf-8")) + 1
                if byte_count + line_bytes > MAX_OUTPUT_BYTES:
                    truncated_by_bytes = True
@@ -323,19 +351,31 @@ def register_file_tools(
            content: Complete file content to write.
        """
        resolved = _resolve(path)
        resolved_path = Path(resolved)

        try:
            # Create parent dirs first (before git snapshot) so structure exists
            resolved_path.parent.mkdir(parents=True, exist_ok=True)
            if before_write:
                before_write()
                try:
                    before_write()
                except Exception:
                    # Don't block the write if git snapshot fails. Do NOT log here —
                    # logging writes to stderr and can deadlock the MCP stdio pipe.
                    pass

            existed = os.path.isfile(resolved)
            os.makedirs(os.path.dirname(resolved), exist_ok=True)
            with open(resolved, "w", encoding="utf-8") as f:
                f.write(content)
            existed = resolved_path.is_file()
            content_str = content if content is not None else ""
            with open(resolved_path, "w", encoding="utf-8") as f:
                f.write(content_str)
                f.flush()
                os.fsync(f.fileno())

            line_count = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
            line_count = content_str.count("\n") + (
                1 if content_str and not content_str.endswith("\n") else 0
            )
            action = "Updated" if existed else "Created"
            return f"{action} {path} ({len(content):,} bytes, {line_count} lines)"
            return f"{action} {path} ({len(content_str):,} bytes, {line_count} lines)"
        except Exception as e:
            return f"Error writing file: {e}"

@@ -472,15 +512,19 @@ def register_file_tools(
            return f"Error listing directory: {e}"

    @mcp.tool()
    def search_files(pattern: str, path: str = ".", include: str = "") -> str:
    def search_files(
        pattern: str, path: str = ".", include: str = "", hashline: bool = False
    ) -> str:
        """Search file contents using regex. Uses ripgrep if available.

        Results sorted by file with line numbers.
        Results sorted by file with line numbers. Set hashline=True to include
        content-hash anchors (N:hhhh) for use with hashline_edit.

        Args:
            pattern: Regex pattern to search for.
            path: Absolute directory path to search (default: current directory).
            include: File glob filter (e.g. '*.py').
            hashline: If True, include hash anchors in results (default: False).
        """
        resolved = _resolve(path)
        if not os.path.isdir(resolved):
@@ -507,6 +551,7 @@ def register_file_tools(
                    text=True,
                    timeout=30,
                    encoding="utf-8",
                    stdin=subprocess.DEVNULL,
                )
                if rg_result.returncode <= 1:
                    output = rg_result.stdout.strip()
@@ -517,6 +562,30 @@ def register_file_tools(
                    for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
                        if project_root:
                            line = line.replace(project_root + "/", "")
                            if hashline:
                                # Parse file:linenum:content and insert hash anchor
                                parts = line.split(":", 2)
                                if len(parts) >= 3:
                                    content = parts[2]
                                    h = compute_line_hash(content)
                                    line = f"{parts[0]}:{parts[1]}:{h}|{content}"
                            else:
                                # Platform-agnostic relativization: ripgrep may output
                                # forward or backslash paths; normalize before relpath (Windows).
                                match = re.match(r"^(.+):(\d+):", line)
                                if match:
                                    path_part, line_num, rest = (
                                        match.group(1),
                                        match.group(2),
                                        line[match.end() :],
                                    )
                                    path_part = os.path.normpath(path_part.replace("/", os.sep))
                                    proj_norm = os.path.normpath(project_root.replace("/", os.sep))
                                    try:
                                        rel = os.path.relpath(path_part, proj_norm)
                                        line = f"{rel}:{line_num}:{rest}"
                                    except ValueError:
                                        pass
                        if len(line) > MAX_LINE_LENGTH:
                            line = line[:MAX_LINE_LENGTH] + "..."
                        lines.append(line)
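                        # Resulting match lines (hashes illustrative):
                        #   hashline=True:  src/main.py:10:a3f2|def process_data(args):
                        #   default:        src/main.py:10:def process_data(args):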
@@ -544,14 +613,26 @@ def register_file_tools(
                    if include and not fnmatch.fnmatch(fname, include):
                        continue
                    fpath = os.path.join(root, fname)
                    display_path = os.path.relpath(fpath, project_root) if project_root else fpath
                    if project_root:
                        proj_norm = os.path.normpath(project_root.replace("/", os.sep))
                        try:
                            display_path = os.path.relpath(fpath, proj_norm)
                        except ValueError:
                            display_path = fpath
                    else:
                        display_path = fpath
                    try:
                        with open(fpath, encoding="utf-8", errors="ignore") as f:
                            for i, line in enumerate(f, 1):
                                if compiled.search(line):
                                    matches.append(
                                        f"{display_path}:{i}:{line.rstrip()[:MAX_LINE_LENGTH]}"
                                    )
                                stripped = line.rstrip()
                                if compiled.search(stripped):
                                    if hashline:
                                        h = compute_line_hash(stripped)
                                        matches.append(f"{display_path}:{i}:{h}|{stripped}")
                                    else:
                                        matches.append(
                                            f"{display_path}:{i}:{stripped[:MAX_LINE_LENGTH]}"
                                        )
                                if len(matches) >= SEARCH_RESULT_LIMIT:
                                    return "\n".join(matches) + "\n... (truncated)"
                    except (OSError, UnicodeDecodeError):
@@ -560,3 +641,370 @@ def register_file_tools(
            return "\n".join(matches) if matches else "No matches found."
        except re.error as e:
            return f"Error: Invalid regex: {e}"

    @mcp.tool()
    def hashline_edit(
        path: str,
        edits: str,
        auto_cleanup: bool = True,
        encoding: str = "utf-8",
    ) -> str:
        """Edit a file using anchor-based line references (N:hash) for precise edits.

        After reading a file with read_file(hashline=True), use the anchors to make
        targeted edits without reproducing exact file content.

        Anchors must match current file content (hash validation). All edits in a
        batch are validated before any are applied (atomic). Overlapping line ranges
        within a single call are rejected.

        Args:
            path: Absolute file path to edit.
            edits: JSON string containing a list of edit operations. Each op is a
                dict with "op" key and operation-specific fields:
                - set_line: anchor, content (single line replacement)
                - replace_lines: start_anchor, end_anchor, content (multi-line)
                - insert_after: anchor, content
                - insert_before: anchor, content
                - replace: old_content, new_content, allow_multiple
                - append: content
            auto_cleanup: Strip hashline prefixes and echoed context from edit
                content (default: True).
            encoding: File encoding (default: "utf-8").
        """
        # 1. Parse JSON
        try:
            edit_ops = json.loads(edits)
        except (json.JSONDecodeError, TypeError) as e:
            return f"Error: Invalid JSON in edits: {e}"

        if not isinstance(edit_ops, list):
            return "Error: edits must be a JSON array of operations"
        if not edit_ops:
            return "Error: edits array is empty"
        if len(edit_ops) > 100:
            return "Error: Too many edits in one call (max 100). Split into multiple calls."

        # 2. Read file
        resolved = _resolve(path)
        if not os.path.isfile(resolved):
            return f"Error: File not found: {path}"

        try:
            with open(resolved, "rb") as f:
                raw_head = f.read(8192)
            eol = "\r\n" if b"\r\n" in raw_head else "\n"

            with open(resolved, encoding=encoding) as f:
                content = f.read()
        except Exception as e:
            return f"Error: Failed to read file: {e}"

        content_bytes = len(content.encode(encoding))
        if content_bytes > HASHLINE_MAX_FILE_BYTES:
            return f"Error: File too large for hashline_edit ({content_bytes} bytes, max 10MB)"

        trailing_newline = content.endswith("\n")
        lines = content.splitlines()

        # 3. Categorize and validate ops
        splices = []  # (start_0idx, end_0idx, new_lines, op_index)
        replaces = []  # (old_content, new_content, op_index, allow_multiple)
        cleanup_actions: list[str] = []

        for i, op in enumerate(edit_ops):
            if not isinstance(op, dict):
                return f"Error: Edit #{i + 1}: operation must be a dict"

            match op.get("op"):
                case "set_line":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return f"Error: Edit #{i + 1} (set_line): {err}"
                    if "content" not in op:
                        return f"Error: Edit #{i + 1} (set_line): missing required field 'content'"
                    if not isinstance(op["content"], str):
                        return f"Error: Edit #{i + 1} (set_line): content must be a string"
                    if "\n" in op["content"] or "\r" in op["content"]:
                        return (
                            f"Error: Edit #{i + 1} (set_line): content must be a single line. "
                            f"Use replace_lines for multi-line replacement."
                        )
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op["content"]
                    new_lines = [new_content] if new_content else []
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx, idx, new_lines, i))

                case "replace_lines":
                    start_anchor = op.get("start_anchor", "")
                    end_anchor = op.get("end_anchor", "")
                    err = validate_anchor(start_anchor, lines)
                    if err:
                        return f"Error: Edit #{i + 1} (replace_lines start): {err}"
                    err = validate_anchor(end_anchor, lines)
                    if err:
                        return f"Error: Edit #{i + 1} (replace_lines end): {err}"
                    start_num, _ = parse_anchor(start_anchor)
                    end_num, _ = parse_anchor(end_anchor)
                    if start_num > end_num:
                        return (
                            f"Error: Edit #{i + 1} (replace_lines): "
                            f"start line {start_num} > end line {end_num}"
                        )
                    if "content" not in op:
                        return (
                            f"Error: Edit #{i + 1} (replace_lines): "
                            f"missing required field 'content'"
                        )
                    if not isinstance(op["content"], str):
                        return f"Error: Edit #{i + 1} (replace_lines): content must be a string"
                    new_content = op["content"]
                    new_lines = new_content.splitlines() if new_content else []
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, s=start_num, e=end_num: strip_boundary_echo(lines, s, e, nl),
                        "boundary_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((start_num - 1, end_num - 1, new_lines, i))

                case "insert_after":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return f"Error: Edit #{i + 1} (insert_after): {err}"
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op.get("content", "")
                    if not isinstance(new_content, str):
                        return f"Error: Edit #{i + 1} (insert_after): content must be a string"
                    if not new_content:
                        return f"Error: Edit #{i + 1} (insert_after): content is empty"
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl),
                        "insert_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx + 1, idx, new_lines, i))

                case "insert_before":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return f"Error: Edit #{i + 1} (insert_before): {err}"
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op.get("content", "")
                    if not isinstance(new_content, str):
                        return f"Error: Edit #{i + 1} (insert_before): content must be a string"
                    if not new_content:
                        return f"Error: Edit #{i + 1} (insert_before): content is empty"
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl, position="last"),
                        "insert_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx, idx - 1, new_lines, i))

                case "replace":
                    old_content = op.get("old_content")
                    new_content = op.get("new_content")
                    if old_content is None:
                        return f"Error: Edit #{i + 1} (replace): missing old_content"
                    if not isinstance(old_content, str):
                        return f"Error: Edit #{i + 1} (replace): old_content must be a string"
                    if not old_content:
                        return f"Error: Edit #{i + 1} (replace): old_content must not be empty"
                    if new_content is None:
                        return f"Error: Edit #{i + 1} (replace): missing new_content"
                    if not isinstance(new_content, str):
                        return f"Error: Edit #{i + 1} (replace): new_content must be a string"
                    allow_multiple = op.get("allow_multiple", False)
                    if not isinstance(allow_multiple, bool):
                        return f"Error: Edit #{i + 1} (replace): allow_multiple must be a boolean"
                    replaces.append((old_content, new_content, i, allow_multiple))

                case "append":
                    new_content = op.get("content")
                    if new_content is None:
                        return f"Error: Edit #{i + 1} (append): missing content"
                    if not isinstance(new_content, str):
                        return f"Error: Edit #{i + 1} (append): content must be a string"
                    if not new_content:
                        return f"Error: Edit #{i + 1} (append): content must not be empty"
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    insert_point = len(lines)
                    splices.append((insert_point, insert_point - 1, new_lines, i))

                case unknown:
                    return f"Error: Edit #{i + 1}: unknown op '{unknown}'"

        # 4. Check for overlapping splice ranges
        for j in range(len(splices)):
            for k in range(j + 1, len(splices)):
                s_a, e_a, _, idx_a = splices[j]
                s_b, e_b, _, idx_b = splices[k]
                is_insert_a = s_a > e_a
                is_insert_b = s_b > e_b

                if is_insert_a and is_insert_b:
                    continue
                if is_insert_a and not is_insert_b:
                    if s_b <= s_a <= e_b + 1:
                        return (
                            f"Error: Overlapping edits: edit #{idx_a + 1} "
                            f"and edit #{idx_b + 1} affect overlapping line ranges"
                        )
                    continue
                if is_insert_b and not is_insert_a:
                    if s_a <= s_b <= e_a + 1:
                        return (
                            f"Error: Overlapping edits: edit #{idx_a + 1} "
                            f"and edit #{idx_b + 1} affect overlapping line ranges"
                        )
                    continue
                if not (e_a < s_b or e_b < s_a):
                    return (
                        f"Error: Overlapping edits: edit #{idx_a + 1} "
                        f"and edit #{idx_b + 1} affect overlapping line ranges"
                    )

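        # Worked example (0-indexed splices): replace_lines over file lines 3-5
        # becomes the splice (2, 4); insert_after anchored on line 4 becomes the
        # insertion splice (4, 3). Since 2 <= 4 <= 4 + 1, that pair is rejected
        # as overlapping before anything is applied.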
        # 5. Apply splices bottom-up
        changes_made = 0
        working = list(lines)
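        # Applying splices bottom-up (reverse-sorted) means no splice can shift
        # the line indices of the splices that are still pending.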
        for start, end, new_lines, _ in sorted(splices, key=lambda s: (s[0], s[3]), reverse=True):
            if start > end:
                changes_made += 1
                for k, nl in enumerate(new_lines):
                    working.insert(start + k, nl)
            else:
                old_slice = working[start : end + 1]
                if old_slice != new_lines:
                    changes_made += 1
                working[start : end + 1] = new_lines

        # 6. Apply str_replace ops
        joined = "\n".join(working)
        replace_counts = []
        for old_content, new_content, op_idx, allow_multiple in replaces:
            count = joined.count(old_content)
            if count == 0:
                return (
                    f"Error: Edit #{op_idx + 1} (replace): "
                    f"old_content not found "
                    f"(note: anchor-based edits in this batch are applied first)"
                )
            if count > 1 and not allow_multiple:
                return (
                    f"Error: Edit #{op_idx + 1} (replace): "
                    f"old_content found {count} times (must be unique). "
                    f"Include more surrounding context to make it unique, "
                    f"or use anchor-based ops instead."
                )
            if allow_multiple:
                joined = joined.replace(old_content, new_content)
                replace_counts.append((op_idx, count))
            else:
                joined = joined.replace(old_content, new_content, 1)
            if count > 0 and old_content != new_content:
                changes_made += 1

        # 7. Restore trailing newline
        if trailing_newline and joined and not joined.endswith("\n"):
            joined += "\n"

        # 8. Restore original EOL style (only convert bare \n, not existing \r\n)
        if eol == "\r\n":
            joined = re.sub(r"(?<!\r)\n", "\r\n", joined)

        # 9. Snapshot + atomic write
        try:
            if before_write:
                before_write()
            original_mode = os.stat(resolved).st_mode
            fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(resolved))
            fd_open = True
            try:
                if hasattr(os, "fchmod"):
                    os.fchmod(fd, original_mode)
                with os.fdopen(fd, "w", encoding=encoding, newline="") as f:
                    fd_open = False
                    f.write(joined)
                os.replace(tmp_path, resolved)
            except BaseException:
                if fd_open:
                    os.close(fd)
                with contextlib.suppress(OSError):
                    os.unlink(tmp_path)
                raise
        except Exception as e:
            return f"Error: Failed to write file: {e}"

        # 10. Build response
        updated_lines = joined.splitlines()
        total_lines = len(updated_lines)

        # Limit returned content to first 200 lines
        preview_limit = 200
        hashline_content = format_hashlines(updated_lines, limit=preview_limit)

        parts = [f"Applied {changes_made} edit(s) to {path}"]
        if changes_made == 0:
            parts.append("(content unchanged after applying edits)")
        if cleanup_actions:
            parts.append(f"Auto-cleanup: {', '.join(cleanup_actions)}")
        if replace_counts:
            for op_idx, count in replace_counts:
                parts.append(f"Edit #{op_idx + 1} replaced {count} occurrence(s)")
        parts.append("")
        parts.append(hashline_content)
        if total_lines > preview_limit:
            parts.append(
                f"\n(Showing first {preview_limit} of {total_lines} lines. "
                f"Use read_file with offset to see more.)"
            )
        return "\n".join(parts)

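A minimal call sketch for this string-returning variant (anchors illustrative; inside an MCP session this is a tool call, shown here as a direct invocation):

```python
import json

edits = json.dumps([
    {"op": "set_line", "anchor": "2:f1c2", "content": '    print("goodbye")'},
    {"op": "append", "content": "# EOF"},
])
result = hashline_edit(path="app.py", edits=edits)
# -> "Applied 2 edit(s) to app.py" followed by an updated hashline preview,
#    so the next batch can reuse the new anchors without re-reading.
print(result)
```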
@@ -0,0 +1,230 @@
"""Hashline utilities for anchor-based file editing.

Each line gets a short content hash anchor (line_number:hash). Models reference
lines by anchor instead of reproducing text. If the file changed since the model
read it, the hash won't match and the edit is cleanly rejected.
"""

import re
import zlib

# ── Constants ─────────────────────────────────────────────────────────────

# Files beyond this size are skipped/rejected in hashline mode because
# hashline anchors are not practical on files this large (minified
# bundles, logs, data dumps). Shared by view_file, grep_search, and
# hashline_edit.
HASHLINE_MAX_FILE_BYTES = 10 * 1024 * 1024  # 10 MB

# ── Hash computation ──────────────────────────────────────────────────────


def compute_line_hash(line: str) -> str:
    """Compute a 4-char hex hash for a line of text.

    Uses CRC32 mod 65536, formatted as lowercase hex. Only trailing spaces
    and tabs are stripped before hashing. Leading whitespace (indentation)
    is included in the hash so indentation changes invalidate anchors.
    This keeps stale-anchor detection safe for indentation-sensitive files
    while still ignoring common trailing-whitespace noise.

    Collision probability is ~0.0015% per changed line (4-char hex,
    migrated from 2-char hex which had ~0.39% collision rate).
    """
    stripped = line.rstrip(" \t")
    crc = zlib.crc32(stripped.encode("utf-8")) & 0xFFFFFFFF
    return f"{crc % 65536:04x}"

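A sketch of what the hash computation does, mirroring the exact steps of `compute_line_hash` above (no hash values asserted, since they depend on CRC32):

```python
import zlib

from aden_tools.hashline import compute_line_hash

line = "    return result   "
stripped = line.rstrip(" \t")  # trailing spaces/tabs ignored; indentation kept
crc = zlib.crc32(stripped.encode("utf-8")) & 0xFFFFFFFF
assert compute_line_hash(line) == f"{crc % 65536:04x}"

# Indentation is part of the hash, so "x = 1" and "    x = 1" will
# (almost always) produce different anchors.
```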
def format_hashlines(lines: list[str], offset: int = 1, limit: int = 0) -> str:
    """Format lines with N:hhhh|content prefixes.

    Args:
        lines: The file content split into lines.
        offset: 1-indexed start line (default 1).
        limit: Maximum lines to return, 0 means all.

    Returns:
        Formatted string with hashline prefixes.
    """
    start = offset - 1  # convert to 0-indexed
    if limit > 0:
        selected = lines[start : start + limit]
    else:
        selected = lines[start:]

    result_parts = []
    for i, line in enumerate(selected):
        line_num = offset + i
        h = compute_line_hash(line)
        result_parts.append(f"{line_num}:{h}|{line}")

    return "\n".join(result_parts)

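Usage sketch for `format_hashlines` (hash digits shown as `hhhh` since they depend on content):

```python
from aden_tools.hashline import format_hashlines

lines = ["def main():", "    pass"]
print(format_hashlines(lines))
# 1:hhhh|def main():
# 2:hhhh|    pass

print(format_hashlines(lines, offset=2, limit=1))
# 2:hhhh|    pass
```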
# ── Anchor parsing & validation ───────────────────────────────────────────


def parse_anchor(anchor: str) -> tuple[int, str]:
    """Parse an anchor string like '2:a3b1' into (line_number, hash).

    Raises:
        ValueError: If the anchor format is invalid.
    """
    if ":" not in anchor:
        raise ValueError(f"Invalid anchor format (no colon): '{anchor}'")

    parts = anchor.split(":", 1)
    try:
        line_num = int(parts[0])
    except ValueError as exc:
        raise ValueError(f"Invalid anchor format (line number not an integer): '{anchor}'") from exc

    hash_str = parts[1]
    if len(hash_str) != 4:
        raise ValueError(f"Invalid anchor format (hash must be 4 chars): '{anchor}'")
    if not all(c in "0123456789abcdef" for c in hash_str):
        raise ValueError(f"Invalid anchor format (hash must be lowercase hex): '{anchor}'")

    return line_num, hash_str


def validate_anchor(anchor: str, lines: list[str]) -> str | None:
    """Validate an anchor against file lines.

    Returns:
        None if valid, error message string if invalid.
    """
    try:
        line_num, expected_hash = parse_anchor(anchor)
    except ValueError as e:
        return str(e)

    if line_num < 1 or line_num > len(lines):
        return f"Line {line_num} out of range (file has {len(lines)} lines)"

    actual_line = lines[line_num - 1]
    actual_hash = compute_line_hash(actual_line)
    if actual_hash != expected_hash:
        preview = actual_line.strip()
        if len(preview) > 80:
            preview = preview[:77] + "..."
        return (
            f"Hash mismatch at line {line_num}: expected '{expected_hash}', "
            f"got '{actual_hash}'. Current content: {preview!r}. "
            f"Re-read the file to get current anchors."
        )

    return None

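A few deterministic checks against the two functions above:

```python
from aden_tools.hashline import compute_line_hash, parse_anchor, validate_anchor

# parse_anchor is purely syntactic.
assert parse_anchor("12:a3f1") == (12, "a3f1")

# validate_anchor returns None when the anchor matches the current line...
lines = ["alpha", "beta"]
assert validate_anchor(f"1:{compute_line_hash('alpha')}", lines) is None

# ...and an error string otherwise.
assert "out of range" in validate_anchor("9:abcd", lines)
```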
# ── Auto-cleanup helpers ──────────────────────────────────────────────────
# Shared by both file_ops.hashline_edit and file_system_toolkits.hashline_edit.

HASHLINE_PREFIX_RE = re.compile(r"^\d+:[0-9a-f]{4}\|")


def strip_content_prefixes(lines: list[str]) -> list[str]:
    """Strip hashline prefixes from content lines when all have them.

    LLMs frequently copy hashline-formatted text (e.g. '5:a3b1|content') into
    their content fields. Only strips when 2+ non-empty lines all match the
    exact hashline prefix pattern (N:hhhh|). Single-line content is left alone
    to avoid false positives on literal text that happens to match the pattern.
    """
    if not lines:
        return lines
    non_empty = [ln for ln in lines if ln]
    if len(non_empty) < 2:
        return lines
    prefix_count = sum(1 for ln in non_empty if HASHLINE_PREFIX_RE.match(ln))
    if prefix_count < len(non_empty):
        return lines
    return [HASHLINE_PREFIX_RE.sub("", ln) for ln in lines]

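How the 2+ line threshold plays out in practice (deterministic, per the rules above):

```python
from aden_tools.hashline import strip_content_prefixes

# Two or more prefixed, non-empty lines: prefixes are stripped.
assert strip_content_prefixes(["5:a3b1|x = 1", "6:c2d4|y = 2"]) == ["x = 1", "y = 2"]

# A single line is left alone, even though it matches the pattern.
assert strip_content_prefixes(["5:a3b1|x = 1"]) == ["5:a3b1|x = 1"]

# Mixed content is left alone too (not every non-empty line matches).
assert strip_content_prefixes(["5:a3b1|x = 1", "plain"]) == ["5:a3b1|x = 1", "plain"]
```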
def whitespace_equal(a: str, b: str) -> bool:
    """Compare strings ignoring spaces and tabs."""
    return a.replace(" ", "").replace("\t", "") == b.replace(" ", "").replace("\t", "")


def strip_insert_echo(
    anchor_line: str, new_lines: list[str], *, position: str = "first"
) -> list[str]:
    """Strip echoed anchor line from insert content.

    If the model echoes the anchor line in inserted content, remove it to
    avoid duplication. Only applies when content has 2+ lines and both the
    anchor and checked content line are non-blank.

    position="first" (insert_after): check first line, strip from front.
    position="last" (insert_before): check last line, strip from end.
    """
    if len(new_lines) <= 1:
        return new_lines
    if position == "last":
        if not anchor_line.strip() or not new_lines[-1].strip():
            return new_lines
        if whitespace_equal(new_lines[-1], anchor_line):
            return new_lines[:-1]
    else:
        if not anchor_line.strip() or not new_lines[0].strip():
            return new_lines
        if whitespace_equal(new_lines[0], anchor_line):
            return new_lines[1:]
    return new_lines

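Echo-stripping in action (deterministic given the rules above):

```python
from aden_tools.hashline import strip_insert_echo

anchor_line = "def handler(event):"

# insert_after content that re-types the anchor line first: the echo is dropped.
assert strip_insert_echo(anchor_line, ["def handler(event):", "    log(event)"]) == ["    log(event)"]

# Single-line content is never stripped, even if it equals the anchor.
assert strip_insert_echo(anchor_line, ["def handler(event):"]) == ["def handler(event):"]
```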
def strip_boundary_echo(
    file_lines: list[str], start_1idx: int, end_1idx: int, new_lines: list[str]
) -> list[str]:
    """Strip echoed boundary context from replace_lines content.

    If the model includes the line before AND after the replaced range as part
    of the replacement content, strip those echoed boundary lines. Both
    boundaries must echo simultaneously before either is stripped (a single
    boundary match is too likely to be a coincidence with real content).
    Only applies when the replacement has more lines than the range being
    replaced, and both the boundary line and content line are non-blank.
    """
    range_count = end_1idx - start_1idx + 1
    if len(new_lines) <= 1 or len(new_lines) <= range_count:
        return new_lines

    # Check if leading boundary echoes
    before_idx = start_1idx - 2  # 0-indexed line before range
    leading_echoes = (
        before_idx >= 0
        and new_lines[0].strip()
        and file_lines[before_idx].strip()
        and whitespace_equal(new_lines[0], file_lines[before_idx])
    )

    # Check if trailing boundary echoes
    after_idx = end_1idx  # 0-indexed line after range
    trailing_echoes = (
        after_idx < len(file_lines)
        and new_lines[-1].strip()
        and file_lines[after_idx].strip()
        and whitespace_equal(new_lines[-1], file_lines[after_idx])
    )

    # Only strip if BOTH boundaries echo and there is content between them.
    # len < 3 means no real content between the two boundary lines, so
    # stripping would produce an empty list (accidental deletion).
    if not (leading_echoes and trailing_echoes) or len(new_lines) < 3:
        return new_lines

    return new_lines[1:-1]


def maybe_strip(new_lines, strip_fn, action_name, auto_cleanup, cleanup_actions):
    """Apply a strip function if auto_cleanup is enabled, tracking actions."""
    if not auto_cleanup:
        return new_lines
    cleaned = strip_fn(new_lines)
    if cleaned != new_lines:
        if action_name not in cleanup_actions:
            cleanup_actions.append(action_name)
        return cleaned
    return new_lines
@@ -56,6 +56,8 @@ from .email_tool import register_tools as register_email
from .exa_search_tool import register_tools as register_exa_search
from .example_tool import register_tools as register_example
from .excel_tool import register_tools as register_excel

# File system toolkits
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.data_tools import register_tools as register_data_tools
@@ -63,6 +65,7 @@ from .file_system_toolkits.execute_command_tool import (
    register_tools as register_execute_command,
)
from .file_system_toolkits.grep_search import register_tools as register_grep_search
from .file_system_toolkits.hashline_edit import register_tools as register_hashline_edit
from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import (
    register_tools as register_replace_file_content,
@@ -162,6 +165,8 @@ def _register_verified(
    register_apply_diff(mcp)
    register_apply_patch(mcp)
    register_grep_search(mcp)
    # hashline_edit: anchor-based editing, pairs with view_file/grep_search hashline mode
    register_hashline_edit(mcp)
    register_execute_command(mcp)
    register_data_tools(mcp)
    register_csv(mcp)

@@ -36,12 +36,13 @@ grep_search(
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `recursive` | bool | No | False | Whether to search recursively in subdirectories |
| `hashline` | bool | No | False | If True, include an `anchor` field (`N:hhhh`) in each match for use with `hashline_edit` |

## Returns

Returns a dictionary with the following structure:

**Success:**
**Success (default mode):**
```python
{
    "success": True,
@@ -64,6 +65,25 @@ Returns a dictionary with the following structure:
}
```

**Success (hashline mode):**
```python
{
    "success": True,
    "pattern": "def \\w+\\(",
    "path": "src",
    "recursive": True,
    "matches": [
        {
            "file": "src/main.py",
            "line_number": 10,
            "line_content": "def process_data(args):",
            "anchor": "10:a3f2"
        }
    ],
    "total_matches": 1
}
```

**No matches:**
```python
{

@@ -3,6 +3,8 @@ import re

from mcp.server.fastmcp import FastMCP

from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, compute_line_hash

from ..security import WORKSPACES_DIR, get_secure_path


@@ -17,12 +19,14 @@ def register_tools(mcp: FastMCP) -> None:
        agent_id: str,
        session_id: str,
        recursive: bool = False,
        hashline: bool = False,
    ) -> dict:
        """
        Search for a pattern in a file or directory within the session sandbox.

        Use this when you need to find specific content or patterns in files using regex.
        Set recursive=True to search through all subdirectories.
        Set hashline=True to include anchor hashes in results for use with hashline_edit.

        Args:
            path: The path to search in (file or directory, relative to session root)
@@ -31,6 +35,7 @@ def register_tools(mcp: FastMCP) -> None:
            agent_id: The ID of the agent
            session_id: The ID of the current session
            recursive: Whether to search recursively in directories (default: False)
            hashline: If True, include anchor field (N:hhhh) in each match (default: False)

        Returns:
            Dict with search results and match details, or error dict
@@ -48,6 +53,7 @@ def register_tools(mcp: FastMCP) -> None:
            session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)

            matches = []
            skipped_large_files = []

            if os.path.isfile(secure_path):
                files = [secure_path]
@@ -67,21 +73,46 @@ def register_tools(mcp: FastMCP) -> None:
                # Calculate relative path for display
                display_path = os.path.relpath(file_path, session_root)
                try:
                    with open(file_path, encoding="utf-8") as f:
                        for i, line in enumerate(f, 1):
                            if regex.search(line):
                    if hashline:
                        # Use splitlines() for anchor consistency with
                        # view_file/hashline_edit (handles Unicode line
                        # separators like \u2028, \x85).
                        # Skip files > 10MB to avoid excessive memory use.
                        file_size = os.path.getsize(file_path)
                        if file_size > HASHLINE_MAX_FILE_BYTES:
                            skipped_large_files.append(display_path)
                            continue
                        with open(file_path, encoding="utf-8") as f:
                            content = f.read()
                        for i, line in enumerate(content.splitlines(), 1):
                            if not regex.search(line):
                                continue
                            matches.append(
                                {
                                    "file": display_path,
                                    "line_number": i,
                                    "line_content": line,
                                    "anchor": f"{i}:{compute_line_hash(line)}",
                                }
                            )
                    else:
                        with open(file_path, encoding="utf-8") as f:
                            for i, line in enumerate(f, 1):
                                bare = line.rstrip("\n\r")
                                if not regex.search(bare):
                                    continue
                                matches.append(
                                    {
                                        "file": display_path,
                                        "line_number": i,
                                        "line_content": line.strip(),
                                        "line_content": bare.strip(),
                                    }
                                )
                except (UnicodeDecodeError, PermissionError):
                    # Skips files that cannot be decoded or lack permissions
                    continue

            return {
            result = {
                "success": True,
                "pattern": pattern,
                "path": path,
@@ -89,6 +120,9 @@ def register_tools(mcp: FastMCP) -> None:
                "matches": matches,
                "total_matches": len(matches),
            }
            if skipped_large_files:
                result["skipped_large_files"] = skipped_large_files
            return result

        # 2. Specific Exception Handling (Issue #55 Requirements)
        except FileNotFoundError:

@@ -0,0 +1,19 @@
"""Backward-compatible re-exports from aden_tools.hashline.

The implementation has moved to aden_tools.hashline for shared use across
both file_system_toolkits and file_ops (coder tools). All existing imports
continue to work via this shim.
"""

from aden_tools.hashline import (  # noqa: F401
    HASHLINE_PREFIX_RE,
    compute_line_hash,
    format_hashlines,
    maybe_strip,
    parse_anchor,
    strip_boundary_echo,
    strip_content_prefixes,
    strip_insert_echo,
    validate_anchor,
    whitespace_equal,
)
@@ -0,0 +1,216 @@
# Hashline Edit Tool

Edit files using anchor-based line references for precise, hash-validated edits.

## Description

The `hashline_edit` tool enables file editing using short content-hash anchors (`N:hhhh`) instead of requiring exact text reproduction. Each line's anchor includes a 4-character hash of its content. If the file has changed since the model last read it, the hash won't match and the edit is cleanly rejected.

Use this tool together with `view_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.

## Use Cases

- Making targeted edits after reading a file with `view_file(hashline=True)`
- Replacing single lines, line ranges, or inserting new lines by anchor
- Batch editing multiple locations in a single atomic call
- Falling back to string replacement when anchors are not available

## Usage

```python
import json

# First, read the file with hashline mode to get anchors
content = view_file(path="app.py", hashline=True, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns lines like:
#   1:a3b1|def main():
#   2:f1c2|    print("hello")
#   ...

# Then edit using the anchors
hashline_edit(
    path="app.py",
    edits=json.dumps([
        {"op": "set_line", "anchor": "2:f1c2", "content": '    print("goodbye")'}
    ]),
    workspace_id="ws-1",
    agent_id="a-1",
    session_id="s-1"
)
```

## Arguments

| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `edits` | str | Yes | - | JSON string containing a list of edit operations (see Operations below) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `auto_cleanup` | bool | No | `True` | Strip hashline prefixes and echoed context from content. Set to `False` to write content exactly as provided. |
| `encoding` | str | No | `"utf-8"` | File encoding. Must match the file's actual encoding. |

## Operations

The `edits` parameter is a JSON array of operation objects. Each object must have an `"op"` field:

| Op | Fields | Behavior |
|---|---|---|
| `set_line` | `anchor`, `content` | Replace one line identified by anchor (use `content: ""` to delete the line) |
| `replace_lines` | `start_anchor`, `end_anchor`, `content` | Replace a range of lines (can expand or shrink) |
| `insert_after` | `anchor`, `content` | Insert new lines after the anchor line |
| `insert_before` | `anchor`, `content` | Insert new lines before the anchor line |
| `replace` | `old_content`, `new_content`, `allow_multiple` (optional) | Fallback string replacement; errors if 0 or 2+ matches (unless `allow_multiple: true`) |
| `append` | `content` | Append new lines to end of file (works for empty files too) |

## Returns

**Success:**
```python
{
    "success": True,
    "path": "app.py",
    "edits_applied": 2,
    "content": "1:b2c4|def main():\n2:c4a1|    print(\"goodbye\")\n..."
}
```

**Success (noop, content unchanged after applying edits):**
```python
{
    "success": True,
    "path": "app.py",
    "edits_applied": 0,
    "note": "Content unchanged after applying edits",
    "content": "1:b2c4|def main():\n..."
}
```

**Success (with auto-cleanup applied):**
```python
{
    "success": True,
    "path": "app.py",
    "edits_applied": 1,
    "content": "...",
    "cleanup_applied": ["prefix_strip"]
}
```

The `cleanup_applied` field is only present when cleanup actually modified content. Possible values: `prefix_strip`, `boundary_echo_strip`, `insert_echo_strip`.

**Success (replace with allow_multiple):**
```python
{
    "success": True,
    "path": "app.py",
    "edits_applied": 1,
    "content": "...",
    "replacements": {"edit_1": 3}
}
```

The `replacements` field is only present when `allow_multiple: true` was used, showing the count per replace op.

**Error:**
```python
{
    "error": "Edit #1 (set_line): Hash mismatch at line 2: expected 'f1c2', got 'a3b1'. Re-read the file to get current anchors."
}
```

## Error Handling

- Returns an error if the file doesn't exist
- Returns an error if any anchor hash doesn't match (stale read)
- Returns an error if a line number is out of range
- Returns an error if splice ranges overlap within a batch
- Returns an error if a `replace` op matches 0 or 2+ times (unless `allow_multiple: true`)
- Returns an error for unknown op types or invalid JSON
- All edits are validated before any writes occur (atomic): on any error the file is unchanged

## Examples

### Replacing a single line
```python
edits = json.dumps([
    {"op": "set_line", "anchor": "5:a3b1", "content": "    return result"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {"success": True, "path": "app.py", "edits_applied": 1, "content": "..."}
```

### Replacing a range of lines
```python
edits = json.dumps([{
    "op": "replace_lines",
    "start_anchor": "10:b1c2",
    "end_anchor": "15:c2d3",
    "content": "    # simplified\n    return x + y"
}])
result = hashline_edit(path="math.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```

### Inserting new lines after
```python
edits = json.dumps([
    {"op": "insert_after", "anchor": "3:d4e5", "content": "import os\nimport sys"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```

### Inserting new lines before
```python
edits = json.dumps([
    {"op": "insert_before", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```

### Batch editing
```python
edits = json.dumps([
    {"op": "set_line", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"},
    {"op": "insert_after", "anchor": "2:b2c3", "content": "import logging"},
    {"op": "set_line", "anchor": "10:c3d4", "content": "    logging.info('done')"},
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```

### Replace all occurrences
```python
edits = json.dumps([
    {"op": "replace", "old_content": "old_name", "new_content": "new_name", "allow_multiple": True}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {..., "replacements": {"edit_1": 5}}
```

## Notes

- Anchors are generated by `view_file(hashline=True)` and `grep_search(hashline=True)`
- The hash is a CRC32-based 4-char hex digest of the line content (with trailing spaces and tabs stripped; leading whitespace is included so indentation changes invalidate anchors). Collision probability is ~0.0015% per changed line.
- All anchor-based ops are validated before any writes occur; if any op fails validation, the file is left unchanged
- String `replace` ops are applied after all anchor-based splices, so they match against post-splice content
- Original line endings (LF or CRLF) are preserved
- The response includes the updated file content in hashline format, so subsequent edits can use the new anchors without re-reading

## Auto-Cleanup Details

When `auto_cleanup=True` (the default), the tool strips hashline prefixes and echoed context that LLMs frequently include in edit content. Prefix stripping uses a **2+ non-empty line threshold** to avoid false positives. The prefix regex matches the `N:hhhh|` pattern (4-char hex hash).

**Why the threshold matters:** Single-line content matching the `N:hhhh|` pattern is ambiguous. It could be literal content (CSV data, config values, log format strings) that happens to match the pattern. With 2+ lines all matching, the probability of a false positive drops dramatically.

**Single-line example (NOT stripped):**
```python
# set_line with content "5:a3b1|hello" writes literally "5:a3b1|hello"
{"op": "set_line", "anchor": "2:f1c2", "content": "5:a3b1|hello"}
```

**Multi-line example (stripped):**
```python
# replace_lines where all lines match the N:hhhh| pattern gets stripped
{"op": "replace_lines", "start_anchor": "2:f1c2", "end_anchor": "3:b2d3",
 "content": "2:a3b1|BBB\n3:c4d2|CCC"}
# Writes "BBB\nCCC" (prefixes removed)
```

**Escape hatch:** Set `auto_cleanup=False` to write content exactly as provided, bypassing all cleanup heuristics.
@@ -0,0 +1,3 @@
from .hashline_edit import register_tools

__all__ = ["register_tools"]
@@ -0,0 +1,420 @@
import contextlib
import json
import os
import re
import tempfile

from mcp.server.fastmcp import FastMCP

from aden_tools.hashline import (
    HASHLINE_MAX_FILE_BYTES,
    format_hashlines,
    maybe_strip,
    parse_anchor,
    strip_boundary_echo,
    strip_content_prefixes,
    strip_insert_echo,
    validate_anchor,
)

from ..security import get_secure_path


def register_tools(mcp: FastMCP) -> None:
    """Register hashline edit tools with the MCP server."""

    @mcp.tool()
    def hashline_edit(
        path: str,
        edits: str,
        workspace_id: str,
        agent_id: str,
        session_id: str,
        auto_cleanup: bool = True,
        encoding: str = "utf-8",
    ) -> dict:
        """
        Purpose
            Edit a file using anchor-based line references (N:hash) for precise edits.

        When to use
            After reading a file with view_file(hashline=True), use the anchors to make
            targeted edits without reproducing exact file content.

        Rules & Constraints
            Anchors must match the current file content (hash validation).
            All edits in a batch are validated before any are applied (atomic).
            Overlapping line ranges within a single call are rejected.

        Args:
            path: The path to the file (relative to session root)
            edits: JSON string containing a list of edit operations.
                Each op is a dict with:
                - set_line: anchor, content
                - replace_lines: start_anchor, end_anchor, content
                - insert_after: anchor, content
                - insert_before: anchor, content
                - replace: old_content, new_content, allow_multiple
                - append: content
            workspace_id: The ID of the workspace
            agent_id: The ID of the agent
            session_id: The ID of the current session
            auto_cleanup: If True (default), automatically strip hashline prefixes and
                echoed context from edit content. Set to False to write content exactly
                as provided.
            encoding: File encoding (default "utf-8"). Must match the file's actual encoding.

        Returns:
            Dict with success status, updated hashline content, and edit count, or error dict
        """
        # 1. Parse JSON
        try:
            edit_ops = json.loads(edits)
        except (json.JSONDecodeError, TypeError) as e:
            return {"error": f"Invalid JSON in edits: {e}"}

        if not isinstance(edit_ops, list):
            return {"error": "edits must be a JSON array of operations"}

        if not edit_ops:
            return {"error": "edits array is empty"}

        if len(edit_ops) > 100:
            return {"error": "Too many edits in one call (max 100). Split into multiple calls."}

        # 2. Read file
        try:
            secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
            if not os.path.exists(secure_path):
                return {"error": f"File not found at {path}"}
            if not os.path.isfile(secure_path):
                return {"error": f"Path is not a file: {path}"}

            with open(secure_path, "rb") as f:
                raw_head = f.read(8192)
            eol = "\r\n" if b"\r\n" in raw_head else "\n"

            with open(secure_path, encoding=encoding) as f:
                content = f.read()
        except Exception as e:
            return {"error": f"Failed to read file: {e}"}

        content_bytes = len(content.encode(encoding))
        if content_bytes > HASHLINE_MAX_FILE_BYTES:
            return {"error": f"File too large for hashline_edit ({content_bytes} bytes, max 10MB)"}

        trailing_newline = content.endswith("\n")
        lines = content.splitlines()

        # 3. Categorize and validate ops
        splices = []  # (start_0idx, end_0idx, new_lines, op_index)
        replaces = []  # (old_content, new_content, op_index, allow_multiple)
        cleanup_actions = []

        for i, op in enumerate(edit_ops):
            if not isinstance(op, dict):
                return {"error": f"Edit #{i + 1}: operation must be a dict"}

            match op.get("op"):
                case "set_line":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return {"error": f"Edit #{i + 1} (set_line): {err}"}
                    if "content" not in op:
                        return {
                            "error": f"Edit #{i + 1} (set_line): missing required field 'content'"
                        }
                    if not isinstance(op["content"], str):
                        return {"error": f"Edit #{i + 1} (set_line): content must be a string"}
                    if "\n" in op["content"] or "\r" in op["content"]:
                        return {
                            "error": f"Edit #{i + 1} (set_line): content must be a single line. "
                            f"Use replace_lines for multi-line replacement."
                        }
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op["content"]
                    new_lines = [new_content] if new_content else []
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx, idx, new_lines, i))

                case "replace_lines":
                    start_anchor = op.get("start_anchor", "")
                    end_anchor = op.get("end_anchor", "")
                    err = validate_anchor(start_anchor, lines)
                    if err:
                        return {"error": f"Edit #{i + 1} (replace_lines start): {err}"}
                    err = validate_anchor(end_anchor, lines)
                    if err:
                        return {"error": f"Edit #{i + 1} (replace_lines end): {err}"}
                    start_num, _ = parse_anchor(start_anchor)
                    end_num, _ = parse_anchor(end_anchor)
                    if start_num > end_num:
                        return {
                            "error": f"Edit #{i + 1} (replace_lines): "
                            f"start line {start_num} > end line {end_num}"
                        }
                    if "content" not in op:
                        return {
                            "error": (
                                f"Edit #{i + 1} (replace_lines): missing required field 'content'"
                            )
                        }
                    if not isinstance(op["content"], str):
                        return {"error": f"Edit #{i + 1} (replace_lines): content must be a string"}
                    new_content = op["content"]
                    new_lines = new_content.splitlines() if new_content else []
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, s=start_num, e=end_num: strip_boundary_echo(lines, s, e, nl),
                        "boundary_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((start_num - 1, end_num - 1, new_lines, i))

                case "insert_after":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return {"error": f"Edit #{i + 1} (insert_after): {err}"}
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op.get("content", "")
                    if not isinstance(new_content, str):
                        return {"error": f"Edit #{i + 1} (insert_after): content must be a string"}
                    if not new_content:
                        return {"error": f"Edit #{i + 1} (insert_after): content is empty"}
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl),
                        "insert_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx + 1, idx, new_lines, i))

                case "insert_before":
                    anchor = op.get("anchor", "")
                    err = validate_anchor(anchor, lines)
                    if err:
                        return {"error": f"Edit #{i + 1} (insert_before): {err}"}
                    line_num, _ = parse_anchor(anchor)
                    idx = line_num - 1
                    new_content = op.get("content", "")
                    if not isinstance(new_content, str):
                        return {"error": f"Edit #{i + 1} (insert_before): content must be a string"}
                    if not new_content:
                        return {"error": f"Edit #{i + 1} (insert_before): content is empty"}
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    new_lines = maybe_strip(
                        new_lines,
                        lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl, position="last"),
                        "insert_echo_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    splices.append((idx, idx - 1, new_lines, i))

                case "replace":
                    old_content = op.get("old_content")
                    new_content = op.get("new_content")
                    if old_content is None:
                        return {"error": f"Edit #{i + 1} (replace): missing old_content"}
                    if not isinstance(old_content, str):
                        return {"error": f"Edit #{i + 1} (replace): old_content must be a string"}
                    if not old_content:
                        return {"error": f"Edit #{i + 1} (replace): old_content must not be empty"}
                    if new_content is None:
                        return {"error": f"Edit #{i + 1} (replace): missing new_content"}
                    if not isinstance(new_content, str):
                        return {"error": f"Edit #{i + 1} (replace): new_content must be a string"}
                    allow_multiple = op.get("allow_multiple", False)
                    if not isinstance(allow_multiple, bool):
                        return {
                            "error": f"Edit #{i + 1} (replace): allow_multiple must be a boolean"
                        }
                    replaces.append((old_content, new_content, i, allow_multiple))

                case "append":
                    new_content = op.get("content")
                    if new_content is None:
                        return {"error": f"Edit #{i + 1} (append): missing content"}
                    if not isinstance(new_content, str):
                        return {"error": f"Edit #{i + 1} (append): content must be a string"}
                    if not new_content:
                        return {"error": f"Edit #{i + 1} (append): content must not be empty"}
                    new_lines = new_content.splitlines()
                    new_lines = maybe_strip(
                        new_lines,
                        strip_content_prefixes,
                        "prefix_strip",
                        auto_cleanup,
                        cleanup_actions,
                    )
                    insert_point = len(lines)
                    splices.append((insert_point, insert_point - 1, new_lines, i))

                case unknown:
                    return {"error": f"Edit #{i + 1}: unknown op '{unknown}'"}

        # 4. Check for overlapping splice ranges
        for j in range(len(splices)):
            for k in range(j + 1, len(splices)):
                s_a, e_a, _, idx_a = splices[j]
                s_b, e_b, _, idx_b = splices[k]
                is_insert_a = s_a > e_a
                is_insert_b = s_b > e_b

                if is_insert_a and is_insert_b:
                    continue

                if is_insert_a and not is_insert_b:
                    if s_b <= s_a <= e_b + 1:
                        return {
                            "error": (
                                f"Overlapping edits: edit #{idx_a + 1} "
                                f"and edit #{idx_b + 1} affect overlapping line ranges"
                            )
                        }
||||
continue
|
||||
|
||||
if is_insert_b and not is_insert_a:
|
||||
if s_a <= s_b <= e_a + 1:
|
||||
return {
|
||||
"error": (
|
||||
f"Overlapping edits: edit #{idx_a + 1} "
|
||||
f"and edit #{idx_b + 1} affect overlapping line ranges"
|
||||
)
|
||||
}
|
||||
continue
|
||||
|
||||
if not (e_a < s_b or e_b < s_a):
|
||||
return {
|
||||
"error": (
|
||||
f"Overlapping edits: edit #{idx_a + 1} "
|
||||
f"and edit #{idx_b + 1} affect overlapping line ranges"
|
||||
)
|
||||
}
|
||||
|
||||
# 5. Apply splices bottom-up
|
||||
changes_made = 0
|
||||
working = list(lines)
|
||||
for start, end, new_lines, _ in sorted(splices, key=lambda s: (s[0], s[3]), reverse=True):
|
||||
if start > end:
|
||||
changes_made += 1
|
||||
for k, nl in enumerate(new_lines):
|
||||
working.insert(start + k, nl)
|
||||
else:
|
||||
old_slice = working[start : end + 1]
|
||||
if old_slice != new_lines:
|
||||
changes_made += 1
|
||||
working[start : end + 1] = new_lines
|
||||
|
||||
# 6. Apply str_replace ops
|
||||
joined = "\n".join(working)
|
||||
replace_counts = []
|
||||
for old_content, new_content, op_idx, allow_multiple in replaces:
|
||||
count = joined.count(old_content)
|
||||
if count == 0:
|
||||
return {
|
||||
"error": (
|
||||
f"Edit #{op_idx + 1} (replace): "
|
||||
f"old_content not found "
|
||||
f"(note: anchor-based edits in this batch are applied first)"
|
||||
)
|
||||
}
|
||||
if count > 1 and not allow_multiple:
|
||||
return {
|
||||
"error": (
|
||||
f"Edit #{op_idx + 1} (replace): "
|
||||
f"old_content found {count} times (must be unique). "
|
||||
f"Include more surrounding context to make it unique, "
|
||||
f"or use anchor-based ops instead."
|
||||
)
|
||||
}
|
||||
if allow_multiple:
|
||||
joined = joined.replace(old_content, new_content)
|
||||
replace_counts.append((op_idx, count))
|
||||
else:
|
||||
joined = joined.replace(old_content, new_content, 1)
|
||||
if count > 0 and old_content != new_content:
|
||||
changes_made += 1
|
||||
|
||||
# 7. Restore trailing newline
|
||||
if trailing_newline and joined and not joined.endswith("\n"):
|
||||
joined += "\n"
|
||||
|
||||
# 8. Restore original EOL style (only convert bare \n, not existing \r\n)
|
||||
if eol == "\r\n":
|
||||
joined = re.sub(r"(?<!\r)\n", "\r\n", joined)
|
||||
|
||||
# 9. Atomic write (write-to-tmp + os.replace)
|
||||
try:
|
||||
original_mode = os.stat(secure_path).st_mode
|
||||
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(secure_path))
|
||||
fd_open = True
|
||||
try:
|
||||
if hasattr(os, "fchmod"):
|
||||
os.fchmod(fd, original_mode)
|
||||
with os.fdopen(fd, "w", encoding=encoding, newline="") as f:
|
||||
fd_open = False
|
||||
f.write(joined)
|
||||
os.replace(tmp_path, secure_path)
|
||||
except BaseException:
|
||||
if fd_open:
|
||||
os.close(fd)
|
||||
with contextlib.suppress(OSError):
|
||||
os.unlink(tmp_path)
|
||||
raise
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to write file: {e}"}
|
||||
|
||||
# 10. Build response
|
||||
updated_lines = joined.splitlines()
|
||||
hashline_content = format_hashlines(updated_lines)
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"path": path,
|
||||
"edits_applied": changes_made,
|
||||
"content": hashline_content,
|
||||
}
|
||||
if changes_made == 0:
|
||||
result["note"] = "Content unchanged after applying edits"
|
||||
if cleanup_actions:
|
||||
result["cleanup_applied"] = cleanup_actions
|
||||
if replace_counts:
|
||||
result["replacements"] = {
|
||||
f"edit_{op_idx + 1}": count for op_idx, count in replace_counts
|
||||
}
|
||||
return result
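
Putting the op schema above together, a minimal sketch of an `edits` payload (the 4-character anchor hashes are placeholders; real anchors are copied verbatim from a hashline-mode read via `read_file` or `view_file` with `hashline=True`):

```python
import json

# Illustrative only: the hashes below are made up for the sketch.
edits = json.dumps([
    {"op": "set_line", "anchor": "2:9f3a", "content": "import os"},
    {
        "op": "replace_lines",
        "start_anchor": "10:aa01",
        "end_anchor": "12:bb02",
        "content": "def run():\n    return 0",
    },
    {"op": "insert_after", "anchor": "1:c0de", "content": "# added header"},
    {"op": "replace", "old_content": "DEBUG = True", "new_content": "DEBUG = False"},
    {"op": "append", "content": "# end of file"},
])
```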

@@ -32,12 +32,17 @@ view_file(
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `encoding` | str | No | `"utf-8"` | The encoding to use for reading the file |
| `max_size` | int | No | `10485760` | Maximum size of file content to return in bytes (10 MB) |
| `hashline` | bool | No | `False` | If True, return content with `N:hhhh\|content` anchors for use with `hashline_edit` |
| `offset` | int | No | `1` | 1-indexed start line (only used when `hashline=True`) |
| `limit` | int | No | `0` | Max lines to return, 0 = all (only used when `hashline=True`) |

## Returns

Returns a dictionary with the following structure:

**Success:**
**Success (default mode):**
```python
{
    "success": True,
@@ -48,6 +53,21 @@ Returns a dictionary with the following structure:
}
```

**Success (hashline mode):**
```python
{
    "success": True,
    "path": "app.py",
    "content": "1:a3f2|def main():\n2:f1c4|    print(\"hello\")",
    "hashline": True,
    "offset": 1,
    "limit": 0,
    "total_lines": 2,
    "shown_lines": 2,
    "size_bytes": 35
}
```
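
For files larger than `max_size`, paging with `offset`/`limit` is the intended pattern. A sketch, assuming the tool is invoked as a plain function; `ws`, `agent`, `sess`, and `handle_page` are placeholder names:

```python
offset, limit = 1, 200
while True:
    page = view_file(
        path="app.py",
        workspace_id=ws, agent_id=agent, session_id=sess,
        hashline=True, offset=offset, limit=limit,
    )
    if "error" in page:
        break
    handle_page(page["content"])  # lines look like "12:9b1e|    return x"
    offset += page["shown_lines"]
    if offset > page["total_lines"]:
        break
```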

**Error:**
```python
{
@@ -2,6 +2,8 @@ import os

from mcp.server.fastmcp import FastMCP

from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, format_hashlines

from ..security import get_secure_path

@@ -18,7 +20,10 @@ def register_tools(mcp: FastMCP) -> None:
        agent_id: str,
        session_id: str,
        encoding: str = "utf-8",
        max_size: int = 10 * 1024 * 1024,
        max_size: int = HASHLINE_MAX_FILE_BYTES,
        hashline: bool = False,
        offset: int = 1,
        limit: int = 0,
    ) -> dict:
        """
        Purpose
@@ -41,6 +46,10 @@ def register_tools(mcp: FastMCP) -> None:
            session_id: The ID of the current session
            encoding: The encoding to use for reading the file (default: "utf-8")
            max_size: The maximum size of file content to return in bytes (default: 10MB)
            hashline: If True, return content with N:hhhh|content anchors
                for use with hashline_edit (default: False)
            offset: 1-indexed start line, only used when hashline=True (default: 1)
            limit: Max lines to return, 0 = all, only used when hashline=True (default: 0)

        Returns:
            Dict with file content and metadata, or error dict
@@ -57,8 +66,59 @@ def register_tools(mcp: FastMCP) -> None:
                return {"error": f"Path is not a file: {path}"}

            with open(secure_path, encoding=encoding) as f:
                content = f.read()
                content_raw = f.read()

            if not hashline and (offset != 1 or limit != 0):
                return {
                    "error": "offset and limit are only supported when hashline=True. "
                    "Set hashline=True to use paging."
                }

            if hashline:
                if offset < 1:
                    return {"error": f"offset must be >= 1, got {offset}"}
                if limit < 0:
                    return {"error": f"limit must be >= 0, got {limit}"}

                all_lines = content_raw.splitlines()
                total_lines = len(all_lines)
                raw_size = len(content_raw.encode(encoding))

                if offset > max(total_lines, 1):
                    return {"error": f"offset {offset} is beyond end of file ({total_lines} lines)"}

                # Check size after considering offset/limit. When paging
                # (offset or limit set), only check the formatted output size.
                # When reading the full file, check the raw size.
                is_paging = offset > 1 or limit > 0
                if not is_paging and raw_size > max_size:
                    return {
                        "error": f"File too large for hashline mode ({raw_size} bytes, "
                        f"max {max_size}). Use offset and limit to read a section at a time."
                    }

                formatted = format_hashlines(all_lines, offset=offset, limit=limit)
                shown_lines = len(formatted.splitlines()) if formatted else 0

                if is_paging and len(formatted.encode(encoding)) > max_size:
                    return {
                        "error": f"Requested section too large ({shown_lines} lines). "
                        f"Reduce limit to read a smaller section."
                    }

                return {
                    "success": True,
                    "path": path,
                    "content": formatted,
                    "hashline": True,
                    "offset": offset,
                    "limit": limit,
                    "total_lines": total_lines,
                    "shown_lines": shown_lines,
                    "size_bytes": raw_size,
                }

            content = content_raw
            if len(content.encode(encoding)) > max_size:
                content = content[:max_size]
                content += "\n\n[... Content truncated due to size limit ...]"
@@ -67,7 +127,7 @@ def register_tools(mcp: FastMCP) -> None:
                "success": True,
                "path": path,
                "content": content,
                "size_bytes": len(content.encode("utf-8")),
                "size_bytes": len(content.encode(encoding)),
                "lines": len(content.splitlines()),
            }
        except Exception as e:
@@ -1,11 +1,18 @@
"""Shared fixtures for tools tests."""

from __future__ import annotations

import logging
import os
from collections.abc import Callable
from pathlib import Path

import pytest
from fastmcp import FastMCP

from aden_tools.credentials import CredentialStoreAdapter
from aden_tools.credentials import CREDENTIAL_SPECS, CredentialStoreAdapter

logger = logging.getLogger(__name__)


@pytest.fixture
@@ -56,3 +63,55 @@ def large_text_file(tmp_path: Path) -> Path:
    large_file = tmp_path / "large.txt"
    large_file.write_text("x" * 20_000_000)  # 20MB
    return large_file


@pytest.fixture(scope="session")
def live_credential_resolver() -> Callable[[str], str | None]:
    """Resolve live credentials for integration tests.

    Tries two sources in order:
    1. Environment variable (spec.env_var)
    2. CredentialStoreAdapter.default() (encrypted store + env fallback)

    Returns a callable: resolver(credential_name) -> str | None.
    Credential values are never logged or exposed in test output.
    """
    _adapter: CredentialStoreAdapter | None = None
    _adapter_init_failed = False

    def _get_adapter() -> CredentialStoreAdapter | None:
        nonlocal _adapter, _adapter_init_failed
        if _adapter is not None:
            return _adapter
        if _adapter_init_failed:
            return None
        try:
            _adapter = CredentialStoreAdapter.default()
        except Exception as exc:
            logger.debug("Could not initialize CredentialStoreAdapter: %s", exc)
            _adapter_init_failed = True
        return _adapter

    def resolve(credential_name: str) -> str | None:
        spec = CREDENTIAL_SPECS.get(credential_name)
        if spec is None:
            return None

        # 1. Try env var directly
        value = os.environ.get(spec.env_var)
        if value:
            return value

        # 2. Try the adapter (encrypted store + fallback)
        adapter = _get_adapter()
        if adapter is not None:
            try:
                value = adapter.get(credential_name)
                if value:
                    return value
            except Exception:
                pass

        return None

    return resolve
@@ -23,7 +23,6 @@ from aden_tools.credentials import (
    EMAIL_CREDENTIALS,
    GITHUB_CREDENTIALS,
    HUBSPOT_CREDENTIALS,
    LLM_CREDENTIALS,
    SEARCH_CREDENTIALS,
    SLACK_CREDENTIALS,
)
@@ -206,7 +205,6 @@ class TestSpecsMergedIntoCredentialSpecs:
    """All category credential dicts must be merged into the global CREDENTIAL_SPECS."""

    CATEGORY_DICTS = {
        "LLM_CREDENTIALS": LLM_CREDENTIALS,
        "SEARCH_CREDENTIALS": SEARCH_CREDENTIALS,
        "EMAIL_CREDENTIALS": EMAIL_CREDENTIALS,
        "GITHUB_CREDENTIALS": GITHUB_CREDENTIALS,

@@ -308,18 +308,6 @@ class TestCredentialSpecs:
        assert spec.startup_required is False
        assert "brave.com" in spec.help_url

    def test_anthropic_spec_exists(self):
        """CREDENTIAL_SPECS includes anthropic with startup_required=True."""
        assert "anthropic" in CREDENTIAL_SPECS

        spec = CREDENTIAL_SPECS["anthropic"]
        assert spec.env_var == "ANTHROPIC_API_KEY"
        assert spec.tools == []
        assert "event_loop" in spec.node_types
        assert spec.required is False
        assert spec.startup_required is False
        assert "anthropic.com" in spec.help_url


class TestNodeTypeValidation:
    """Tests for node type credential validation."""

@@ -6,7 +6,6 @@ import httpx

from aden_tools.credentials.health_check import (
    HEALTH_CHECKERS,
    AnthropicHealthChecker,
    ApolloHealthChecker,
    BrevoHealthChecker,
    CalcomHealthChecker,
@@ -36,11 +35,6 @@ class TestHealthCheckerRegistry:
        assert "google_search" in HEALTH_CHECKERS
        assert isinstance(HEALTH_CHECKERS["google_search"], GoogleSearchHealthChecker)

    def test_anthropic_registered(self):
        """AnthropicHealthChecker is registered in HEALTH_CHECKERS."""
        assert "anthropic" in HEALTH_CHECKERS
        assert isinstance(HEALTH_CHECKERS["anthropic"], AnthropicHealthChecker)

    def test_github_registered(self):
        """GitHubHealthChecker is registered in HEALTH_CHECKERS."""
        assert "github" in HEALTH_CHECKERS
@@ -74,29 +68,32 @@ class TestHealthCheckerRegistry:
    def test_all_expected_checkers_registered(self):
        """All expected health checkers are in the registry."""
        expected = {
            "anthropic",
            "hubspot",
            "brave_search",
            "google_search",
            "google_maps",
            "github",
            "resend",
            "google_calendar_oauth",
            "google",
            "slack",
            "discord",
            "stripe",
            "exa_search",
            "google_docs",
            "calcom",
            "serpapi",
            "apify",
            "apollo",
            "asana",
            "attio",
            "brave_search",
            "brevo",
            "calcom",
            "calendly_pat",
            "discord",
            "docker_hub",
            "exa_search",
            "finlight",
            "github",
            "gitlab_token",
            "google",
            "google_calendar_oauth",
            "google_docs",
            "google_maps",
            "google_search",
            "google_search_console",
            "greenhouse_token",
            "hubspot",
            "huggingface",
            "intercom",
            "linear",
@@ -106,10 +103,6 @@ class TestHealthCheckerRegistry:
            "notion_token",
            "pinecone",
            "pipedrive",
            "resend",
            "serpapi",
            "slack",
            "stripe",
            "telegram",
            "trello_key",
            "trello_token",
@@ -120,81 +113,6 @@ class TestHealthCheckerRegistry:
        assert set(HEALTH_CHECKERS.keys()) == expected


class TestAnthropicHealthChecker:
    """Tests for AnthropicHealthChecker."""

    def _mock_response(self, status_code, json_data=None):
        response = MagicMock(spec=httpx.Response)
        response.status_code = status_code
        if json_data:
            response.json.return_value = json_data
        return response

    @patch("aden_tools.credentials.health_check.httpx.Client")
    def test_valid_key_200(self, mock_client_cls):
        mock_client = MagicMock()
        mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
        mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
        mock_client.post.return_value = self._mock_response(200)

        checker = AnthropicHealthChecker()
        result = checker.check("sk-ant-test-key")

        assert result.valid is True
        assert "valid" in result.message.lower()

    @patch("aden_tools.credentials.health_check.httpx.Client")
    def test_invalid_key_401(self, mock_client_cls):
        mock_client = MagicMock()
        mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
        mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
        mock_client.post.return_value = self._mock_response(401)

        checker = AnthropicHealthChecker()
        result = checker.check("invalid-key")

        assert result.valid is False
        assert result.details["status_code"] == 401

    @patch("aden_tools.credentials.health_check.httpx.Client")
    def test_rate_limited_429(self, mock_client_cls):
        mock_client = MagicMock()
        mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
        mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
        mock_client.post.return_value = self._mock_response(429)

        checker = AnthropicHealthChecker()
        result = checker.check("sk-ant-test-key")

        assert result.valid is True
        assert result.details.get("rate_limited") is True

    @patch("aden_tools.credentials.health_check.httpx.Client")
    def test_bad_request_400_still_valid(self, mock_client_cls):
        mock_client = MagicMock()
        mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
        mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
        mock_client.post.return_value = self._mock_response(400)

        checker = AnthropicHealthChecker()
        result = checker.check("sk-ant-test-key")

        assert result.valid is True

    @patch("aden_tools.credentials.health_check.httpx.Client")
    def test_timeout(self, mock_client_cls):
        mock_client = MagicMock()
        mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
        mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
        mock_client.post.side_effect = httpx.TimeoutException("timed out")

        checker = AnthropicHealthChecker()
        result = checker.check("sk-ant-test-key")

        assert result.valid is False
        assert result.details["error"] == "timeout"


class TestGitHubHealthChecker:
    """Tests for GitHubHealthChecker."""

@@ -0,0 +1,197 @@
"""Live integration tests for credential health checkers.

These tests make REAL API calls. They are gated behind the ``live`` marker
and never run in CI. Run them manually::

    pytest -m live -s --log-cli-level=INFO   # all live tests
    pytest -m live -k anthropic -s           # just anthropic
    pytest -m live -k "not google" -s        # skip google variants
    pytest -m live --tb=short -q             # quick summary

Prerequisites:
    - Credentials available via env vars or ~/.hive/credentials/ encrypted store
    - Tests skip gracefully when credentials are unavailable
    - Rate-limited responses (429) are treated as PASS (credential is valid)
"""

from __future__ import annotations

import logging

import pytest

from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.health_check import (
    HEALTH_CHECKERS,
    check_credential_health,
    validate_integration_wiring,
)

logger = logging.getLogger(__name__)

# All credential names that have registered health checkers
CHECKER_NAMES = sorted(HEALTH_CHECKERS.keys())


def _redact(value: str) -> str:
    """Redact a credential for safe logging."""
    if len(value) <= 8:
        return "****"
    return f"{value[:4]}...{value[-2:]}"


# ---------------------------------------------------------------------------
# 1. Direct checker tests
# ---------------------------------------------------------------------------


@pytest.mark.live
class TestLiveHealthCheckers:
    """Call each health checker against the real API."""

    @pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
    def test_checker_returns_valid(self, credential_name, live_credential_resolver):
        """Health checker returns valid=True with a real credential."""
        credential_value = live_credential_resolver(credential_name)
        if credential_value is None:
            spec = CREDENTIAL_SPECS.get(credential_name)
            env_var = spec.env_var if spec else "???"
            pytest.skip(f"No credential available ({env_var})")

        checker = HEALTH_CHECKERS[credential_name]
        result = checker.check(credential_value)

        logger.info(
            "Live check %s: valid=%s message=%r",
            credential_name,
            result.valid,
            result.message,
        )

        assert result.valid is True, (
            f"Health check for '{credential_name}' returned valid=False: "
            f"{result.message} (details: {result.details})"
        )
        assert result.message

    @pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
    def test_checker_extracts_identity(self, credential_name, live_credential_resolver):
        """Identity metadata (when present) contains non-empty strings."""
        credential_value = live_credential_resolver(credential_name)
        if credential_value is None:
            pytest.skip(f"No credential available for '{credential_name}'")

        checker = HEALTH_CHECKERS[credential_name]
        result = checker.check(credential_value)

        assert result.valid is True, (
            f"Cannot verify identity -- health check failed: {result.message}"
        )

        identity = result.details.get("identity", {})
        if identity:
            logger.info("Identity for %s: %s", credential_name, identity)
            for key, value in identity.items():
                assert isinstance(value, str), (
                    f"Identity key '{key}' is not a string: {type(value)}"
                )
                assert value, f"Identity key '{key}' is empty"
        else:
            logger.info("No identity metadata for %s (OK for some APIs)", credential_name)


# ---------------------------------------------------------------------------
# 2. Dispatcher path (check_credential_health)
# ---------------------------------------------------------------------------


@pytest.mark.live
class TestLiveDispatcher:
    """Verify the full check_credential_health() dispatch path."""

    @pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
    def test_dispatcher_returns_valid(self, credential_name, live_credential_resolver):
        """check_credential_health() returns valid=True via dispatcher."""
        credential_value = live_credential_resolver(credential_name)
        if credential_value is None:
            pytest.skip(f"No credential available for '{credential_name}'")

        result = check_credential_health(credential_name, credential_value)

        logger.info(
            "Dispatcher check %s: valid=%s message=%r",
            credential_name,
            result.valid,
            result.message,
        )

        assert result.valid is True, (
            f"Dispatcher check for '{credential_name}' returned valid=False: "
            f"{result.message} (details: {result.details})"
        )


# ---------------------------------------------------------------------------
# 3. Integration wiring verification
# ---------------------------------------------------------------------------


@pytest.mark.live
class TestLiveIntegrationWiring:
    """validate_integration_wiring() passes for every registered checker."""

    @pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
    def test_wiring_valid(self, credential_name):
        """No wiring issues for credentials with health checkers."""
        issues = validate_integration_wiring(credential_name)
        assert not issues, f"Wiring issues for '{credential_name}':\n" + "\n".join(
            f"  - {i}" for i in issues
        )


# ---------------------------------------------------------------------------
# 4. Summary reporter
# ---------------------------------------------------------------------------


@pytest.mark.live
class TestLiveCredentialSummary:
    """Print a human-readable summary of tested vs skipped credentials."""

    def test_credential_availability_summary(self, live_credential_resolver):
        """Report which credentials were available for live testing."""
        available = []
        skipped = []

        for name in CHECKER_NAMES:
            value = live_credential_resolver(name)
            spec = CREDENTIAL_SPECS.get(name)
            env_var = spec.env_var if spec else "???"
            if value:
                available.append((name, env_var))
            else:
                skipped.append((name, env_var))

        lines = [
            "",
            "=" * 60,
            "LIVE CREDENTIAL TEST SUMMARY",
            "=" * 60,
            f"  Available: {len(available)} / {len(CHECKER_NAMES)}",
            f"  Skipped:   {len(skipped)} / {len(CHECKER_NAMES)}",
            "",
        ]
        if available:
            lines.append("  TESTED:")
            for name, env_var in available:
                lines.append(f"    [PASS] {name} ({env_var})")
        if skipped:
            lines.append("")
            lines.append("  SKIPPED (no credential):")
            for name, env_var in skipped:
                lines.append(f"    [SKIP] {name} ({env_var})")
        lines.append("=" * 60)

        summary = "\n".join(lines)
        logger.info(summary)
        print(summary)  # noqa: T201 -- visible with pytest -s
@@ -0,0 +1,143 @@
"""Tests for aden_tools.file_ops (shared file tools).

These tests cover Windows compatibility concerns: path relativization
in search_files (ripgrep and Python fallback) and cross-platform behavior.
"""

import os
from unittest.mock import patch

import pytest
from fastmcp import FastMCP

from aden_tools.file_ops import register_file_tools


@pytest.fixture
def file_ops_mcp(tmp_path):
    """Create FastMCP with file_ops registered, sandboxed to tmp_path."""

    def resolve_path(p: str) -> str:
        if os.path.isabs(p):
            return os.path.normpath(p)
        return str((tmp_path / p).resolve())

    mcp = FastMCP("test-file-ops")
    register_file_tools(
        mcp,
        resolve_path=resolve_path,
        project_root=str(tmp_path),
    )
    return mcp


def _get_tool_fn(mcp, name):
    """Extract the raw function for a registered tool."""
    return mcp._tool_manager._tools[name].fn


class TestSearchFilesPathRelativization:
    """Tests for search_files path handling (Windows path separator fix)."""

    def test_ripgrep_output_with_backslash_relativized(self, file_ops_mcp, tmp_path):
        """Ripgrep output with backslashes (Windows) relativized when project_root set.

        Simulates: rg outputs 'C:\\Users\\...\\proj\\src\\foo.py:1:needle'
        Expected: output should show 'src\\foo.py:1:needle' or 'src/foo.py:1:needle'
        (relativized, not full path).
        """
        # Create a file so the search has something to find
        (tmp_path / "src").mkdir()
        (tmp_path / "src" / "foo.py").write_text("needle\n")
        project_root = str(tmp_path)

        # Ripgrep on Windows outputs backslash-separated paths
        # Format: path:line_num:content
        rg_output = f"{project_root}{os.sep}src{os.sep}foo.py:1:needle"

        search_fn = _get_tool_fn(file_ops_mcp, "search_files")

        with patch("aden_tools.file_ops.subprocess.run") as mock_run:
            mock_run.return_value = type(
                "Result", (), {"returncode": 0, "stdout": rg_output, "stderr": ""}
            )()

            result = search_fn(
                pattern="needle",
                path=str(tmp_path),
            )

        # Output should be relativized (no full project_root in the line)
        assert project_root not in result, (
            f"Output should not contain full project_root. Got: {result!r}"
        )
        # Should contain the relative path part
        assert "foo.py" in result
        assert "1:" in result or ":1:" in result

    def test_ripgrep_output_with_forward_slash_relativized(self, file_ops_mcp, tmp_path):
        """Ripgrep output using forward slashes (Unix/rg default) should be relativized."""
        (tmp_path / "src").mkdir()
        (tmp_path / "src" / "bar.py").write_text("pattern_match\n")
        project_root = str(tmp_path)

        # Some ripgrep builds output forward slashes even on Windows
        rg_output = f"{project_root}/src/bar.py:1:pattern_match"

        search_fn = _get_tool_fn(file_ops_mcp, "search_files")

        with patch("aden_tools.file_ops.subprocess.run") as mock_run:
            mock_run.return_value = type(
                "Result", (), {"returncode": 0, "stdout": rg_output, "stderr": ""}
            )()

            result = search_fn(
                pattern="pattern_match",
                path=str(tmp_path),
            )

        assert project_root not in result or "src/bar.py" in result
        assert "bar.py" in result

    def test_python_fallback_relativizes_paths(self, file_ops_mcp, tmp_path):
        """Python fallback (no ripgrep) uses os.path.relpath - should work on all platforms."""
        (tmp_path / "subdir").mkdir()
        (tmp_path / "subdir" / "baz.txt").write_text("find_me\n")

        search_fn = _get_tool_fn(file_ops_mcp, "search_files")

        # Ensure ripgrep is not used
        with patch("aden_tools.file_ops.subprocess.run", side_effect=FileNotFoundError()):
            result = search_fn(
                pattern="find_me",
                path=str(tmp_path),
            )

        # Python fallback uses os.path.relpath - should produce relative path
        project_root = str(tmp_path)
        assert project_root not in result or "subdir" in result
        assert "baz.txt" in result
        assert "1:" in result or ":1:" in result


class TestSearchFilesBasic:
    """Basic search_files behavior (no path mocking)."""

    def test_search_finds_content(self, file_ops_mcp, tmp_path):
        """search_files finds matching content via Python fallback when rg absent."""
        (tmp_path / "hello.txt").write_text("world\n")

        search_fn = _get_tool_fn(file_ops_mcp, "search_files")

        with patch("aden_tools.file_ops.subprocess.run", side_effect=FileNotFoundError()):
            result = search_fn(pattern="world", path=str(tmp_path))

        assert "world" in result
        assert "hello.txt" in result

    def test_search_nonexistent_dir_returns_error(self, file_ops_mcp, tmp_path):
        """search_files on non-existent directory returns error."""
        search_fn = _get_tool_fn(file_ops_mcp, "search_files")
        result = search_fn(pattern="x", path=str(tmp_path / "nonexistent"))
        assert "Error" in result
        assert "not found" in result.lower()
@@ -0,0 +1,459 @@
"""Tests for hashline support in file_ops (coder tools)."""

import json
import os

import pytest
from fastmcp import FastMCP

from aden_tools.hashline import compute_line_hash


def _anchor(line_num, line_text):
    """Build an anchor string N:hhhh."""
    return f"{line_num}:{compute_line_hash(line_text)}"


@pytest.fixture
def tools(tmp_path):
    """Register file_ops tools with tmp_path as project root."""
    from aden_tools.file_ops import register_file_tools

    mcp = FastMCP("test-server")
    write_calls = []

    def _resolve(p):
        return str(tmp_path / p)

    def _before_write():
        write_calls.append(1)

    register_file_tools(
        mcp,
        resolve_path=_resolve,
        before_write=_before_write,
        project_root=str(tmp_path),
    )
    tool_map = {name: t.fn for name, t in mcp._tool_manager._tools.items()}
    return tool_map, write_calls


# ── read_file hashline ────────────────────────────────────────────────────


class TestReadFileHashline:
    def test_hashline_format(self, tools, tmp_path):
        """hashline=True returns N:hhhh|content format."""
        read_file = tools[0]["read_file"]
        (tmp_path / "f.txt").write_text("hello\nworld\n")

        result = read_file(path="f.txt", hashline=True)
        lines = result.strip().split("\n")
        # First two lines should be hashline formatted
        h1 = compute_line_hash("hello")
        h2 = compute_line_hash("world")
        assert lines[0] == f"1:{h1}|hello"
        assert lines[1] == f"2:{h2}|world"

    def test_hashline_false_unchanged(self, tools, tmp_path):
        """Default (hashline=False) returns standard line-number format."""
        read_file = tools[0]["read_file"]
        (tmp_path / "f.txt").write_text("hello\n")

        result = read_file(path="f.txt", hashline=False)
        # Standard format uses tab-separated line numbers
        assert "\t" in result
        assert "hello" in result

    def test_hashline_offset_limit(self, tools, tmp_path):
        """offset and limit work in hashline mode."""
        read_file = tools[0]["read_file"]
        lines = [f"line{i}" for i in range(1, 11)]
        (tmp_path / "f.txt").write_text("\n".join(lines) + "\n")

        result = read_file(path="f.txt", offset=3, limit=2, hashline=True)
        output_lines = [ln for ln in result.split("\n") if ln and not ln.startswith("(")]
        assert len(output_lines) == 2
        h3 = compute_line_hash("line3")
        assert output_lines[0] == f"3:{h3}|line3"

    def test_hashline_no_line_truncation(self, tools, tmp_path):
        """hashline mode doesn't truncate long lines (would corrupt hashes)."""
        read_file = tools[0]["read_file"]
        long_line = "x" * 3000
        (tmp_path / "f.txt").write_text(long_line + "\n")

        result = read_file(path="f.txt", hashline=True)
        h = compute_line_hash(long_line)
        assert f"1:{h}|{long_line}" in result


# ── search_files hashline ─────────────────────────────────────────────────


class TestSearchFilesHashline:
    def test_hashline_in_results(self, tools, tmp_path):
        """hashline=True adds hash anchors to search results."""
        search_files = tools[0]["search_files"]
        (tmp_path / "f.py").write_text("def foo():\n    pass\n")

        result = search_files(pattern="def foo", path=".", hashline=True)
        # Result should contain hash anchor
        h = compute_line_hash("def foo():")
        assert h in result
        assert f":{h}|" in result

    def test_hashline_false_unchanged(self, tools, tmp_path):
        """Default search has no hash anchors."""
        search_files = tools[0]["search_files"]
        (tmp_path / "f.py").write_text("def foo():\n    pass\n")

        result = search_files(pattern="def foo", path=".", hashline=False)
        h = compute_line_hash("def foo():")
        assert f":{h}|" not in result


# ── hashline_edit ─────────────────────────────────────────────────────────


class TestHashlineEditBasic:
    def test_returns_string(self, tools, tmp_path):
        """hashline_edit returns a string, not a dict."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert isinstance(result, str)
        assert "Applied" in result

    def test_calls_before_write(self, tools, tmp_path):
        """hashline_edit calls the before_write hook."""
        hashline_edit = tools[0]["hashline_edit"]
        write_calls = tools[1]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
        hashline_edit(path="f.txt", edits=edits)
        assert len(write_calls) == 1

    def test_invalid_json(self, tools, tmp_path):
        """Invalid JSON returns error string."""
        hashline_edit = tools[0]["hashline_edit"]
        (tmp_path / "f.txt").write_text("aaa\n")
        result = hashline_edit(path="f.txt", edits="not json")
        assert "Error" in result
        assert "Invalid JSON" in result

    def test_empty_edits(self, tools, tmp_path):
        """Empty edits array returns error."""
        hashline_edit = tools[0]["hashline_edit"]
        (tmp_path / "f.txt").write_text("aaa\n")
        result = hashline_edit(path="f.txt", edits="[]")
        assert "Error" in result
        assert "empty" in result

    def test_file_not_found(self, tools, tmp_path):
        """Missing file returns error."""
        hashline_edit = tools[0]["hashline_edit"]
        edits = json.dumps([{"op": "set_line", "anchor": "1:abcd", "content": "x"}])
        result = hashline_edit(path="nope.txt", edits=edits)
        assert "Error" in result
        assert "not found" in result


class TestHashlineEditSetLine:
    def test_set_line(self, tools, tmp_path):
        """set_line replaces a single line."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nBBB\nccc\n"

    def test_set_line_hash_mismatch(self, tools, tmp_path):
        """set_line with wrong hash returns error."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": "2:ffff", "content": "BBB"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Error" in result
        assert "mismatch" in result.lower()

    def test_set_line_delete(self, tools, tmp_path):
        """set_line with empty content deletes the line."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": ""}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nccc\n"


class TestHashlineEditReplaceLines:
    def test_replace_lines(self, tools, tmp_path):
        """replace_lines replaces a range."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\nddd\n")

        edits = json.dumps(
            [
                {
                    "op": "replace_lines",
                    "start_anchor": _anchor(2, "bbb"),
                    "end_anchor": _anchor(3, "ccc"),
                    "content": "XXX\nYYY\nZZZ",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nXXX\nYYY\nZZZ\nddd\n"


class TestHashlineEditInsert:
    def test_insert_after(self, tools, tmp_path):
        """insert_after adds lines after the anchor."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps(
            [
                {
                    "op": "insert_after",
                    "anchor": _anchor(1, "aaa"),
                    "content": "NEW",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"

    def test_insert_before(self, tools, tmp_path):
        """insert_before adds lines before the anchor."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps(
            [
                {
                    "op": "insert_before",
                    "anchor": _anchor(2, "bbb"),
                    "content": "NEW",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"


class TestHashlineEditReplace:
    def test_replace(self, tools, tmp_path):
        """replace does string replacement."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps(
            [
                {
                    "op": "replace",
                    "old_content": "bbb",
                    "new_content": "BBB",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nBBB\nccc\n"

    def test_replace_not_found(self, tools, tmp_path):
        """replace with missing old_content returns error."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps(
            [
                {
                    "op": "replace",
                    "old_content": "zzz",
                    "new_content": "ZZZ",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Error" in result
        assert "not found" in result


class TestHashlineEditAppend:
    def test_append(self, tools, tmp_path):
        """append adds content at end of file."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\n")

        edits = json.dumps([{"op": "append", "content": "ccc\nddd"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        assert f.read_text() == "aaa\nbbb\nccc\nddd\n"


class TestHashlineEditOverlap:
    def test_overlapping_edits_rejected(self, tools, tmp_path):
        """Overlapping splice ranges are rejected."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\nddd\n")

        edits = json.dumps(
            [
                {"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"},
                {
                    "op": "replace_lines",
                    "start_anchor": _anchor(1, "aaa"),
                    "end_anchor": _anchor(3, "ccc"),
                    "content": "XXX",
                },
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Error" in result
        assert "Overlapping" in result


class TestHashlineEditAutoCleanup:
    def test_strips_hashline_prefix_multiline(self, tools, tmp_path):
        """auto_cleanup strips N:hhhh| prefixes from multi-line content."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\nddd\n")

        h_bbb = compute_line_hash("bbb")
        h_ccc = compute_line_hash("ccc")
        # LLM echoes hashline prefixes in replace_lines content
        edits = json.dumps(
            [
                {
                    "op": "replace_lines",
                    "start_anchor": _anchor(2, "bbb"),
                    "end_anchor": _anchor(3, "ccc"),
                    "content": f"2:{h_bbb}|BBB\n3:{h_ccc}|CCC",
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Applied 1 edit" in result
        # Should have stripped the prefixes
        assert f.read_text() == "aaa\nBBB\nCCC\nddd\n"
        assert "cleanup" in result.lower()

    def test_no_cleanup_when_disabled(self, tools, tmp_path):
        """auto_cleanup=False writes content as-is."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        h = compute_line_hash("bbb")
        raw_content = f"2:{h}|BBB"
        edits = json.dumps(
            [
                {
                    "op": "set_line",
                    "anchor": _anchor(2, "bbb"),
                    "content": raw_content,
                }
            ]
        )
        result = hashline_edit(path="f.txt", edits=edits, auto_cleanup=False)
        assert "Applied 1 edit" in result
        assert f.read_text() == f"aaa\n{raw_content}\nccc\n"


class TestHashlineEditAtomicWrite:
    def test_preserves_permissions(self, tools, tmp_path):
        """Atomic write preserves original file permissions."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\n")
        os.chmod(f, 0o755)

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
        hashline_edit(path="f.txt", edits=edits)
        assert os.stat(f).st_mode & 0o777 == 0o755

    def test_preserves_trailing_newline(self, tools, tmp_path):
        """Files with trailing newline keep it after edit."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
        hashline_edit(path="f.txt", edits=edits)
        assert f.read_text().endswith("\n")

    def test_unknown_op(self, tools, tmp_path):
        """Unknown op returns error."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\n")

        edits = json.dumps([{"op": "delete_line", "anchor": "1:abcd"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Error" in result
        assert "unknown op" in result

    def test_crlf_replace_op_no_double_conversion(self, tools, tmp_path):
        """Replace op on a CRLF file should not corrupt \\r\\n in new_content."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_bytes(b"aaa\r\nbbb\r\nccc\r\n")

        edits = json.dumps([{"op": "replace", "old_content": "aaa", "new_content": "x\r\ny"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Error" not in result

        raw = f.read_bytes()
        assert b"\r\r\n" not in raw
        assert raw == b"x\r\ny\r\nbbb\r\nccc\r\n"


class TestHashlineEditResponseFormat:
    def test_shows_updated_content(self, tools, tmp_path):
        """Response includes updated hashline content."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        f.write_text("aaa\nbbb\nccc\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
        result = hashline_edit(path="f.txt", edits=edits)
        # Should show updated content in hashline format
        h_new = compute_line_hash("BBB")
        assert f"2:{h_new}|BBB" in result

    def test_pagination_hint_for_large_files(self, tools, tmp_path):
        """Response includes pagination hint when file > 200 lines."""
        hashline_edit = tools[0]["hashline_edit"]
        f = tmp_path / "f.txt"
        lines = [f"line{i}" for i in range(300)]
        f.write_text("\n".join(lines) + "\n")

        edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "line0"), "content": "FIRST"}])
        result = hashline_edit(path="f.txt", edits=edits)
        assert "Showing first 200" in result
        assert "300 lines" in result
@@ -1,5 +1,6 @@
|
||||
"""Tests for file_system_toolkits tools (FastMCP)."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -70,7 +71,11 @@ def mock_secure_path(tmp_path):
|
||||
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR",
|
||||
str(tmp_path),
|
||||
):
|
||||
yield
|
||||
with patch(
|
||||
"aden_tools.tools.file_system_toolkits.hashline_edit.hashline_edit.get_secure_path",
|
||||
side_effect=_get_secure_path,
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
class TestViewFileTool:
|
||||
@@ -203,6 +208,42 @@ class TestViewFileTool:
|
||||
assert "error" in result
|
||||
assert "Failed to read file" in result["error"]
|
||||
|
||||
def test_offset_without_hashline_returns_error(
|
||||
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
|
||||
):
|
||||
"""Passing offset without hashline=True returns error."""
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_text("aaa\nbbb\nccc\n")
|
||||
|
||||
result = view_file_fn(path="test.txt", offset=5, **mock_workspace)
|
||||
|
||||
assert "error" in result
|
||||
assert "hashline=True" in result["error"]
|
||||
|
||||
def test_limit_without_hashline_returns_error(
|
||||
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
|
||||
):
|
||||
"""Passing limit without hashline=True returns error."""
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_text("aaa\nbbb\nccc\n")
|
||||
|
||||
result = view_file_fn(path="test.txt", limit=10, **mock_workspace)
|
||||
|
||||
assert "error" in result
|
||||
assert "hashline=True" in result["error"]
|
||||
|
||||
def test_offset_and_limit_without_hashline_returns_error(
|
||||
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
|
||||
):
|
||||
"""Passing both offset and limit without hashline=True returns error."""
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_text("aaa\nbbb\nccc\n")
|
||||
|
||||
result = view_file_fn(path="test.txt", offset=2, limit=5, **mock_workspace)
|
||||
|
||||
assert "error" in result
|
||||
assert "hashline=True" in result["error"]
|
||||
|
||||
|
||||
class TestWriteToFileTool:
|
||||
"""Tests for write_to_file tool."""
|
||||
@@ -762,3 +803,297 @@ class TestApplyPatchTool:
        assert result["success"] is True
        assert result["all_successful"] is True
        assert test_file.read_text(encoding="utf-8") == modified


class TestViewFileHashlineMode:
    """Tests for view_file hashline mode."""

    @pytest.fixture
    def view_file_fn(self, mcp):
        from aden_tools.tools.file_system_toolkits.view_file import register_tools

        register_tools(mcp)
        return mcp._tool_manager._tools["view_file"].fn

    def test_hashline_format(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """hashline=True returns N:hhhh|content format."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("hello\nworld\n")

        result = view_file_fn(path="test.txt", hashline=True, **mock_workspace)

        assert result["success"] is True
        assert result["hashline"] is True
        lines = result["content"].split("\n")
        assert lines[0].startswith("1:")
        assert "|hello" in lines[0]
        assert lines[1].startswith("2:")
        assert "|world" in lines[1]

    def test_hashline_offset(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """hashline with offset skips initial lines."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\nccc\n")

        result = view_file_fn(path="test.txt", hashline=True, offset=2, **mock_workspace)

        assert result["success"] is True
        assert result["offset"] == 2
        lines = result["content"].split("\n")
        assert lines[0].startswith("2:")
        assert "|bbb" in lines[0]

    def test_hashline_limit(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """hashline with limit restricts number of lines."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\nccc\nddd\n")

        result = view_file_fn(path="test.txt", hashline=True, limit=2, **mock_workspace)

        assert result["success"] is True
        assert result["limit"] == 2
        assert result["shown_lines"] == 2
        assert result["total_lines"] == 4

    def test_hashline_total_and_shown_lines(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """total_lines and shown_lines are reported correctly."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("a\nb\nc\nd\ne\n")

        result = view_file_fn(path="test.txt", hashline=True, offset=2, limit=2, **mock_workspace)

        assert result["total_lines"] == 5
        assert result["shown_lines"] == 2

    def test_default_mode_unchanged(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Default mode (hashline=False) returns the same format as before."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("hello\n")

        result = view_file_fn(path="test.txt", **mock_workspace)

        assert result["success"] is True
        assert "hashline" not in result
        assert result["content"] == "hello\n"
        assert result["lines"] == 1

    def test_hashline_offset_zero_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline with offset=0 returns error (must be >= 1)."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\n")

        result = view_file_fn(path="test.txt", hashline=True, offset=0, **mock_workspace)

        assert "error" in result
        assert "offset" in result["error"].lower()

    def test_hashline_negative_offset_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline with negative offset returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\n")

        result = view_file_fn(path="test.txt", hashline=True, offset=-1, **mock_workspace)

        assert "error" in result
        assert "offset" in result["error"].lower()

    def test_hashline_negative_limit_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline with negative limit returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\n")

        result = view_file_fn(path="test.txt", hashline=True, limit=-1, **mock_workspace)

        assert "error" in result
        assert "limit" in result["error"].lower()

    def test_hashline_truncated_file_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Large file with hashline=True and no offset/limit returns error directing to paginate."""
        test_file = tmp_path / "large.txt"
        # Create a file larger than the max_size we'll pass
        content = "line\n" * 100  # 500 bytes
        test_file.write_text(content)

        result = view_file_fn(path="large.txt", hashline=True, max_size=50, **mock_workspace)

        assert "error" in result
        assert "too large" in result["error"].lower()
        assert "offset" in result["error"].lower()
        assert "limit" in result["error"].lower()

    def test_hashline_offset_beyond_end_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline with offset beyond total lines returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\n")

        result = view_file_fn(path="test.txt", hashline=True, offset=50, **mock_workspace)

        assert "error" in result
        assert "beyond" in result["error"].lower()
        assert "2 lines" in result["error"]

    def test_hashline_large_file_with_offset_limit_works(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Large file using offset/limit bypasses full-file size check."""
        test_file = tmp_path / "large.txt"
        lines = [f"line {i}" for i in range(1, 101)]
        test_file.write_text("\n".join(lines) + "\n")

        # File is large (> max_size=200), but offset/limit lets us page through it
        result = view_file_fn(
            path="large.txt", hashline=True, offset=10, limit=5, max_size=200, **mock_workspace
        )

        assert result["success"] is True
        assert result["shown_lines"] == 5
        assert result["total_lines"] == 100
        # First shown line should be line 10
        first_line = result["content"].split("\n")[0]
        assert first_line.startswith("10:")
        assert "|line 10" in first_line

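The offset/limit semantics tested above suggest a straightforward paging loop for large files. A hypothetical sketch, reusing the tool callable from the fixtures; `handle_chunk` and `workspace` are illustrative placeholders, not part of the test suite:

# Hypothetical paging loop over a large file in hashline mode.
# offset is 1-indexed; shown_lines/total_lines come from the tool response.
offset = 1
while True:
    res = view_file_fn(path="large.txt", hashline=True, offset=offset, limit=100, **workspace)
    if "error" in res:
        break
    handle_chunk(res["content"])  # each line is formatted as "N:hhhh|content"
    offset += res["shown_lines"]
    if offset > res["total_lines"]:
        break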
class TestGrepSearchHashlineMode:
    """Tests for grep_search hashline mode."""

    @pytest.fixture
    def grep_search_fn(self, mcp):
        from aden_tools.tools.file_system_toolkits.grep_search import register_tools

        register_tools(mcp)
        return mcp._tool_manager._tools["grep_search"].fn

    def test_hashline_anchor_present(
        self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline=True includes anchor field in matches."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("hello world\ngoodbye world\n")

        result = grep_search_fn(path="test.txt", pattern="hello", hashline=True, **mock_workspace)

        assert result["success"] is True
        assert result["total_matches"] == 1
        match = result["matches"][0]
        assert "anchor" in match
        # Anchor format: N:hhhh (4-char hash)
        assert match["anchor"].startswith("1:")
        assert len(match["anchor"]) == 6  # "1:hhhh"

    def test_hashline_anchor_absent_by_default(
        self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline=False (default) does not include anchor field."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("hello world\n")

        result = grep_search_fn(path="test.txt", pattern="hello", **mock_workspace)

        assert result["success"] is True
        assert result["total_matches"] == 1
        assert "anchor" not in result["matches"][0]

    def test_grep_hashline_preserves_indentation(
        self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline=True preserves leading whitespace in line_content."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("    hello world\n")

        result = grep_search_fn(path="test.txt", pattern="hello", hashline=True, **mock_workspace)

        assert result["success"] is True
        assert result["total_matches"] == 1
        assert result["matches"][0]["line_content"] == "    hello world"

    def test_hashline_skips_large_files_with_notice(
        self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """hashline=True skips files > 10MB and reports them in the response."""
        search_dir = tmp_path / "search_dir"
        search_dir.mkdir()

        small_file = search_dir / "small.txt"
        small_file.write_text("hello world\n")

        large_file = search_dir / "large.txt"
        # Write ~12MB (12 bytes * 1024 * 1024), comfortably over the 10MB limit
        large_file.write_bytes(b"hello large\n" * (1024 * 1024))

        result = grep_search_fn(
            path="search_dir", pattern="hello", hashline=True, recursive=True, **mock_workspace
        )

        assert result["success"] is True
        assert "skipped_large_files" in result
        assert any("large.txt" in f for f in result["skipped_large_files"])
        # Small file should still have matches
        assert result["total_matches"] >= 1

class TestHashlineCrossToolConsistency:
    """Cross-tool consistency tests for hashline workflows."""

    @pytest.fixture
    def view_file_fn(self, mcp):
        from aden_tools.tools.file_system_toolkits.view_file import register_tools

        register_tools(mcp)
        return mcp._tool_manager._tools["view_file"].fn

    @pytest.fixture
    def grep_search_fn(self, mcp):
        from aden_tools.tools.file_system_toolkits.grep_search import register_tools

        register_tools(mcp)
        return mcp._tool_manager._tools["grep_search"].fn

    @pytest.fixture
    def hashline_edit_fn(self, mcp):
        from aden_tools.tools.file_system_toolkits.hashline_edit import register_tools

        register_tools(mcp)
        return mcp._tool_manager._tools["hashline_edit"].fn

    def test_unicode_line_separator_anchor_roundtrip(
        self,
        view_file_fn,
        grep_search_fn,
        hashline_edit_fn,
        mock_workspace,
        mock_secure_path,
        tmp_path,
    ):
        """Anchors from grep hashline mode should be consumable by hashline_edit."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("A\u2028B\nC\n", encoding="utf-8")

        # Hashline view sees U+2028 as a line boundary via splitlines()
        view_res = view_file_fn(path="test.txt", hashline=True, **mock_workspace)
        assert view_res["success"] is True
        assert view_res["total_lines"] == 3

        # grep_search line iteration treats U+2028 as in-line content
        grep_res = grep_search_fn(path="test.txt", pattern="B", hashline=True, **mock_workspace)
        assert grep_res["success"] is True
        assert grep_res["total_matches"] == 1

        anchor = grep_res["matches"][0]["anchor"]
        edits = json.dumps([{"op": "set_line", "anchor": anchor, "content": "X"}])
        edit_res = hashline_edit_fn(path="test.txt", edits=edits, **mock_workspace)

        assert "error" not in edit_res, edit_res.get("error")
        assert edit_res["success"] is True
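The disagreement this test exercises (view_file counts 3 lines while grep_search treats the file as 2) comes down to Python's two line-splitting conventions. A minimal illustration, independent of the tools themselves:

# str.splitlines() honors Unicode line boundaries such as U+2028;
# splitting on "\n" does not, so the two iteration styles disagree.
text = "A\u2028B\nC\n"
print(text.splitlines())  # ['A', 'B', 'C']        -> 3 hashline lines
print(text.split("\n"))   # ['A\u2028B', 'C', '']  -> grep sees 'A\u2028B' as one line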
@@ -0,0 +1,186 @@
"""Unit tests for the hashline utility module."""

import pytest

from aden_tools.hashline import (
    compute_line_hash,
    format_hashlines,
    parse_anchor,
    validate_anchor,
)


class TestComputeLineHash:
    """Tests for compute_line_hash."""

    def test_basic_output_format(self):
        """Hash is a 4-char lowercase hex string."""
        h = compute_line_hash("hello world")
        assert len(h) == 4
        assert all(c in "0123456789abcdef" for c in h)

    def test_space_stripping(self):
        """Trailing spaces are stripped before hashing."""
        assert compute_line_hash("hello ") == compute_line_hash("hello")
        assert compute_line_hash(" hello") != compute_line_hash("hello")

    def test_tab_stripping(self):
        """Trailing tabs are stripped before hashing."""
        assert compute_line_hash("hello\t") == compute_line_hash("hello")
        assert compute_line_hash("\thello") != compute_line_hash("hello")

    def test_empty_line(self):
        """Empty line produces a valid 4-char hash."""
        h = compute_line_hash("")
        assert len(h) == 4
        assert all(c in "0123456789abcdef" for c in h)

    def test_different_lines_different_hashes(self):
        """Different lines produce different hashes (most of the time)."""
        h1 = compute_line_hash("def foo():")
        h2 = compute_line_hash("def bar():")
        # These specific strings should produce different hashes
        assert h1 != h2

    def test_whitespace_only_equals_empty(self):
        """A line of only spaces/tabs hashes the same as empty."""
        assert compute_line_hash(" \t ") == compute_line_hash("")

    def test_formatter_resilience(self):
        """Trailing whitespace-only variants stay stable across formatting noise."""
        assert compute_line_hash("if x:") == compute_line_hash("if x: ")
        assert compute_line_hash("return 0") == compute_line_hash("return 0\t\t")

    def test_leading_whitespace_changes_hash(self):
        """Leading whitespace changes the hash (indentation is semantic)."""
        assert compute_line_hash("    x") != compute_line_hash("  x")

    def test_trailing_whitespace_ignored(self):
        """Trailing spaces are ignored in hashing."""
        assert compute_line_hash("x ") == compute_line_hash("x")


class TestFormatHashlines:
    """Tests for format_hashlines."""

    def test_basic_format(self):
        """Lines are formatted as N:hhhh|content."""
        lines = ["hello", "world"]
        result = format_hashlines(lines)
        output_lines = result.split("\n")
        assert len(output_lines) == 2
        # Check format: N:hhhh|content
        assert output_lines[0].startswith("1:")
        assert "|hello" in output_lines[0]
        assert output_lines[1].startswith("2:")
        assert "|world" in output_lines[1]

    def test_offset(self):
        """Offset skips initial lines."""
        lines = ["a", "b", "c", "d"]
        result = format_hashlines(lines, offset=3)
        output_lines = result.split("\n")
        assert len(output_lines) == 2
        assert output_lines[0].startswith("3:")
        assert "|c" in output_lines[0]

    def test_limit(self):
        """Limit restricts number of lines returned."""
        lines = ["a", "b", "c", "d"]
        result = format_hashlines(lines, limit=2)
        output_lines = result.split("\n")
        assert len(output_lines) == 2
        assert "|a" in output_lines[0]
        assert "|b" in output_lines[1]

    def test_offset_and_limit(self):
        """Offset and limit work together."""
        lines = ["a", "b", "c", "d", "e"]
        result = format_hashlines(lines, offset=2, limit=2)
        output_lines = result.split("\n")
        assert len(output_lines) == 2
        assert output_lines[0].startswith("2:")
        assert "|b" in output_lines[0]
        assert output_lines[1].startswith("3:")
        assert "|c" in output_lines[1]

    def test_empty_input(self):
        """Empty input produces empty output."""
        result = format_hashlines([])
        assert result == ""


class TestParseAnchor:
    """Tests for parse_anchor."""

    def test_valid_anchor(self):
        """Valid anchor is parsed correctly."""
        line_num, hash_str = parse_anchor("5:a3b1")
        assert line_num == 5
        assert hash_str == "a3b1"

    def test_valid_anchor_with_zeros(self):
        """Anchor with zero-padded hash works."""
        line_num, hash_str = parse_anchor("1:0000")
        assert line_num == 1
        assert hash_str == "0000"

    def test_no_colon(self):
        """Missing colon raises ValueError."""
        with pytest.raises(ValueError, match="no colon"):
            parse_anchor("5a3")

    @pytest.mark.parametrize("bad_anchor", ["5:abc", "5:a", "5:abcd1234"])
    def test_wrong_hash_length(self, bad_anchor):
        """Hash with wrong length raises ValueError."""
        with pytest.raises(ValueError, match="4 chars"):
            parse_anchor(bad_anchor)

    def test_uppercase_hash(self):
        """Uppercase hex raises ValueError."""
        with pytest.raises(ValueError, match="lowercase hex"):
            parse_anchor("5:A3B1")

    def test_non_hex_hash(self):
        """Non-hex characters in the hash raise ValueError."""
        with pytest.raises(ValueError, match="lowercase hex"):
            parse_anchor("5:zzxx")

    def test_non_integer_line(self):
        """Non-integer line number raises ValueError."""
        with pytest.raises(ValueError, match="not an integer"):
            parse_anchor("abc:a3b1")


class TestValidateAnchor:
    """Tests for validate_anchor."""

    def test_valid_match(self):
        """Valid anchor returns None."""
        lines = ["hello", "world"]
        h = compute_line_hash("hello")
        assert validate_anchor(f"1:{h}", lines) is None

    def test_hash_mismatch(self):
        """Mismatched hash returns error with re-read hint and current content."""
        lines = ["hello", "world"]
        err = validate_anchor("1:ffff", lines)
        assert err is not None
        assert "mismatch" in err.lower()
        assert "re-read" in err.lower()
        assert "hello" in err

    @pytest.mark.parametrize("anchor", ["5:abcd", "0:0000"])
    def test_out_of_range(self, anchor):
        """Line number beyond file length or zero returns error."""
        lines = ["hello"]
        err = validate_anchor(anchor, lines)
        assert err is not None
        assert "out of range" in err.lower()

    def test_invalid_format(self):
        """Invalid anchor format returns error."""
        lines = ["hello"]
        err = validate_anchor("bad", lines)
        assert err is not None
        assert "no colon" in err.lower()
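Taken together, these tests pin down the hashline contract tightly enough to sketch it. The following is a hypothetical, self-contained reference version that satisfies the assertions above; the hash choice (SHA-1 truncated to 4 hex chars) and the exact error wording are assumptions, not the real aden_tools.hashline code:

# Hypothetical sketch consistent with the unit tests above; the real
# aden_tools.hashline may differ in hash function and error strings.
import hashlib


def compute_line_hash(line: str) -> str:
    # Trailing spaces/tabs are ignored; leading whitespace is semantic.
    return hashlib.sha1(line.rstrip(" \t").encode("utf-8")).hexdigest()[:4]


def format_hashlines(lines: list[str], offset: int = 1, limit: int | None = None) -> str:
    # Render a 1-indexed "N:hhhh|content" view, honoring offset/limit.
    end = offset - 1 + limit if limit is not None else None
    window = lines[offset - 1 : end]
    return "\n".join(
        f"{offset + i}:{compute_line_hash(line)}|{line}" for i, line in enumerate(window)
    )


def parse_anchor(anchor: str) -> tuple[int, str]:
    # Split "N:hhhh" into (line_number, hash), validating each part.
    if ":" not in anchor:
        raise ValueError(f"bad anchor {anchor!r}: no colon")
    num, _, digest = anchor.partition(":")
    if len(digest) != 4:
        raise ValueError(f"bad anchor {anchor!r}: hash must be 4 chars")
    if not all(c in "0123456789abcdef" for c in digest):
        raise ValueError(f"bad anchor {anchor!r}: hash must be lowercase hex")
    try:
        line_num = int(num)
    except ValueError:
        raise ValueError(f"bad anchor {anchor!r}: line is not an integer") from None
    return line_num, digest


def validate_anchor(anchor: str, lines: list[str]) -> str | None:
    # Return None when the anchor matches the current file, else an error string.
    try:
        line_num, digest = parse_anchor(anchor)
    except ValueError as exc:
        return str(exc)
    if not 1 <= line_num <= len(lines):
        return f"anchor {anchor!r} out of range: file has {len(lines)} lines"
    current = lines[line_num - 1]
    if compute_line_hash(current) != digest:
        return f"hash mismatch at line {line_num}: re-read the file; current content: {current!r}"
    return None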
File diff suppressed because it is too large
@@ -0,0 +1,114 @@
"""Tests for run_command PYTHONPATH handling (Windows compatibility).

On Windows, PYTHONPATH must use semicolon (;) as separator, not colon (:).
These tests verify the correct behavior. They are Windows-only because
the bug only manifests on Windows.
"""

import os
import subprocess
import sys

import pytest

# Skip the entire module on non-Windows (the tests pass once the fixes are applied)
pytestmark = pytest.mark.skipif(
    sys.platform != "win32",
    reason="Windows-only: PYTHONPATH separator behavior",
)


def _build_pythonpath_buggy(project_root: str) -> str:
    """Replicate the current (buggy) PYTHONPATH construction in run_command."""
    return f"{project_root}/core:{project_root}/exports:{project_root}/core/framework/agents"


def _build_pythonpath_fixed(project_root: str) -> str:
    """Correct PYTHONPATH construction using os.pathsep."""
    return os.pathsep.join(
        [
            os.path.join(project_root, "core"),
            os.path.join(project_root, "exports"),
            os.path.join(project_root, "core", "framework", "agents"),
        ]
    )
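For context, the fix pattern above would be applied inside run_command roughly as follows. A hypothetical sketch; `project_root` and `command` are illustrative, and the real tool code may differ:

# Hypothetical application of the fix inside run_command.
env = {**os.environ, "PYTHONPATH": _build_pythonpath_fixed(project_root)}
result = subprocess.run(command, env=env, capture_output=True, text=True, timeout=60)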
class TestPythonpathSeparatorWindows:
    """Verify PYTHONPATH uses the correct separator on Windows."""

    def test_pythonpath_with_semicolons_parses_multiple_paths(self, tmp_path):
        """PYTHONPATH built with os.pathsep allows Python to find modules in multiple dirs."""
        # Create two dirs, each with a module
        core_dir = tmp_path / "core"
        core_dir.mkdir()
        (core_dir / "mod_a.py").write_text("x = 1\n")

        exports_dir = tmp_path / "exports"
        exports_dir.mkdir()
        (exports_dir / "mod_b.py").write_text("y = 2\n")

        pythonpath = os.pathsep.join([str(core_dir), str(exports_dir)])
        env = {**os.environ, "PYTHONPATH": pythonpath}

        # Python should find both modules once PYTHONPATH puts them on sys.path
        result = subprocess.run(
            [
                sys.executable,
                "-c",
                "import sys; "
                "sys.path = [p for p in sys.path if 'mod_a' not in p and 'mod_b' not in p]; "
                "import mod_a; import mod_b; print('ok')",
            ],
            env=env,
            capture_output=True,
            text=True,
            cwd=str(tmp_path),
            timeout=10,
        )

        assert result.returncode == 0, f"Stdout: {result.stdout} Stderr: {result.stderr}"
        assert "ok" in result.stdout

    def test_pythonpath_with_colons_fails_on_windows(self, tmp_path):
        """PYTHONPATH built with colons (Unix style) fails on Windows - only a single path is parsed."""
        core_dir = tmp_path / "core"
        core_dir.mkdir()
        (core_dir / "mod_c.py").write_text("z = 3\n")

        exports_dir = tmp_path / "exports"
        exports_dir.mkdir()
        (exports_dir / "mod_d.py").write_text("w = 4\n")

        # Buggy: colon-separated (Unix style)
        pythonpath = f"{tmp_path}/core:{tmp_path}/exports"
        env = {**os.environ, "PYTHONPATH": pythonpath}

        # On Windows, Python splits PYTHONPATH on ';' only, so the colon-joined
        # string is treated as one invalid path.
        result = subprocess.run(
            [
                sys.executable,
                "-c",
                "import mod_c; import mod_d; print('ok')",
            ],
            env=env,
            capture_output=True,
            text=True,
            cwd=str(tmp_path),
            timeout=10,
        )

        # Should fail: Python won't parse multiple paths from a colon-separated string
        assert result.returncode != 0 or "ok" not in result.stdout

    def test_fixed_pythonpath_construction_uses_pathsep(self, tmp_path):
        """The fix pattern (os.pathsep.join) produces a valid multi-path PYTHONPATH."""
        project_root = str(tmp_path)
        fixed = _build_pythonpath_fixed(project_root)

        # On Windows, os.pathsep is ';'
        assert os.pathsep in fixed, "Fixed PYTHONPATH must use os.pathsep on Windows"
        # Three paths => two separators
        assert fixed.count(os.pathsep) == 2