Compare commits

...

37 Commits

Author SHA1 Message Date
Timothy 41388efc31 fix: Windows compat — guard os.fchmod and remove deleted LLM_CREDENTIALS import
os.fchmod does not exist on Windows; guard with hasattr check.
Remove LLM_CREDENTIALS reference from test (module deleted in e1db3a4).
2026-03-04 17:22:21 -08:00
Timothy @aden fab5ce6fd0 Merge pull request #5824 from aden-hive/chore/fix-tool-tests
chore(micro-fix): fix test
2026-03-04 17:16:10 -08:00
Timothy 207d6baee5 chore: fix test 2026-03-04 16:49:39 -08:00
Timothy @aden fec72bb2b6 Merge pull request #5294 from Antiarin/feat/hashline-edit-tool
[Integration]feat: add hashline anchor-based file editing tool
2026-03-04 16:38:13 -08:00
Timothy c4c4c24c59 Merge branch 'main' into feat/hashline-edit-tool 2026-03-04 16:23:07 -08:00
Timothy @aden 2005ba2dca Merge pull request #5823 from aden-hive/micro-fix/lint
chore(micro-fix): lint
2026-03-04 16:11:51 -08:00
Timothy 557d5fd6e5 chore: lint 2026-03-04 16:10:35 -08:00
Timothy @aden 79d2a15f95 Merge pull request #5814 from fermano/feature/windows-filesysten
Feature/windows filesystem
2026-03-04 16:07:33 -08:00
Timothy ab32e44128 style: ruff format fixes
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 16:00:13 -08:00
Bryan @ Aden 047059f85f Merge pull request #5774 from kostasuser01gr/docs/4780-roadmap-updates
docs: update roadmap to reflect completed features (refs #4780)
2026-03-04 23:59:56 +00:00
Timothy e8364f616d Merge remote-tracking branch 'origin/main' into feature/windows-filesysten 2026-03-04 15:59:49 -08:00
Bryan @ Aden 9098c9b6c6 Merge pull request #5785 from code-Miracle49/fix/remove-duplicate-execute-subagent
micro-fix: remove duplicate _execute_subagent method in EventLoopNode
2026-03-04 23:55:36 +00:00
Timothy 84fd9ebac8 style: fix E501 line-too-long lint errors
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 15:53:45 -08:00
Timothy @aden 23d5d76d56 Merge pull request #5822 from aden-hive/fix/anthropic-vendor-issue
fix: remove hardcoded Anthropic dependencies from core framework
2026-03-04 15:46:16 -08:00
Timothy b0c86588b6 chore: lint 2026-03-04 15:44:11 -08:00
Timothy 5aff1f9489 chore: lint 2026-03-04 15:43:59 -08:00
Timothy Zhang 199cb3d8cc fix: stdin conflicts 2026-03-04 14:45:02 -08:00
Fernando Mano a98a4ca0b6 feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- fixing lint issues 2026-03-04 21:22:11 -03:00
Fernando Mano c4f49aadfa feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- fixing lint issues 2026-03-04 21:20:33 -03:00
Fernando Mano ca5ac389cf feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- fixing lint issues 2026-03-04 21:15:08 -03:00
Fernando Mano 7a658f7953 feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- fixing lint issues 2026-03-04 21:05:49 -03:00
Timothy e05fc99da7 Merge branch 'main' into fix/anthropic-vendor-issue 2026-03-04 14:43:27 -08:00
Bryan @ Aden 787090667e Merge pull request #5816 from aden-hive/fix/pause-stop-worker
(micro-fix): update pause in pipeline uses stop_worker like queen
2026-03-04 21:50:23 +00:00
Antiarin 80b36b4052 fix: CRLF double-conversion in hashline edit and add large file skip reporting
- Replace joined.replace("\n", "\r\n") with re.sub(r"(?<!\r)\n", "\r\n", joined to prevent \r\n in replace op new_content from becoming \r\r\n (fixed in both hashline_edit.py and file_ops.py)
 - Track and report skipped large files in grep_search instead of silently skipping them
 - Extract HASHLINE_MAX_FILE_BYTES constant to hashline.py as single source of truth, imported by view_file, grep_search, hashline_edit, and file_ops
 - Add tests for CRLF replace op (both copies) and large file skip reporting
2026-03-05 03:12:35 +05:30
Timothy 1c58ccb0c1 chore: lint 2026-03-04 12:45:27 -08:00
Timothy 79b80fe817 feat: coder tools to also support hashline editing 2026-03-04 12:41:07 -08:00
Antiarin c0f3841af7 feat: add file size check in grep_search to skip large files and switch case in hashline edit
- Implemented a check to skip files larger than 10MB in the grep_search function to optimize memory usage.
2026-03-04 12:41:07 -08:00
Antiarin 2b7d9bc471 feat:Updating the docs 2026-03-04 12:41:07 -08:00
Antiarin 98dc493a39 feat: Add cross-tool hashing anchor, grep search, and all viewfile 2026-03-04 12:41:07 -08:00
Antiarin cfaa57b28d feat:Add hashing tool 2026-03-04 12:40:25 -08:00
Fernando Mano 704a0fd63a feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- remove testing codeand prepare for PR 2026-03-04 15:09:06 -03:00
Fernando Mano bf4101ac38 feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- remove testing codeand prepare for PR 2026-03-04 15:03:02 -03:00
Fernando Mano bc349e8fde feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing -- remove testing codeand prepare for PR 2026-03-04 14:41:42 -03:00
Fernando Mano 8470c6a980 feature(WindowsFilesystemSupport): #5677 - Windows File System Support and Testing 2026-03-04 14:16:48 -03:00
code-Miracle49 c313ea7ee2 micro-fix: remove duplicate _execute_subagent method in EventLoopNode 2026-03-04 12:54:43 +01:00
kostasuser01gr 99d41d8cc6 docs: update roadmap to reflect completed features (refs #4780) 2026-03-04 08:37:14 +02:00
Timothy e1db3a4af9 fix: remove hardcoded anthropic logics 2026-02-27 10:23:59 -08:00
44 changed files with 4667 additions and 836 deletions
+5 -2
View File
@@ -62,8 +62,11 @@ jobs:
uv run pytest tests/ -v
test-tools:
name: Test Tools
runs-on: ubuntu-latest
name: Test Tools (${{ matrix.os }})
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
steps:
- uses: actions/checkout@v4
+13 -1
View File
@@ -20,8 +20,20 @@ check: ## Run all checks without modifying files (CI-safe)
cd core && ruff format --check .
cd tools && ruff format --check .
test: ## Run all tests
test: ## Run all tests (core + tools, excludes live)
cd core && uv run python -m pytest tests/ -v
cd tools && uv run python -m pytest -v
test-tools: ## Run tool tests only (mocked, no credentials needed)
cd tools && uv run python -m pytest -v
test-live: ## Run live integration tests (requires real API credentials)
cd tools && uv run python -m pytest -m live -s -o "addopts=" --log-cli-level=INFO
test-all: ## Run everything including live tests
cd core && uv run python -m pytest tests/ -v
cd tools && uv run python -m pytest -v
cd tools && uv run python -m pytest -m live -s -o "addopts=" --log-cli-level=INFO
install-hooks: ## Install pre-commit hooks
uv pip install pre-commit
+1
View File
@@ -82,6 +82,7 @@ Use Hive when you need:
- Python 3.11+ for agent development
- An LLM provider that powers the agents
- **ripgrep (optional, recommended on Windows):** The `search_files` tool uses ripgrep for faster file search. If not installed, a Python fallback is used. On Windows: `winget install BurntSushi.ripgrep` or `scoop install ripgrep`
> **Note for Windows Users:** It is strongly recommended to use **WSL (Windows Subsystem for Linux)** or **Git Bash** to run this framework. Some core automation scripts may not execute correctly in standard Command Prompt or PowerShell.
@@ -46,6 +46,7 @@ _SHARED_TOOLS = [
"read_file",
"write_file",
"edit_file",
"hashline_edit",
"list_directory",
"search_files",
"run_command",
@@ -131,12 +132,23 @@ errors yourself. Don't declare success until validation passes.
# Tools
## Paths (MANDATORY)
**Always use RELATIVE paths**
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` they fail.
The project root is implicit.
## File I/O
- read_file(path, offset?, limit?) read with line numbers
- read_file(path, offset?, limit?, hashline?) read with line numbers; \
hashline=True for N:hhhh|content anchors (use with hashline_edit)
- write_file(path, content) create/overwrite, auto-mkdir
- edit_file(path, old_text, new_text, replace_all?) fuzzy-match edit
- hashline_edit(path, edits, auto_cleanup?, encoding?) anchor-based \
editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
replace_lines, insert_after, insert_before, replace, append
- list_directory(path, recursive?) list contents
- search_files(pattern, path?, include?) regex search
- search_files(pattern, path?, include?, hashline?) regex search; \
hashline=True for anchors in results
- run_command(command, cwd?, timeout?) shell execution
- undo_changes(path?) restore from git snapshot
+1 -2
View File
@@ -431,8 +431,7 @@ class GraphSpec(BaseModel):
max_tokens: int = Field(default=None) # resolved by _resolve_max_tokens validator
# Cleanup LLM for JSON extraction fallback (fast/cheap model preferred)
# If not set, uses CEREBRAS_API_KEY -> cerebras/llama-3.3-70b or
# ANTHROPIC_API_KEY -> claude-haiku-4-5 as fallback
# If not set, uses CEREBRAS_API_KEY -> cerebras/llama-3.3-70b
cleanup_llm_model: str | None = None
# Execution limits
-326
View File
@@ -4058,329 +4058,3 @@ class EventLoopNode(NodeProtocol):
content=json.dumps(result_json, indent=2),
is_error=True,
)
# -------------------------------------------------------------------
# Subagent Execution
# -------------------------------------------------------------------
async def _execute_subagent(
self,
ctx: NodeContext,
agent_id: str,
task: str,
*,
accumulator: OutputAccumulator | None = None,
) -> ToolResult:
"""Execute a subagent and return the result as a ToolResult.
The subagent:
- Gets a fresh conversation with just the task
- Has read-only access to the parent's readable memory
- Cannot delegate to its own subagents (prevents recursion)
- Returns its output in structured JSON format
Args:
ctx: Parent node's context (for memory, tools, LLM access).
agent_id: The node ID of the subagent to invoke.
task: The task description to give the subagent.
accumulator: Parent's OutputAccumulator — provides outputs that
have been set via ``set_output`` but not yet written to
shared memory (which only happens after the node completes).
Returns:
ToolResult with structured JSON output containing:
- message: Human-readable summary
- data: Subagent's output (free-form JSON)
- metadata: Execution metadata (success, tokens, latency)
"""
from framework.graph.node import NodeContext, SharedMemory
# Log subagent invocation start
logger.info(
"\n" + "=" * 60 + "\n"
"🤖 SUBAGENT INVOCATION\n"
"=" * 60 + "\n"
"Parent Node: %s\n"
"Subagent ID: %s\n"
"Task: %s\n" + "=" * 60,
ctx.node_id,
agent_id,
task[:500] + "..." if len(task) > 500 else task,
)
# 1. Validate agent exists in registry
if agent_id not in ctx.node_registry:
return ToolResult(
tool_use_id="",
content=json.dumps(
{
"message": f"Sub-agent '{agent_id}' not found in registry",
"data": None,
"metadata": {"agent_id": agent_id, "success": False, "error": "not_found"},
}
),
is_error=True,
)
subagent_spec = ctx.node_registry[agent_id]
# 2. Create read-only memory snapshot
# Start with everything the parent can read from shared memory.
parent_data = ctx.memory.read_all()
# Merge in-flight outputs from the parent's accumulator.
# set_output() writes to the accumulator but shared memory is only
# updated after the parent node completes — so the subagent would
# otherwise miss any keys the parent set before delegating.
if accumulator:
for key, value in accumulator.to_dict().items():
if key not in parent_data:
parent_data[key] = value
subagent_memory = SharedMemory()
for key, value in parent_data.items():
subagent_memory.write(key, value, validate=False)
# Allow reads for parent data AND the subagent's declared input_keys
# (input_keys may reference keys that exist but weren't in read_all,
# or keys that were just written by the accumulator).
read_keys = set(parent_data.keys()) | set(subagent_spec.input_keys or [])
scoped_memory = subagent_memory.with_permissions(
read_keys=list(read_keys),
write_keys=[], # Read-only!
)
# 2b. Set up report callback (one-way channel to parent / event bus)
subagent_reports: list[dict] = []
async def _report_callback(
message: str,
data: dict | None = None,
*,
wait_for_response: bool = False,
) -> str | None:
subagent_reports.append({"message": message, "data": data, "timestamp": time.time()})
if self._event_bus:
await self._event_bus.emit_subagent_report(
stream_id=ctx.node_id,
node_id=f"{ctx.node_id}:subagent:{agent_id}",
subagent_id=agent_id,
message=message,
data=data,
execution_id=ctx.execution_id,
)
if not wait_for_response:
return None
if not self._event_bus:
logger.warning(
"Subagent '%s' requested user response but no event_bus available",
agent_id,
)
return None
# Create isolated receiver and register for input routing
import uuid
escalation_id = f"{ctx.node_id}:escalation:{uuid.uuid4().hex[:8]}"
receiver = _EscalationReceiver()
registry = ctx.shared_node_registry
registry[escalation_id] = receiver
try:
# Stream message to user (parent's node_id so TUI shows parent talking)
await self._event_bus.emit_client_output_delta(
stream_id=ctx.node_id,
node_id=ctx.node_id,
content=message,
snapshot=message,
execution_id=ctx.execution_id,
)
# Request input (escalation_id for routing response back)
await self._event_bus.emit_client_input_requested(
stream_id=ctx.node_id,
node_id=escalation_id,
prompt=message,
execution_id=ctx.execution_id,
)
# Block until user responds
return await receiver.wait()
finally:
registry.pop(escalation_id, None)
# 3. Filter tools for subagent
# Use the full tool catalog (ctx.all_tools) so subagents can access tools
# that aren't in the parent node's filtered set (e.g. browser tools for a
# GCU subagent when the parent only has web_scrape/save_data).
# Falls back to ctx.available_tools if all_tools is empty (e.g. in tests).
subagent_tool_names = set(subagent_spec.tools or [])
tool_source = ctx.all_tools if ctx.all_tools else ctx.available_tools
subagent_tools = [
t
for t in tool_source
if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
]
missing = subagent_tool_names - {t.name for t in subagent_tools}
if missing:
logger.warning(
"Subagent '%s' requested tools not found in catalog: %s",
agent_id,
sorted(missing),
)
logger.info(
"📦 Subagent '%s' configuration:\n"
" - System prompt: %s\n"
" - Tools available (%d): %s\n"
" - Memory keys inherited: %s",
agent_id,
(subagent_spec.system_prompt[:200] + "...")
if subagent_spec.system_prompt and len(subagent_spec.system_prompt) > 200
else subagent_spec.system_prompt,
len(subagent_tools),
[t.name for t in subagent_tools],
list(parent_data.keys()),
)
# 4. Build subagent context
max_iter = min(self._config.max_iterations, 10)
subagent_ctx = NodeContext(
runtime=ctx.runtime,
node_id=f"{ctx.node_id}:subagent:{agent_id}",
node_spec=subagent_spec,
memory=scoped_memory,
input_data={"task": task, **parent_data},
llm=ctx.llm,
available_tools=subagent_tools,
goal_context=(
f"Your specific task: {task}\n\n"
f"COMPLETION REQUIREMENTS:\n"
f"When your task is done, you MUST call set_output() "
f"for each required key: {subagent_spec.output_keys}\n"
f"Alternatively, call report_to_parent(mark_complete=true) "
f"with your findings in message/data.\n"
f"You have a maximum of {max_iter} turns to complete this task."
),
goal=ctx.goal,
max_tokens=ctx.max_tokens,
runtime_logger=ctx.runtime_logger,
is_subagent_mode=True, # Prevents nested delegation
report_callback=_report_callback,
node_registry={}, # Empty - no nested subagents
shared_node_registry=ctx.shared_node_registry, # For escalation routing
)
# 5. Create and execute subagent EventLoopNode
# Derive a conversation store for the subagent from the parent's store.
# Each invocation gets a unique path so that repeated delegate calls
# (e.g. one per profile) don't restore a stale completed conversation.
self._subagent_instance_counter.setdefault(agent_id, 0)
self._subagent_instance_counter[agent_id] += 1
subagent_instance = str(self._subagent_instance_counter[agent_id])
subagent_conv_store = None
if self._conversation_store is not None:
from framework.storage.conversation_store import FileConversationStore
parent_base = getattr(self._conversation_store, "_base", None)
if parent_base is not None:
# Store subagent conversations parallel to the parent node,
# not nested inside it. e.g. conversations/{node}:subagent:{agent_id}:{instance}/
conversations_dir = parent_base.parent # e.g. conversations/
subagent_dir_name = f"{agent_id}-{subagent_instance}"
subagent_store_path = conversations_dir / subagent_dir_name
subagent_conv_store = FileConversationStore(base_path=subagent_store_path)
# Derive a subagent-scoped spillover dir so large tool results
# (e.g. browser_snapshot) get written to disk instead of being
# silently truncated. Each instance gets its own directory to
# avoid file collisions between concurrent subagents.
subagent_spillover = None
if self._config.spillover_dir:
subagent_spillover = str(
Path(self._config.spillover_dir) / agent_id / subagent_instance
)
subagent_node = EventLoopNode(
event_bus=None, # Subagents don't emit events to parent's bus
judge=SubagentJudge(task=task, max_iterations=max_iter),
config=LoopConfig(
max_iterations=max_iter, # Tighter budget
max_tool_calls_per_turn=self._config.max_tool_calls_per_turn,
tool_call_overflow_margin=self._config.tool_call_overflow_margin,
max_history_tokens=self._config.max_history_tokens,
stall_detection_threshold=self._config.stall_detection_threshold,
max_tool_result_chars=self._config.max_tool_result_chars,
spillover_dir=subagent_spillover,
),
tool_executor=self._tool_executor,
conversation_store=subagent_conv_store,
)
try:
logger.info("🚀 Starting subagent '%s' execution...", agent_id)
start_time = time.time()
result = await subagent_node.execute(subagent_ctx)
latency_ms = int((time.time() - start_time) * 1000)
logger.info(
"\n" + "-" * 60 + "\n"
"✅ SUBAGENT '%s' COMPLETED\n"
"-" * 60 + "\n"
"Success: %s\n"
"Latency: %dms\n"
"Tokens used: %s\n"
"Output keys: %s\n" + "-" * 60,
agent_id,
result.success,
latency_ms,
result.tokens_used,
list(result.output.keys()) if result.output else [],
)
result_json = {
"message": (
f"Sub-agent '{agent_id}' completed successfully"
if result.success
else f"Sub-agent '{agent_id}' failed: {result.error}"
),
"data": result.output,
"reports": subagent_reports if subagent_reports else None,
"metadata": {
"agent_id": agent_id,
"success": result.success,
"tokens_used": result.tokens_used,
"latency_ms": latency_ms,
"report_count": len(subagent_reports),
},
}
return ToolResult(
tool_use_id="",
content=json.dumps(result_json, indent=2, default=str),
is_error=not result.success,
)
except Exception as e:
logger.exception(
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
agent_id,
str(e),
)
result_json = {
"message": f"Sub-agent '{agent_id}' raised exception: {e}",
"data": None,
"metadata": {
"agent_id": agent_id,
"success": False,
"error": str(e),
},
}
return ToolResult(
tool_use_id="",
content=json.dumps(result_json, indent=2),
is_error=True,
)
+3 -2
View File
@@ -183,11 +183,12 @@ class GraphExecutor:
self.tool_provider_map = tool_provider_map
self.dynamic_tools_provider = dynamic_tools_provider
# Initialize output cleaner
# Initialize output cleaner — uses its own dedicated fast model (CEREBRAS_API_KEY),
# never the main agent LLM. Passing the main LLM here would cause expensive
# Anthropic calls for output cleaning whenever ANTHROPIC_API_KEY is set.
self.cleansing_config = cleansing_config or CleansingConfig()
self.output_cleaner = OutputCleaner(
config=self.cleansing_config,
llm_provider=llm,
)
# Parallel execution settings
+4 -56
View File
@@ -154,69 +154,17 @@ class HITLProtocol:
"""
Parse human's raw input into structured response.
Uses Haiku to intelligently extract answers for each question.
Maps the raw input to the first question. For multi-question HITL,
the caller should present one question at a time.
"""
import os
response = HITLResponse(request_id=request.request_id, raw_input=raw_input)
# If no questions, just return raw input
if not request.questions:
return response
# Try to use Haiku for intelligent parsing
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not use_haiku or not api_key:
# Simple fallback: treat as answer to first question
if request.questions:
response.answers[request.questions[0].id] = raw_input
return response
# Use Haiku to extract answers
try:
import json
import anthropic
questions_str = "\n".join(
[f"{i + 1}. {q.question} (id: {q.id})" for i, q in enumerate(request.questions)]
)
prompt = f"""Parse the user's response and extract answers for each question.
Questions asked:
{questions_str}
User's response:
{raw_input}
Extract the answer for each question. Output JSON with question IDs as keys.
Example format:
{{"question-1": "answer here", "question-2": "answer here"}}"""
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=500,
messages=[{"role": "user", "content": prompt}],
)
# Parse Haiku's response
import re
response_text = message.content[0].text.strip()
json_match = re.search(r"\{[^{}]*\}", response_text, re.DOTALL)
if json_match:
parsed = json.loads(json_match.group())
response.answers = parsed
except Exception:
# Fallback: use raw input for first question
if request.questions:
response.answers[request.questions[0].id] = raw_input
# Map raw input to first question
response.answers[request.questions[0].id] = raw_input
return response
@staticmethod
+7 -54
View File
@@ -585,7 +585,6 @@ class NodeResult:
Generate a human-readable summary of this node's execution and output.
This is like toString() - it describes what the node produced in its current state.
Uses Haiku to intelligently summarize complex outputs.
"""
if not self.success:
return f"❌ Failed: {self.error}"
@@ -593,59 +592,13 @@ class NodeResult:
if not self.output:
return "✓ Completed (no output)"
# Use Haiku to generate intelligent summary
import os
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
# Fallback: simple key-value listing
parts = [f"✓ Completed with {len(self.output)} outputs:"]
for key, value in list(self.output.items())[:5]: # Limit to 5 keys
value_str = str(value)[:100]
if len(str(value)) > 100:
value_str += "..."
parts.append(f"{key}: {value_str}")
return "\n".join(parts)
# Use Haiku to generate intelligent summary
try:
import json
import anthropic
node_context = ""
if node_spec:
node_context = f"\nNode: {node_spec.name}\nPurpose: {node_spec.description}"
output_json = json.dumps(self.output, indent=2, default=str)[:2000]
prompt = (
f"Generate a 1-2 sentence human-readable summary of "
f"what this node produced.{node_context}\n\n"
f"Node output:\n{output_json}\n\n"
"Provide a concise, clear summary that a human can quickly "
"understand. Focus on the key information produced."
)
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=200,
messages=[{"role": "user", "content": prompt}],
)
summary = message.content[0].text.strip()
return f"{summary}"
except Exception:
# Fallback on error
parts = [f"✓ Completed with {len(self.output)} outputs:"]
for key, value in list(self.output.items())[:3]:
value_str = str(value)[:80]
if len(str(value)) > 80:
value_str += "..."
parts.append(f"{key}: {value_str}")
return "\n".join(parts)
parts = [f"✓ Completed with {len(self.output)} outputs:"]
for key, value in list(self.output.items())[:5]: # Limit to 5 keys
value_str = str(value)[:100]
if len(str(value)) > 100:
value_str += "..."
parts.append(f"{key}: {value_str}")
return "\n".join(parts)
class NodeProtocol(ABC):
@@ -2899,6 +2899,7 @@ def run_tests(
text=True,
timeout=600, # 10 minute timeout
env=env,
stdin=subprocess.DEVNULL,
)
except subprocess.TimeoutExpired:
return json.dumps(
@@ -3091,6 +3092,7 @@ def debug_test(
text=True,
timeout=120, # 2 minute timeout for single test
env=env,
stdin=subprocess.DEVNULL,
)
except subprocess.TimeoutExpired:
return json.dumps(
+8 -51
View File
@@ -1090,62 +1090,19 @@ def _interactive_approval(request):
def _format_natural_language_to_json(
user_input: str, input_keys: list[str], agent_description: str, session_context: dict = None
) -> dict:
"""Use Haiku to convert natural language input to JSON based on agent's input schema."""
import os
"""Convert natural language input to JSON based on agent's input schema.
import anthropic
Maps user input to the primary input field. For follow-up inputs,
appends to the existing value.
"""
main_field = input_keys[0] if input_keys else "objective"
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
# Build prompt for Haiku
session_info = ""
if session_context:
# Extract the main field (usually 'objective') that we'll append to
main_field = input_keys[0] if input_keys else "objective"
existing_value = session_context.get(main_field, "")
if existing_value:
return {main_field: f"{existing_value}\n\n{user_input}"}
session_info = (
f'\n\nExisting {main_field}: "{existing_value}"\n\n'
f"The user is providing ADDITIONAL information. Append this new "
f"information to the existing {main_field} to create an enriched, "
"more detailed version."
)
prompt = f"""You are formatting user input for an agent that requires specific input fields.
Agent: {agent_description}
Required input fields: {", ".join(input_keys)}{session_info}
User input: {user_input}
{"If this is a follow-up, APPEND new info to the existing field value." if session_context else ""}
Output ONLY valid JSON, no explanation:"""
try:
message = client.messages.create(
model="claude-haiku-4-5-20251001", # Fast and cheap
max_tokens=500,
messages=[{"role": "user", "content": prompt}],
)
json_str = message.content[0].text.strip()
# Remove markdown code blocks if present
if json_str.startswith("```"):
json_str = json_str.split("```")[1]
if json_str.startswith("json"):
json_str = json_str[4:]
json_str = json_str.strip()
return json.loads(json_str)
except Exception:
# Fallback: try to infer the main field
if len(input_keys) == 1:
return {input_keys[0]: user_input}
else:
# Put it in the first field as fallback
return {input_keys[0]: user_input}
return {main_field: user_input}
def cmd_shell(args: argparse.Namespace) -> int:
+27 -8
View File
@@ -7,6 +7,8 @@ Supports both STDIO and HTTP transports using the official MCP Python SDK.
import asyncio
import logging
import os
import sys
import threading
from dataclasses import dataclass, field
from typing import Any, Literal
@@ -73,6 +75,8 @@ class MCPClient:
# Background event loop for persistent STDIO connection
self._loop = None
self._loop_thread = None
# Serialize STDIO tool calls (avoids races, helps on Windows)
self._stdio_call_lock = threading.Lock()
def _run_async(self, coro):
"""
@@ -156,11 +160,19 @@ class MCPClient:
# Create server parameters
# Always inherit parent environment and merge with any custom env vars
merged_env = {**os.environ, **(self.config.env or {})}
# On Windows, passing cwd can cause WinError 267 ("invalid directory name").
# tool_registry passes cwd=None and uses absolute script paths when applicable.
cwd = self.config.cwd
if os.name == "nt" and cwd is not None:
# Avoid passing cwd on Windows; tool_registry should have set cwd=None
# and absolute script paths for tools-dir servers. If cwd is still set,
# pass None to prevent WinError 267 (caller should use absolute paths).
cwd = None
server_params = StdioServerParameters(
command=self.config.command,
args=self.config.args,
env=merged_env,
cwd=self.config.cwd,
cwd=cwd,
)
# Store for later use
@@ -184,10 +196,12 @@ class MCPClient:
from mcp.client.stdio import stdio_client
# Create persistent stdio client context.
# Redirect server stderr to devnull to prevent raw
# output from leaking behind the TUI.
devnull = open(os.devnull, "w") # noqa: SIM115
self._stdio_context = stdio_client(server_params, errlog=devnull)
# On Windows, use stderr so subprocess startup errors are visible.
if os.name == "nt":
errlog = sys.stderr
else:
errlog = open(os.devnull, "w") # noqa: SIM115
self._stdio_context = stdio_client(server_params, errlog=errlog)
(
self._read_stream,
self._write_stream,
@@ -353,7 +367,8 @@ class MCPClient:
raise ValueError(f"Unknown tool: {tool_name}")
if self.config.transport == "stdio":
return self._run_async(self._call_tool_stdio_async(tool_name, arguments))
with self._stdio_call_lock:
return self._run_async(self._call_tool_stdio_async(tool_name, arguments))
else:
return self._call_tool_http(tool_name, arguments)
@@ -448,11 +463,15 @@ class MCPClient:
if self._stdio_context:
await self._stdio_context.__aexit__(None, None, None)
except asyncio.CancelledError:
logger.warning(
logger.debug(
"STDIO context cleanup was cancelled; proceeding with best-effort shutdown"
)
except Exception as e:
logger.warning(f"Error closing STDIO context: {e}")
msg = str(e).lower()
if "cancel scope" in msg or "different task" in msg:
logger.debug("STDIO context teardown (known anyio quirk): %s", e)
else:
logger.warning(f"Error closing STDIO context: {e}")
finally:
self._stdio_context = None
+103 -3
View File
@@ -326,6 +326,103 @@ class ToolRegistry:
"""Restore execution context to its previous state."""
_execution_context.reset(token)
@staticmethod
def resolve_mcp_stdio_config(server_config: dict[str, Any], base_dir: Path) -> dict[str, Any]:
"""Resolve cwd and script paths for MCP stdio config (Windows compatibility).
Use this when building MCPServerConfig from a config file (e.g. in
list_agent_tools, discover_mcp_tools) so hive-tools and other servers
work on Windows. Call with base_dir = directory containing the config.
"""
registry = ToolRegistry()
return registry._resolve_mcp_server_config(server_config, base_dir)
def _resolve_mcp_server_config(
self, server_config: dict[str, Any], base_dir: Path
) -> dict[str, Any]:
"""Resolve cwd and script paths for MCP stdio servers (Windows compatibility).
On Windows, passing cwd to subprocess can cause WinError 267. We use cwd=None
and absolute script paths when the server runs a .py script from the tools dir.
If the resolved cwd doesn't exist (e.g. config from ~/.hive/agents/), fall back
to Path.cwd() / "tools".
"""
config = dict(server_config)
if config.get("transport") != "stdio":
return config
cwd = config.get("cwd")
args = list(config.get("args", []))
if not cwd and not args:
return config
# Resolve cwd relative to base_dir
resolved_cwd: Path | None = None
if cwd:
if Path(cwd).is_absolute():
resolved_cwd = Path(cwd)
else:
resolved_cwd = (base_dir / cwd).resolve()
# Find .py script in args (e.g. coder_tools_server.py, files_server.py)
script_name = None
for i, arg in enumerate(args):
if isinstance(arg, str) and arg.endswith(".py"):
script_name = arg
script_idx = i
break
if resolved_cwd is None:
return config
# If resolved cwd doesn't exist or (when we have a script) doesn't contain it,
# try fallback
tools_fallback = Path.cwd() / "tools"
need_fallback = not resolved_cwd.is_dir()
if script_name and not need_fallback:
need_fallback = not (resolved_cwd / script_name).exists()
if need_fallback:
fallback_ok = tools_fallback.is_dir()
if script_name:
fallback_ok = fallback_ok and (tools_fallback / script_name).exists()
else:
# No script (e.g. GCU); just need tools dir to exist
pass
if fallback_ok:
resolved_cwd = tools_fallback
logger.debug(
"MCP server '%s': using fallback tools dir %s",
config.get("name", "?"),
resolved_cwd,
)
else:
config["cwd"] = str(resolved_cwd)
return config
if not script_name:
# No .py script (e.g. GCU uses -m gcu.server); just set cwd
config["cwd"] = str(resolved_cwd)
return config
# For coder_tools_server, inject --project-root so writes go to the expected workspace
if script_name and "coder_tools" in script_name:
project_root = str(resolved_cwd.parent.resolve())
args = list(args)
if "--project-root" not in args:
args.extend(["--project-root", project_root])
config["args"] = args
if os.name == "nt":
# Windows: cwd=None avoids WinError 267; use absolute script path
config["cwd"] = None
abs_script = str((resolved_cwd / script_name).resolve())
args = list(config["args"])
args[script_idx] = abs_script
config["args"] = args
else:
config["cwd"] = str(resolved_cwd)
return config
def load_mcp_config(self, config_path: Path) -> None:
"""
Load and register MCP servers from a config file.
@@ -357,9 +454,7 @@ class ToolRegistry:
server_list = [{"name": name, **cfg} for name, cfg in config.items()]
for server_config in server_list:
cwd = server_config.get("cwd")
if cwd and not Path(cwd).is_absolute():
server_config["cwd"] = str((base_dir / cwd).resolve())
server_config = self._resolve_mcp_server_config(server_config, base_dir)
try:
self.register_mcp_server(server_config)
except Exception as e:
@@ -480,6 +575,11 @@ class ToolRegistry:
except Exception as e:
logger.error(f"Failed to register MCP server: {e}")
if "Connection closed" in str(e) and os.name == "nt":
logger.debug(
"On Windows, check that the MCP subprocess starts (e.g. uv in PATH, "
"script path correct). Worker config uses base_dir = mcp_servers.json parent."
)
return 0
def _convert_mcp_tool_to_framework_tool(self, mcp_tool: Any) -> Tool:
+5 -5
View File
@@ -145,7 +145,7 @@ Implement the core execution engine where every Agent operates as an isolated, a
- [x] SharedState manager (runtime/shared_state.py)
- [x] Session-based storage (storage/session_store.py)
- [x] Isolation levels: ISOLATED, SHARED, SYNCHRONIZED
- [ ] **Default Monitoring Hooks**
- [x] **Default Monitoring Hooks**
- [ ] Performance metrics collection
- [ ] Resource usage tracking
- [ ] Health check endpoints
@@ -590,7 +590,7 @@ Write the Quick Start guide, detailed tool usage documentation, and set up the M
- [x] README with examples
- [x] Contributing guidelines
- [x] GitHub Page setup
- [ ] **Tool Usage Documentation**
- [x] **Tool Usage Documentation**
- [ ] Comprehensive tool documentation
- [ ] Tool integration examples
- [ ] Best practices guide
@@ -643,7 +643,7 @@ Expose basic REST/WebSocket endpoints for external control (Start, Stop, Pause,
- [x] Load/unload/start/restart in AgentRuntime
- [x] State persistence
- [x] Recovery mechanisms
- [ ] **REST API Endpoints**
- [x] **REST API Endpoints**
- [ ] Start endpoint for agent execution
- [ ] Stop endpoint for graceful shutdown
- [ ] Pause endpoint for execution suspension
@@ -661,7 +661,7 @@ Implement automated test execution, agent version control, and mandatory test-pa
- [x] Test framework with pytest integration (testing/)
- [x] Test result reporting
- [x] Test CLI commands (test-run, test-debug, etc.)
- [ ] **Automated Testing Pipeline**
- [x] **Automated Testing Pipeline**
- [ ] CI integration (GitHub Actions, etc.)
- [ ] Mandatory test-passing gates
- [ ] Coverage reporting
@@ -873,7 +873,7 @@ Build native frontend configurations to easily connect Open Hive's backend to lo
- [ ] Node.js runtime support
- [ ] Browser runtime support
- [ ] **Platform Compatibility**
- [ ] Windows support improvements
- [x] Windows support improvements
- [ ] macOS optimization
- [ ] Linux distribution support
+2 -1
View File
@@ -78,5 +78,6 @@ if (-not $env:HIVE_CREDENTIAL_KEY) {
}
# ── Run the Hive CLI ────────────────────────────────────────────────
# PYTHONUTF8=1: use UTF-8 for default encoding (fixes charmap decode errors on Windows)
$env:PYTHONUTF8 = "1"
& uv run hive @args
+15 -6
View File
@@ -130,8 +130,8 @@ function Test-DefenderExclusions {
# Normalize and filter null/empty values
$safePrefixes = $safePrefixes | Where-Object { $_ } | ForEach-Object {
[System.IO.Path]::GetFullPath($_)
}
try { [System.IO.Path]::GetFullPath($_) } catch { $null }
} | Where-Object { $_ }
try {
# Check if Defender cmdlets are available (may not exist on older Windows)
@@ -157,15 +157,20 @@ function Test-DefenderExclusions {
$existing = $prefs.ExclusionPath
if (-not $existing) { $existing = @() }
# Normalize existing paths for comparison
# Normalize existing paths for comparison (some may contain wildcards
# or env vars that GetFullPath rejects — skip those gracefully)
$existing = $existing | Where-Object { $_ } | ForEach-Object {
[System.IO.Path]::GetFullPath($_)
try { [System.IO.Path]::GetFullPath($_) } catch { $_ }
}
# Normalize paths and find missing exclusions
$missing = @()
foreach ($path in $Paths) {
$normalized = [System.IO.Path]::GetFullPath($path)
try {
$normalized = [System.IO.Path]::GetFullPath($path)
} catch {
continue # Skip paths with unsupported format
}
# Security: Ensure path is within safe boundaries
$isSafe = $false
@@ -250,7 +255,11 @@ function Add-DefenderExclusions {
foreach ($path in $Paths) {
try {
$normalized = [System.IO.Path]::GetFullPath($path)
try {
$normalized = [System.IO.Path]::GetFullPath($path)
} catch {
$normalized = $path # Use raw path if normalization fails
}
Add-MpPreference -ExclusionPath $normalized -ErrorAction Stop
$added += $normalized
} catch {
+12 -8
View File
@@ -72,6 +72,7 @@ python mcp_server.py
| `apply_diff` | Apply diff patches to files |
| `apply_patch` | Apply unified patches to files |
| `grep_search` | Search file contents with regex |
| `hashline_edit` | Anchor-based file editing with hash-validated line references |
| `execute_command_tool` | Execute shell commands |
| `save_data` / `load_data` | Persist and retrieve structured data across steps |
| `serve_file_to_user` | Serve a file for the user to download |
@@ -175,14 +176,17 @@ tools/
│ └── tools/ # Tool implementations
│ ├── example_tool/
│ ├── file_system_toolkits/ # File operation tools
│ │ ├── view_file.py
│ │ ├── write_to_file.py
│ │ ├── list_dir.py
│ │ ├── replace_file_content.py
│ │ ├── apply_diff.py
│ │ ├── apply_patch.py
│ │ ├── grep_search.py
│ │ ── execute_command_tool.py
│ │ ├── security.py
│ │ ├── hashline.py
│ │ ├── view_file/
│ │ ├── write_to_file/
│ │ ├── list_dir/
│ │ ├── replace_file_content/
│ │ ├── apply_diff/
│ │ ── apply_patch/
│ │ ├── grep_search/
│ │ ├── hashline_edit/
│ │ └── execute_command_tool/
│ ├── web_search_tool/
│ ├── web_scrape_tool/
│ ├── pdf_read_tool/
+124 -37
View File
@@ -71,8 +71,49 @@ def _find_project_root() -> str:
def _resolve_path(path: str) -> str:
"""Resolve path relative to PROJECT_ROOT. Raises ValueError if outside."""
# Normalize slashes for cross-platform (e.g. exports/hi_agent from LLM)
path = path.replace("/", os.sep)
if os.path.isabs(path):
resolved = os.path.abspath(path)
try:
common = os.path.commonpath([resolved, PROJECT_ROOT])
except ValueError:
common = ""
if common != PROJECT_ROOT:
# LLM may emit wrong-root paths (/mnt/data, /workspace, etc.).
# Strip known prefixes and treat the remainder as relative to PROJECT_ROOT.
path_norm = path.replace("\\", "/")
for prefix in (
"/mnt/data/",
"/mnt/data",
"/workspace/",
"/workspace",
"/repo/",
"/repo",
):
p = prefix.rstrip("/") + "/"
prefix_stripped = prefix.rstrip("/")
if path_norm.startswith(p) or (
path_norm.startswith(prefix_stripped) and len(path_norm) > len(prefix)
):
suffix = path_norm[len(prefix_stripped) :].lstrip("/")
if suffix:
path = suffix.replace("/", os.sep)
resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
break
else:
# Try extracting exports/ or core/ subpath from the absolute path
parts = path.split(os.sep)
if "exports" in parts:
idx = parts.index("exports")
path = os.sep.join(parts[idx:])
resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
elif "core" in parts:
idx = parts.index("core")
path = os.sep.join(parts[idx:])
resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
else:
raise ValueError(f"Access denied: '{path}' is outside the project root.")
else:
resolved = os.path.abspath(os.path.join(PROJECT_ROOT, path))
try:
@@ -91,11 +132,7 @@ def _snapshot_git(*args: str) -> str:
"""Run a git command with the snapshot GIT_DIR and PROJECT_ROOT worktree."""
cmd = ["git", "--git-dir", SNAPSHOT_DIR, "--work-tree", PROJECT_ROOT, *args]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
cmd, capture_output=True, text=True, timeout=30, encoding="utf-8", stdin=subprocess.DEVNULL
)
return result.stdout.strip()
@@ -110,6 +147,7 @@ def _ensure_snapshot_repo():
["git", "init", "--bare", SNAPSHOT_DIR],
capture_output=True,
timeout=10,
stdin=subprocess.DEVNULL,
encoding="utf-8",
)
_snapshot_git("config", "core.autocrlf", "false")
@@ -132,6 +170,37 @@ def _take_snapshot() -> str:
MAX_COMMAND_OUTPUT = 30_000 # chars before truncation
def _translate_command_for_windows(command: str) -> str:
"""Translate common Unix commands to Windows equivalents."""
if os.name != "nt":
return command
cmd = command.strip()
# mkdir -p: Unix creates parents; Windows mkdir already does; -p becomes a dir name
if cmd.startswith("mkdir -p ") or cmd.startswith("mkdir -p\t"):
rest = cmd[9:].lstrip().replace("/", os.sep)
return "mkdir " + rest
# ls / pwd: cmd.exe uses dir and cd
# Order matters: replace longer patterns first
for unix, win in [
("ls -la", "dir /a"),
("ls -al", "dir /a"),
("ls -l", "dir"),
("ls -a", "dir /a"),
("ls ", "dir "),
("pwd", "cd"),
]:
cmd = cmd.replace(unix, win)
# Standalone "ls" at end (e.g. "cd x && ls")
if cmd.endswith(" ls"):
cmd = cmd[:-3] + " dir"
elif cmd == "ls":
cmd = "dir"
return cmd
@mcp.tool()
def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
"""Execute a shell command in the project context.
@@ -151,6 +220,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
work_dir = _resolve_path(cwd) if cwd else PROJECT_ROOT
try:
command = _translate_command_for_windows(command)
start = time.monotonic()
result = subprocess.run(
command,
@@ -159,12 +229,16 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL,
encoding="utf-8",
env={
**os.environ,
"PYTHONPATH": (
f"{PROJECT_ROOT}/core:{PROJECT_ROOT}/exports"
f":{PROJECT_ROOT}/core/framework/agents"
"PYTHONPATH": os.pathsep.join(
[
os.path.join(PROJECT_ROOT, "core"),
os.path.join(PROJECT_ROOT, "exports"),
os.path.join(PROJECT_ROOT, "core", "framework", "agents"),
]
),
},
)
@@ -236,6 +310,7 @@ def undo_changes(path: str = "") -> str:
capture_output=True,
text=True,
timeout=10,
stdin=subprocess.DEVNULL,
encoding="utf-8",
)
return f"Restored: {path}"
@@ -309,28 +384,31 @@ def list_agent_tools(
return json.dumps({"error": f"Failed to read config: {e}"})
try:
from pathlib import Path
from framework.runner.mcp_client import MCPClient, MCPServerConfig
from framework.runner.tool_registry import ToolRegistry
except ImportError:
return json.dumps({"error": "Cannot import MCPClient"})
all_tools: list[dict] = []
errors = []
config_dir = os.path.dirname(config_path)
config_dir = Path(config_path).parent
for server_name, server_conf in servers_config.items():
cwd = server_conf.get("cwd", "")
if cwd and not os.path.isabs(cwd):
cwd = os.path.abspath(os.path.join(config_dir, cwd))
resolved = ToolRegistry.resolve_mcp_stdio_config(
{"name": server_name, **server_conf}, config_dir
)
try:
config = MCPServerConfig(
name=server_name,
transport=server_conf.get("transport", "stdio"),
command=server_conf.get("command"),
args=server_conf.get("args", []),
env=server_conf.get("env", {}),
cwd=cwd or None,
url=server_conf.get("url"),
headers=server_conf.get("headers", {}),
transport=resolved.get("transport", "stdio"),
command=resolved.get("command"),
args=resolved.get("args", []),
env=resolved.get("env", {}),
cwd=resolved.get("cwd"),
url=resolved.get("url"),
headers=resolved.get("headers", {}),
)
client = MCPClient(config)
client.connect()
@@ -419,19 +497,24 @@ def validate_agent_tools(agent_path: str) -> str:
if not os.path.isdir(resolved):
return json.dumps({"error": f"Agent directory not found: {agent_path}"})
agent_dir = resolved # Keep path; 'resolved' is reused for MCP config in loop
# --- Discover available tools from agent's MCP servers ---
mcp_config_path = os.path.join(resolved, "mcp_servers.json")
mcp_config_path = os.path.join(agent_dir, "mcp_servers.json")
if not os.path.isfile(mcp_config_path):
return json.dumps({"error": f"No mcp_servers.json found in {agent_path}"})
try:
from pathlib import Path
from framework.runner.mcp_client import MCPClient, MCPServerConfig
from framework.runner.tool_registry import ToolRegistry
except ImportError:
return json.dumps({"error": "Cannot import MCPClient"})
available_tools: set[str] = set()
discovery_errors = []
config_dir = os.path.dirname(mcp_config_path)
config_dir = Path(mcp_config_path).parent
try:
with open(mcp_config_path, encoding="utf-8") as f:
@@ -440,19 +523,19 @@ def validate_agent_tools(agent_path: str) -> str:
return json.dumps({"error": f"Failed to read mcp_servers.json: {e}"})
for server_name, server_conf in servers_config.items():
cwd = server_conf.get("cwd", "")
if cwd and not os.path.isabs(cwd):
cwd = os.path.abspath(os.path.join(config_dir, cwd))
resolved = ToolRegistry.resolve_mcp_stdio_config(
{"name": server_name, **server_conf}, config_dir
)
try:
config = MCPServerConfig(
name=server_name,
transport=server_conf.get("transport", "stdio"),
command=server_conf.get("command"),
args=server_conf.get("args", []),
env=server_conf.get("env", {}),
cwd=cwd or None,
url=server_conf.get("url"),
headers=server_conf.get("headers", {}),
transport=resolved.get("transport", "stdio"),
command=resolved.get("command"),
args=resolved.get("args", []),
env=resolved.get("env", {}),
cwd=resolved.get("cwd"),
url=resolved.get("url"),
headers=resolved.get("headers", {}),
)
client = MCPClient(config)
client.connect()
@@ -463,7 +546,7 @@ def validate_agent_tools(agent_path: str) -> str:
discovery_errors.append({"server": server_name, "error": str(e)})
# --- Load agent nodes and extract declared tools ---
agent_py = os.path.join(resolved, "agent.py")
agent_py = os.path.join(agent_dir, "agent.py")
if not os.path.isfile(agent_py):
return json.dumps({"error": f"No agent.py found in {agent_path}"})
@@ -471,8 +554,8 @@ def validate_agent_tools(agent_path: str) -> str:
import importlib.util
import sys
package_name = os.path.basename(resolved)
parent_dir = os.path.dirname(os.path.abspath(resolved))
package_name = os.path.basename(agent_dir)
parent_dir = os.path.dirname(os.path.abspath(agent_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
@@ -1015,13 +1098,16 @@ def run_agent_tests(
cmd.append("-x")
cmd.append("--tb=short")
# Set PYTHONPATH
# Set PYTHONPATH (use pathsep for Windows)
env = os.environ.copy()
pythonpath = env.get("PYTHONPATH", "")
core_path = os.path.join(PROJECT_ROOT, "core")
exports_path = os.path.join(PROJECT_ROOT, "exports")
fw_agents_path = os.path.join(PROJECT_ROOT, "core", "framework", "agents")
env["PYTHONPATH"] = f"{core_path}:{exports_path}:{fw_agents_path}:{PROJECT_ROOT}:{pythonpath}"
path_parts = [core_path, exports_path, fw_agents_path, PROJECT_ROOT]
if pythonpath:
path_parts.append(pythonpath)
env["PYTHONPATH"] = os.pathsep.join(path_parts)
try:
result = subprocess.run(
@@ -1030,6 +1116,7 @@ def run_agent_tests(
text=True,
timeout=120,
env=env,
stdin=subprocess.DEVNULL,
encoding="utf-8",
)
except subprocess.TimeoutExpired:
@@ -1154,7 +1241,7 @@ def main() -> None:
register_file_tools(
mcp,
resolve_path=_resolve_path,
before_write=_take_snapshot,
before_write=None, # Git snapshot causes stdio deadlock on Windows; undo_changes limited
project_root=PROJECT_ROOT,
)
+4
View File
@@ -114,6 +114,10 @@ lint.isort.section-order = [
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "-m 'not live'"
markers = [
"live: Tests that call real external APIs (require credentials, never run in CI)",
]
[dependency-groups]
dev = [
@@ -33,7 +33,6 @@ Usage:
})
Credential categories:
- llm.py: LLM provider credentials (anthropic, openai, etc.)
- search.py: Search tool credentials (brave_search, google_search, etc.)
- email.py: Email provider credentials (resend, google/gmail)
- apollo.py: Apollo.io API credentials
@@ -97,7 +96,6 @@ from .jira import JIRA_CREDENTIALS
from .kafka import KAFKA_CREDENTIALS
from .langfuse import LANGFUSE_CREDENTIALS
from .linear import LINEAR_CREDENTIALS
from .llm import LLM_CREDENTIALS
from .lusha import LUSHA_CREDENTIALS
from .microsoft_graph import MICROSOFT_GRAPH_CREDENTIALS
from .mongodb import MONGODB_CREDENTIALS
@@ -148,7 +146,6 @@ from .zoom import ZOOM_CREDENTIALS
# Merged registry of all credentials
CREDENTIAL_SPECS = {
**AIRTABLE_CREDENTIALS,
**LLM_CREDENTIALS,
**NEWS_CREDENTIALS,
**SEARCH_CREDENTIALS,
**EMAIL_CREDENTIALS,
@@ -248,7 +245,6 @@ __all__ = [
"CREDENTIAL_SPECS",
# Category registries (for direct access if needed)
"AIRTABLE_CREDENTIALS",
"LLM_CREDENTIALS",
"NEWS_CREDENTIALS",
"SEARCH_CREDENTIALS",
"EMAIL_CREDENTIALS",
@@ -690,83 +690,6 @@ class CalendlyHealthChecker:
)
class AnthropicHealthChecker:
"""Health checker for Anthropic API credentials."""
ENDPOINT = "https://api.anthropic.com/v1/messages"
TIMEOUT = 10.0
def check(self, api_key: str) -> HealthCheckResult:
"""
Validate Anthropic API key without consuming tokens.
Sends a deliberately invalid request (empty messages) to the messages endpoint.
A 401 means invalid key; 400 (bad request) means the key authenticated
but the payload was rejected confirming the key is valid without
generating any tokens. 429 (rate limited) also indicates a valid key.
"""
try:
with httpx.Client(timeout=self.TIMEOUT) as client:
response = client.post(
self.ENDPOINT,
headers={
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
# Empty messages triggers 400 (not 200), so no tokens are consumed.
json={
"model": "claude-sonnet-4-20250514",
"max_tokens": 1,
"messages": [],
},
)
if response.status_code == 200:
return HealthCheckResult(
valid=True,
message="Anthropic API key valid",
)
elif response.status_code == 401:
return HealthCheckResult(
valid=False,
message="Anthropic API key is invalid",
details={"status_code": 401},
)
elif response.status_code == 429:
# Rate limited but key is valid
return HealthCheckResult(
valid=True,
message="Anthropic API key valid (rate limited)",
details={"status_code": 429, "rate_limited": True},
)
elif response.status_code == 400:
# Bad request but key authenticated - key is valid
return HealthCheckResult(
valid=True,
message="Anthropic API key valid",
details={"status_code": 400},
)
else:
return HealthCheckResult(
valid=False,
message=f"Anthropic API returned status {response.status_code}",
details={"status_code": response.status_code},
)
except httpx.TimeoutException:
return HealthCheckResult(
valid=False,
message="Anthropic API request timed out",
details={"error": "timeout"},
)
except httpx.RequestError as e:
return HealthCheckResult(
valid=False,
message=f"Failed to connect to Anthropic API: {e}",
details={"error": str(e)},
)
class GitHubHealthChecker:
"""Health checker for GitHub Personal Access Token."""
@@ -1376,7 +1299,6 @@ class YouTubeHealthChecker(BaseHttpHealthChecker):
# Registry of health checkers
HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
"anthropic": AnthropicHealthChecker(),
"apify": ApifyHealthChecker(),
"apollo": ApolloHealthChecker(),
"asana": AsanaHealthChecker(),
-44
View File
@@ -1,44 +0,0 @@
"""
LLM provider credentials.
Contains credentials for language model providers like Anthropic, OpenAI, etc.
"""
from .base import CredentialSpec
LLM_CREDENTIALS = {
"anthropic": CredentialSpec(
env_var="ANTHROPIC_API_KEY",
tools=[],
node_types=["event_loop"],
required=False, # Not required - agents can use other providers via LiteLLM
startup_required=False, # MCP server doesn't need LLM credentials
help_url="https://console.anthropic.com/settings/keys",
description="API key for Anthropic Claude models",
# Auth method support
direct_api_key_supported=True,
api_key_instructions="""To get an Anthropic API key:
1. Go to https://console.anthropic.com/settings/keys
2. Sign in or create an Anthropic account
3. Click "Create Key"
4. Give your key a descriptive name (e.g., "Hive Agent")
5. Copy the API key (starts with sk-ant-)
6. Store it securely - you won't be able to see the full key again!""",
# Health check configuration
health_check_endpoint="https://api.anthropic.com/v1/messages",
health_check_method="POST",
# Credential store mapping
credential_id="anthropic",
credential_key="api_key",
),
# Future LLM providers:
# "openai": CredentialSpec(
# env_var="OPENAI_API_KEY",
# tools=[],
# node_types=["openai_generate"],
# required=False,
# startup_required=False,
# help_url="https://platform.openai.com/api-keys",
# description="API key for OpenAI models",
# ),
}
+471 -23
View File
@@ -1,9 +1,10 @@
"""
Shared file operation tools for MCP servers.
Provides 6 tools (read_file, write_file, edit_file, list_directory, search_files,
run_command) plus supporting helpers. Used by both files_server.py (unsandboxed)
and coder_tools_server.py (project-root sandboxed with git snapshots).
Provides 7 tools (read_file, write_file, edit_file, hashline_edit,
list_directory, search_files, run_command) plus supporting helpers.
Used by both files_server.py (unsandboxed) and coder_tools_server.py
(project-root sandboxed with git snapshots).
Usage:
from aden_tools.file_ops import register_file_tools
@@ -15,16 +16,31 @@ Usage:
from __future__ import annotations
import contextlib
import difflib
import fnmatch
import json
import os
import re
import subprocess
import tempfile
from collections.abc import Callable
from pathlib import Path
from fastmcp import FastMCP
from aden_tools.hashline import (
HASHLINE_MAX_FILE_BYTES,
compute_line_hash,
format_hashlines,
maybe_strip,
parse_anchor,
strip_boundary_echo,
strip_content_prefixes,
strip_insert_echo,
validate_anchor,
)
# ── Constants ─────────────────────────────────────────────────────────────
MAX_READ_LINES = 2000
@@ -245,16 +261,21 @@ def register_file_tools(
_resolve = resolve_path or _default_resolve_path
@mcp.tool()
def read_file(path: str, offset: int = 1, limit: int = 0) -> str:
def read_file(path: str, offset: int = 1, limit: int = 0, hashline: bool = False) -> str:
"""Read file contents with line numbers and byte-budget truncation.
Binary files are detected and rejected. Large files are automatically
truncated at 2000 lines or 50KB. Use offset and limit to paginate.
Set hashline=True to get N:hhhh|content format with content-hash
anchors for use with hashline_edit. Line truncation is disabled in
hashline mode to preserve hash integrity.
Args:
path: Absolute file path to read.
offset: Starting line number, 1-indexed (default: 1).
limit: Max lines to return, 0 = up to 2000 (default: 0).
hashline: If True, return N:hhhh|content anchors (default: False).
"""
resolved = _resolve(path)
@@ -276,8 +297,10 @@ def register_file_tools(
try:
with open(resolved, encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
content = f.read()
# Use splitlines() for consistent line splitting with hashline module
all_lines = content.splitlines()
total_lines = len(all_lines)
start_idx = max(0, offset - 1)
effective_limit = limit if limit > 0 else MAX_READ_LINES
@@ -287,10 +310,15 @@ def register_file_tools(
byte_count = 0
truncated_by_bytes = False
for i in range(start_idx, end_idx):
line = all_lines[i].rstrip("\n\r")
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted = f"{i + 1:>6}\t{line}"
line = all_lines[i]
if hashline:
# No line truncation in hashline mode (would corrupt hashes)
h = compute_line_hash(line)
formatted = f"{i + 1}:{h}|{line}"
else:
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
formatted = f"{i + 1:>6}\t{line}"
line_bytes = len(formatted.encode("utf-8")) + 1
if byte_count + line_bytes > MAX_OUTPUT_BYTES:
truncated_by_bytes = True
@@ -323,19 +351,31 @@ def register_file_tools(
content: Complete file content to write.
"""
resolved = _resolve(path)
resolved_path = Path(resolved)
try:
# Create parent dirs first (before git snapshot) so structure exists
resolved_path.parent.mkdir(parents=True, exist_ok=True)
if before_write:
before_write()
try:
before_write()
except Exception:
# Don't block the write if git snapshot fails. Do NOT log here —
# logging writes to stderr and can deadlock the MCP stdio pipe.
pass
existed = os.path.isfile(resolved)
os.makedirs(os.path.dirname(resolved), exist_ok=True)
with open(resolved, "w", encoding="utf-8") as f:
f.write(content)
existed = resolved_path.is_file()
content_str = content if content is not None else ""
with open(resolved_path, "w", encoding="utf-8") as f:
f.write(content_str)
f.flush()
os.fsync(f.fileno())
line_count = content.count("\n") + (1 if content and not content.endswith("\n") else 0)
line_count = content_str.count("\n") + (
1 if content_str and not content_str.endswith("\n") else 0
)
action = "Updated" if existed else "Created"
return f"{action} {path} ({len(content):,} bytes, {line_count} lines)"
return f"{action} {path} ({len(content_str):,} bytes, {line_count} lines)"
except Exception as e:
return f"Error writing file: {e}"
@@ -472,15 +512,19 @@ def register_file_tools(
return f"Error listing directory: {e}"
@mcp.tool()
def search_files(pattern: str, path: str = ".", include: str = "") -> str:
def search_files(
pattern: str, path: str = ".", include: str = "", hashline: bool = False
) -> str:
"""Search file contents using regex. Uses ripgrep if available.
Results sorted by file with line numbers.
Results sorted by file with line numbers. Set hashline=True to include
content-hash anchors (N:hhhh) for use with hashline_edit.
Args:
pattern: Regex pattern to search for.
path: Absolute directory path to search (default: current directory).
include: File glob filter (e.g. '*.py').
hashline: If True, include hash anchors in results (default: False).
"""
resolved = _resolve(path)
if not os.path.isdir(resolved):
@@ -507,6 +551,7 @@ def register_file_tools(
text=True,
timeout=30,
encoding="utf-8",
stdin=subprocess.DEVNULL,
)
if rg_result.returncode <= 1:
output = rg_result.stdout.strip()
@@ -517,6 +562,30 @@ def register_file_tools(
for line in output.split("\n")[:SEARCH_RESULT_LIMIT]:
if project_root:
line = line.replace(project_root + "/", "")
if hashline:
# Parse file:linenum:content and insert hash anchor
parts = line.split(":", 2)
if len(parts) >= 3:
content = parts[2]
h = compute_line_hash(content)
line = f"{parts[0]}:{parts[1]}:{h}|{content}"
else:
# Platform-agnostic relativization: ripgrep may output
# forward or backslash paths; normalize before relpath (Windows).
match = re.match(r"^(.+):(\d+):", line)
if match:
path_part, line_num, rest = (
match.group(1),
match.group(2),
line[match.end() :],
)
path_part = os.path.normpath(path_part.replace("/", os.sep))
proj_norm = os.path.normpath(project_root.replace("/", os.sep))
try:
rel = os.path.relpath(path_part, proj_norm)
line = f"{rel}:{line_num}:{rest}"
except ValueError:
pass
if len(line) > MAX_LINE_LENGTH:
line = line[:MAX_LINE_LENGTH] + "..."
lines.append(line)
@@ -544,14 +613,26 @@ def register_file_tools(
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
display_path = os.path.relpath(fpath, project_root) if project_root else fpath
if project_root:
proj_norm = os.path.normpath(project_root.replace("/", os.sep))
try:
display_path = os.path.relpath(fpath, proj_norm)
except ValueError:
display_path = fpath
else:
display_path = fpath
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
if compiled.search(line):
matches.append(
f"{display_path}:{i}:{line.rstrip()[:MAX_LINE_LENGTH]}"
)
stripped = line.rstrip()
if compiled.search(stripped):
if hashline:
h = compute_line_hash(stripped)
matches.append(f"{display_path}:{i}:{h}|{stripped}")
else:
matches.append(
f"{display_path}:{i}:{stripped[:MAX_LINE_LENGTH]}"
)
if len(matches) >= SEARCH_RESULT_LIMIT:
return "\n".join(matches) + "\n... (truncated)"
except (OSError, UnicodeDecodeError):
@@ -560,3 +641,370 @@ def register_file_tools(
return "\n".join(matches) if matches else "No matches found."
except re.error as e:
return f"Error: Invalid regex: {e}"
@mcp.tool()
def hashline_edit(
path: str,
edits: str,
auto_cleanup: bool = True,
encoding: str = "utf-8",
) -> str:
"""Edit a file using anchor-based line references (N:hash) for precise edits.
After reading a file with read_file(hashline=True), use the anchors to make
targeted edits without reproducing exact file content.
Anchors must match current file content (hash validation). All edits in a
batch are validated before any are applied (atomic). Overlapping line ranges
within a single call are rejected.
Args:
path: Absolute file path to edit.
edits: JSON string containing a list of edit operations. Each op is a
dict with "op" key and operation-specific fields:
- set_line: anchor, content (single line replacement)
- replace_lines: start_anchor, end_anchor, content (multi-line)
- insert_after: anchor, content
- insert_before: anchor, content
- replace: old_content, new_content, allow_multiple
- append: content
auto_cleanup: Strip hashline prefixes and echoed context from edit
content (default: True).
encoding: File encoding (default: "utf-8").
"""
# 1. Parse JSON
try:
edit_ops = json.loads(edits)
except (json.JSONDecodeError, TypeError) as e:
return f"Error: Invalid JSON in edits: {e}"
if not isinstance(edit_ops, list):
return "Error: edits must be a JSON array of operations"
if not edit_ops:
return "Error: edits array is empty"
if len(edit_ops) > 100:
return "Error: Too many edits in one call (max 100). Split into multiple calls."
# 2. Read file
resolved = _resolve(path)
if not os.path.isfile(resolved):
return f"Error: File not found: {path}"
try:
with open(resolved, "rb") as f:
raw_head = f.read(8192)
eol = "\r\n" if b"\r\n" in raw_head else "\n"
with open(resolved, encoding=encoding) as f:
content = f.read()
except Exception as e:
return f"Error: Failed to read file: {e}"
content_bytes = len(content.encode(encoding))
if content_bytes > HASHLINE_MAX_FILE_BYTES:
return f"Error: File too large for hashline_edit ({content_bytes} bytes, max 10MB)"
trailing_newline = content.endswith("\n")
lines = content.splitlines()
# 3. Categorize and validate ops
splices = [] # (start_0idx, end_0idx, new_lines, op_index)
replaces = [] # (old_content, new_content, op_index, allow_multiple)
cleanup_actions: list[str] = []
for i, op in enumerate(edit_ops):
if not isinstance(op, dict):
return f"Error: Edit #{i + 1}: operation must be a dict"
match op.get("op"):
case "set_line":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return f"Error: Edit #{i + 1} (set_line): {err}"
if "content" not in op:
return f"Error: Edit #{i + 1} (set_line): missing required field 'content'"
if not isinstance(op["content"], str):
return f"Error: Edit #{i + 1} (set_line): content must be a string"
if "\n" in op["content"] or "\r" in op["content"]:
return (
f"Error: Edit #{i + 1} (set_line): content must be a single line. "
f"Use replace_lines for multi-line replacement."
)
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op["content"]
new_lines = [new_content] if new_content else []
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx, idx, new_lines, i))
case "replace_lines":
start_anchor = op.get("start_anchor", "")
end_anchor = op.get("end_anchor", "")
err = validate_anchor(start_anchor, lines)
if err:
return f"Error: Edit #{i + 1} (replace_lines start): {err}"
err = validate_anchor(end_anchor, lines)
if err:
return f"Error: Edit #{i + 1} (replace_lines end): {err}"
start_num, _ = parse_anchor(start_anchor)
end_num, _ = parse_anchor(end_anchor)
if start_num > end_num:
return (
f"Error: Edit #{i + 1} (replace_lines): "
f"start line {start_num} > end line {end_num}"
)
if "content" not in op:
return (
f"Error: Edit #{i + 1} (replace_lines): "
f"missing required field 'content'"
)
if not isinstance(op["content"], str):
return f"Error: Edit #{i + 1} (replace_lines): content must be a string"
new_content = op["content"]
new_lines = new_content.splitlines() if new_content else []
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, s=start_num, e=end_num: strip_boundary_echo(lines, s, e, nl),
"boundary_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((start_num - 1, end_num - 1, new_lines, i))
case "insert_after":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return f"Error: Edit #{i + 1} (insert_after): {err}"
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op.get("content", "")
if not isinstance(new_content, str):
return f"Error: Edit #{i + 1} (insert_after): content must be a string"
if not new_content:
return f"Error: Edit #{i + 1} (insert_after): content is empty"
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl),
"insert_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx + 1, idx, new_lines, i))
case "insert_before":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return f"Error: Edit #{i + 1} (insert_before): {err}"
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op.get("content", "")
if not isinstance(new_content, str):
return f"Error: Edit #{i + 1} (insert_before): content must be a string"
if not new_content:
return f"Error: Edit #{i + 1} (insert_before): content is empty"
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl, position="last"),
"insert_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx, idx - 1, new_lines, i))
case "replace":
old_content = op.get("old_content")
new_content = op.get("new_content")
if old_content is None:
return f"Error: Edit #{i + 1} (replace): missing old_content"
if not isinstance(old_content, str):
return f"Error: Edit #{i + 1} (replace): old_content must be a string"
if not old_content:
return f"Error: Edit #{i + 1} (replace): old_content must not be empty"
if new_content is None:
return f"Error: Edit #{i + 1} (replace): missing new_content"
if not isinstance(new_content, str):
return f"Error: Edit #{i + 1} (replace): new_content must be a string"
allow_multiple = op.get("allow_multiple", False)
if not isinstance(allow_multiple, bool):
return f"Error: Edit #{i + 1} (replace): allow_multiple must be a boolean"
replaces.append((old_content, new_content, i, allow_multiple))
case "append":
new_content = op.get("content")
if new_content is None:
return f"Error: Edit #{i + 1} (append): missing content"
if not isinstance(new_content, str):
return f"Error: Edit #{i + 1} (append): content must be a string"
if not new_content:
return f"Error: Edit #{i + 1} (append): content must not be empty"
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
insert_point = len(lines)
splices.append((insert_point, insert_point - 1, new_lines, i))
case unknown:
return f"Error: Edit #{i + 1}: unknown op '{unknown}'"
# 4. Check for overlapping splice ranges
for j in range(len(splices)):
for k in range(j + 1, len(splices)):
s_a, e_a, _, idx_a = splices[j]
s_b, e_b, _, idx_b = splices[k]
is_insert_a = s_a > e_a
is_insert_b = s_b > e_b
if is_insert_a and is_insert_b:
continue
if is_insert_a and not is_insert_b:
if s_b <= s_a <= e_b + 1:
return (
f"Error: Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
continue
if is_insert_b and not is_insert_a:
if s_a <= s_b <= e_a + 1:
return (
f"Error: Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
continue
if not (e_a < s_b or e_b < s_a):
return (
f"Error: Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
# 5. Apply splices bottom-up
changes_made = 0
working = list(lines)
for start, end, new_lines, _ in sorted(splices, key=lambda s: (s[0], s[3]), reverse=True):
if start > end:
changes_made += 1
for k, nl in enumerate(new_lines):
working.insert(start + k, nl)
else:
old_slice = working[start : end + 1]
if old_slice != new_lines:
changes_made += 1
working[start : end + 1] = new_lines
# 6. Apply str_replace ops
joined = "\n".join(working)
replace_counts = []
for old_content, new_content, op_idx, allow_multiple in replaces:
count = joined.count(old_content)
if count == 0:
return (
f"Error: Edit #{op_idx + 1} (replace): "
f"old_content not found "
f"(note: anchor-based edits in this batch are applied first)"
)
if count > 1 and not allow_multiple:
return (
f"Error: Edit #{op_idx + 1} (replace): "
f"old_content found {count} times (must be unique). "
f"Include more surrounding context to make it unique, "
f"or use anchor-based ops instead."
)
if allow_multiple:
joined = joined.replace(old_content, new_content)
replace_counts.append((op_idx, count))
else:
joined = joined.replace(old_content, new_content, 1)
if count > 0 and old_content != new_content:
changes_made += 1
# 7. Restore trailing newline
if trailing_newline and joined and not joined.endswith("\n"):
joined += "\n"
# 8. Restore original EOL style (only convert bare \n, not existing \r\n)
if eol == "\r\n":
joined = re.sub(r"(?<!\r)\n", "\r\n", joined)
# 9. Snapshot + atomic write
try:
if before_write:
before_write()
original_mode = os.stat(resolved).st_mode
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(resolved))
fd_open = True
try:
if hasattr(os, "fchmod"):
os.fchmod(fd, original_mode)
with os.fdopen(fd, "w", encoding=encoding, newline="") as f:
fd_open = False
f.write(joined)
os.replace(tmp_path, resolved)
except BaseException:
if fd_open:
os.close(fd)
with contextlib.suppress(OSError):
os.unlink(tmp_path)
raise
except Exception as e:
return f"Error: Failed to write file: {e}"
# 10. Build response
updated_lines = joined.splitlines()
total_lines = len(updated_lines)
# Limit returned content to first 200 lines
preview_limit = 200
hashline_content = format_hashlines(updated_lines, limit=preview_limit)
parts = [f"Applied {changes_made} edit(s) to {path}"]
if changes_made == 0:
parts.append("(content unchanged after applying edits)")
if cleanup_actions:
parts.append(f"Auto-cleanup: {', '.join(cleanup_actions)}")
if replace_counts:
for op_idx, count in replace_counts:
parts.append(f"Edit #{op_idx + 1} replaced {count} occurrence(s)")
parts.append("")
parts.append(hashline_content)
if total_lines > preview_limit:
parts.append(
f"\n(Showing first {preview_limit} of {total_lines} lines. "
f"Use read_file with offset to see more.)"
)
return "\n".join(parts)
+230
View File
@@ -0,0 +1,230 @@
"""Hashline utilities for anchor-based file editing.
Each line gets a short content hash anchor (line_number:hash). Models reference
lines by anchor instead of reproducing text. If the file changed since the model
read it, the hash won't match and the edit is cleanly rejected.
"""
import re
import zlib
# ── Constants ─────────────────────────────────────────────────────────────
# Files beyond this size are skipped/rejected in hashline mode because
# hashline anchors are not practical on files this large (minified
# bundles, logs, data dumps). Shared by view_file, grep_search, and
# hashline_edit.
HASHLINE_MAX_FILE_BYTES = 10 * 1024 * 1024 # 10 MB
# ── Hash computation ──────────────────────────────────────────────────────
def compute_line_hash(line: str) -> str:
"""Compute a 4-char hex hash for a line of text.
Uses CRC32 mod 65536, formatted as lowercase hex. Only trailing spaces
and tabs are stripped before hashing. Leading whitespace (indentation)
is included in the hash so indentation changes invalidate anchors.
This keeps stale-anchor detection safe for indentation-sensitive files
while still ignoring common trailing-whitespace noise.
Collision probability is ~0.0015% per changed line (4-char hex,
migrated from 2-char hex which had ~0.39% collision rate).
"""
stripped = line.rstrip(" \t")
crc = zlib.crc32(stripped.encode("utf-8")) & 0xFFFFFFFF
return f"{crc % 65536:04x}"
def format_hashlines(lines: list[str], offset: int = 1, limit: int = 0) -> str:
"""Format lines with N:hhhh|content prefixes.
Args:
lines: The file content split into lines.
offset: 1-indexed start line (default 1).
limit: Maximum lines to return, 0 means all.
Returns:
Formatted string with hashline prefixes.
"""
start = offset - 1 # convert to 0-indexed
if limit > 0:
selected = lines[start : start + limit]
else:
selected = lines[start:]
result_parts = []
for i, line in enumerate(selected):
line_num = offset + i
h = compute_line_hash(line)
result_parts.append(f"{line_num}:{h}|{line}")
return "\n".join(result_parts)
# ── Anchor parsing & validation ───────────────────────────────────────────
def parse_anchor(anchor: str) -> tuple[int, str]:
"""Parse an anchor string like '2:a3b1' into (line_number, hash).
Raises:
ValueError: If the anchor format is invalid.
"""
if ":" not in anchor:
raise ValueError(f"Invalid anchor format (no colon): '{anchor}'")
parts = anchor.split(":", 1)
try:
line_num = int(parts[0])
except ValueError as exc:
raise ValueError(f"Invalid anchor format (line number not an integer): '{anchor}'") from exc
hash_str = parts[1]
if len(hash_str) != 4:
raise ValueError(f"Invalid anchor format (hash must be 4 chars): '{anchor}'")
if not all(c in "0123456789abcdef" for c in hash_str):
raise ValueError(f"Invalid anchor format (hash must be lowercase hex): '{anchor}'")
return line_num, hash_str
def validate_anchor(anchor: str, lines: list[str]) -> str | None:
"""Validate an anchor against file lines.
Returns:
None if valid, error message string if invalid.
"""
try:
line_num, expected_hash = parse_anchor(anchor)
except ValueError as e:
return str(e)
if line_num < 1 or line_num > len(lines):
return f"Line {line_num} out of range (file has {len(lines)} lines)"
actual_line = lines[line_num - 1]
actual_hash = compute_line_hash(actual_line)
if actual_hash != expected_hash:
preview = actual_line.strip()
if len(preview) > 80:
preview = preview[:77] + "..."
return (
f"Hash mismatch at line {line_num}: expected '{expected_hash}', "
f"got '{actual_hash}'. Current content: {preview!r}. "
f"Re-read the file to get current anchors."
)
return None
# ── Auto-cleanup helpers ──────────────────────────────────────────────────
# Shared by both file_ops.hashline_edit and file_system_toolkits.hashline_edit.
HASHLINE_PREFIX_RE = re.compile(r"^\d+:[0-9a-f]{4}\|")
def strip_content_prefixes(lines: list[str]) -> list[str]:
"""Strip hashline prefixes from content lines when all have them.
LLMs frequently copy hashline-formatted text (e.g. '5:a3b1|content') into
their content fields. Only strips when 2+ non-empty lines all match the
exact hashline prefix pattern (N:hhhh|). Single-line content is left alone
to avoid false positives on literal text that happens to match the pattern.
"""
if not lines:
return lines
non_empty = [ln for ln in lines if ln]
if len(non_empty) < 2:
return lines
prefix_count = sum(1 for ln in non_empty if HASHLINE_PREFIX_RE.match(ln))
if prefix_count < len(non_empty):
return lines
return [HASHLINE_PREFIX_RE.sub("", ln) for ln in lines]
def whitespace_equal(a: str, b: str) -> bool:
"""Compare strings ignoring spaces and tabs."""
return a.replace(" ", "").replace("\t", "") == b.replace(" ", "").replace("\t", "")
def strip_insert_echo(
anchor_line: str, new_lines: list[str], *, position: str = "first"
) -> list[str]:
"""Strip echoed anchor line from insert content.
If the model echoes the anchor line in inserted content, remove it to
avoid duplication. Only applies when content has 2+ lines and both the
anchor and checked content line are non-blank.
position="first" (insert_after): check first line, strip from front.
position="last" (insert_before): check last line, strip from end.
"""
if len(new_lines) <= 1:
return new_lines
if position == "last":
if not anchor_line.strip() or not new_lines[-1].strip():
return new_lines
if whitespace_equal(new_lines[-1], anchor_line):
return new_lines[:-1]
else:
if not anchor_line.strip() or not new_lines[0].strip():
return new_lines
if whitespace_equal(new_lines[0], anchor_line):
return new_lines[1:]
return new_lines
def strip_boundary_echo(
file_lines: list[str], start_1idx: int, end_1idx: int, new_lines: list[str]
) -> list[str]:
"""Strip echoed boundary context from replace_lines content.
If the model includes the line before AND after the replaced range as part
of the replacement content, strip those echoed boundary lines. Both
boundaries must echo simultaneously before either is stripped (a single
boundary match is too likely to be a coincidence with real content).
Only applies when the replacement has more lines than the range being
replaced, and both the boundary line and content line are non-blank.
"""
range_count = end_1idx - start_1idx + 1
if len(new_lines) <= 1 or len(new_lines) <= range_count:
return new_lines
# Check if leading boundary echoes
before_idx = start_1idx - 2 # 0-indexed line before range
leading_echoes = (
before_idx >= 0
and new_lines[0].strip()
and file_lines[before_idx].strip()
and whitespace_equal(new_lines[0], file_lines[before_idx])
)
# Check if trailing boundary echoes
after_idx = end_1idx # 0-indexed line after range
trailing_echoes = (
after_idx < len(file_lines)
and new_lines[-1].strip()
and file_lines[after_idx].strip()
and whitespace_equal(new_lines[-1], file_lines[after_idx])
)
# Only strip if BOTH boundaries echo and there is content between them.
# len < 3 means no real content between the two boundary lines, so
# stripping would produce an empty list (accidental deletion).
if not (leading_echoes and trailing_echoes) or len(new_lines) < 3:
return new_lines
return new_lines[1:-1]
def maybe_strip(new_lines, strip_fn, action_name, auto_cleanup, cleanup_actions):
"""Apply a strip function if auto_cleanup is enabled, tracking actions."""
if not auto_cleanup:
return new_lines
cleaned = strip_fn(new_lines)
if cleaned != new_lines:
if action_name not in cleanup_actions:
cleanup_actions.append(action_name)
return cleaned
return new_lines
+5
View File
@@ -56,6 +56,8 @@ from .email_tool import register_tools as register_email
from .exa_search_tool import register_tools as register_exa_search
from .example_tool import register_tools as register_example
from .excel_tool import register_tools as register_excel
# File system toolkits
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.data_tools import register_tools as register_data_tools
@@ -63,6 +65,7 @@ from .file_system_toolkits.execute_command_tool import (
register_tools as register_execute_command,
)
from .file_system_toolkits.grep_search import register_tools as register_grep_search
from .file_system_toolkits.hashline_edit import register_tools as register_hashline_edit
from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import (
register_tools as register_replace_file_content,
@@ -162,6 +165,8 @@ def _register_verified(
register_apply_diff(mcp)
register_apply_patch(mcp)
register_grep_search(mcp)
# hashline_edit: anchor-based editing, pairs with view_file/grep_search hashline mode
register_hashline_edit(mcp)
register_execute_command(mcp)
register_data_tools(mcp)
register_csv(mcp)
@@ -36,12 +36,13 @@ grep_search(
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `recursive` | bool | No | False | Whether to search recursively in subdirectories |
| `hashline` | bool | No | False | If True, include an `anchor` field (`N:hhhh`) in each match for use with `hashline_edit` |
## Returns
Returns a dictionary with the following structure:
**Success:**
**Success (default mode):**
```python
{
"success": True,
@@ -64,6 +65,25 @@ Returns a dictionary with the following structure:
}
```
**Success (hashline mode):**
```python
{
"success": True,
"pattern": "def \\w+\\(",
"path": "src",
"recursive": True,
"matches": [
{
"file": "src/main.py",
"line_number": 10,
"line_content": "def process_data(args):",
"anchor": "10:a3f2"
}
],
"total_matches": 1
}
```
**No matches:**
```python
{
@@ -3,6 +3,8 @@ import re
from mcp.server.fastmcp import FastMCP
from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, compute_line_hash
from ..security import WORKSPACES_DIR, get_secure_path
@@ -17,12 +19,14 @@ def register_tools(mcp: FastMCP) -> None:
agent_id: str,
session_id: str,
recursive: bool = False,
hashline: bool = False,
) -> dict:
"""
Search for a pattern in a file or directory within the session sandbox.
Use this when you need to find specific content or patterns in files using regex.
Set recursive=True to search through all subdirectories.
Set hashline=True to include anchor hashes in results for use with hashline_edit.
Args:
path: The path to search in (file or directory, relative to session root)
@@ -31,6 +35,7 @@ def register_tools(mcp: FastMCP) -> None:
agent_id: The ID of the agent
session_id: The ID of the current session
recursive: Whether to search recursively in directories (default: False)
hashline: If True, include anchor field (N:hhhh) in each match (default: False)
Returns:
Dict with search results and match details, or error dict
@@ -48,6 +53,7 @@ def register_tools(mcp: FastMCP) -> None:
session_root = os.path.join(WORKSPACES_DIR, workspace_id, agent_id, session_id)
matches = []
skipped_large_files = []
if os.path.isfile(secure_path):
files = [secure_path]
@@ -67,21 +73,46 @@ def register_tools(mcp: FastMCP) -> None:
# Calculate relative path for display
display_path = os.path.relpath(file_path, session_root)
try:
with open(file_path, encoding="utf-8") as f:
for i, line in enumerate(f, 1):
if regex.search(line):
if hashline:
# Use splitlines() for anchor consistency with
# view_file/hashline_edit (handles Unicode line
# separators like \u2028, \x85).
# Skip files > 10MB to avoid excessive memory use.
file_size = os.path.getsize(file_path)
if file_size > HASHLINE_MAX_FILE_BYTES:
skipped_large_files.append(display_path)
continue
with open(file_path, encoding="utf-8") as f:
content = f.read()
for i, line in enumerate(content.splitlines(), 1):
if not regex.search(line):
continue
matches.append(
{
"file": display_path,
"line_number": i,
"line_content": line,
"anchor": f"{i}:{compute_line_hash(line)}",
}
)
else:
with open(file_path, encoding="utf-8") as f:
for i, line in enumerate(f, 1):
bare = line.rstrip("\n\r")
if not regex.search(bare):
continue
matches.append(
{
"file": display_path,
"line_number": i,
"line_content": line.strip(),
"line_content": bare.strip(),
}
)
except (UnicodeDecodeError, PermissionError):
# Skips files that cannot be decoded or lack permissions
continue
return {
result = {
"success": True,
"pattern": pattern,
"path": path,
@@ -89,6 +120,9 @@ def register_tools(mcp: FastMCP) -> None:
"matches": matches,
"total_matches": len(matches),
}
if skipped_large_files:
result["skipped_large_files"] = skipped_large_files
return result
# 2. Specific Exception Handling (Issue #55 Requirements)
except FileNotFoundError:
@@ -0,0 +1,19 @@
"""Backward-compatible re-exports from aden_tools.hashline.
This module has been moved to aden_tools.hashline for shared use across
both file_system_toolkits and file_ops (coder tools). All imports continue
to work via this shim.
"""
from aden_tools.hashline import ( # noqa: F401
HASHLINE_PREFIX_RE,
compute_line_hash,
format_hashlines,
maybe_strip,
parse_anchor,
strip_boundary_echo,
strip_content_prefixes,
strip_insert_echo,
validate_anchor,
whitespace_equal,
)
@@ -0,0 +1,216 @@
# Hashline Edit Tool
Edit files using anchor-based line references for precise, hash-validated edits.
## Description
The `hashline_edit` tool enables file editing using short content-hash anchors (`N:hhhh`) instead of requiring exact text reproduction. Each line's anchor includes a 4-character hash of its content. If the file has changed since the model last read it, the hash won't match and the edit is cleanly rejected.
Use this tool together with `view_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.
## Use Cases
- Making targeted edits after reading a file with `view_file(hashline=True)`
- Replacing single lines, line ranges, or inserting new lines by anchor
- Batch editing multiple locations in a single atomic call
- Falling back to string replacement when anchors are not available
## Usage
```python
import json
# First, read the file with hashline mode to get anchors
content = view_file(path="app.py", hashline=True, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns lines like: 1:a3b1|def main(): 2:f1c2| print("hello") ...
# Then edit using the anchors
hashline_edit(
path="app.py",
edits=json.dumps([
{"op": "set_line", "anchor": "2:f1c2", "content": ' print("goodbye")'}
]),
workspace_id="ws-1",
agent_id="a-1",
session_id="s-1"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `edits` | str | Yes | - | JSON string containing a list of edit operations (see Operations below) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `auto_cleanup` | bool | No | `True` | Strip hashline prefixes and echoed context from content. Set to `False` to write content exactly as provided. |
| `encoding` | str | No | `"utf-8"` | File encoding. Must match the file's actual encoding. |
## Operations
The `edits` parameter is a JSON array of operation objects. Each object must have an `"op"` field:
| Op | Fields | Behavior |
|---|---|---|
| `set_line` | `anchor`, `content` | Replace one line identified by anchor (use `content: ""` to delete the line) |
| `replace_lines` | `start_anchor`, `end_anchor`, `content` | Replace a range of lines (can expand or shrink) |
| `insert_after` | `anchor`, `content` | Insert new lines after the anchor line |
| `insert_before` | `anchor`, `content` | Insert new lines before the anchor line |
| `replace` | `old_content`, `new_content`, `allow_multiple` (optional) | Fallback string replacement; errors if 0 or 2+ matches (unless `allow_multiple: true`) |
| `append` | `content` | Append new lines to end of file (works for empty files too) |
## Returns
**Success:**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 2,
"content": "1:b2c4|def main():\n2:c4a1| print(\"goodbye\")\n..."
}
```
**Success (noop, content unchanged after applying edits):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 0,
"note": "Content unchanged after applying edits",
"content": "1:b2c4|def main():\n..."
}
```
**Success (with auto-cleanup applied):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"cleanup_applied": ["prefix_strip"]
}
```
The `cleanup_applied` field is only present when cleanup actually modified content. Possible values: `prefix_strip`, `boundary_echo_strip`, `insert_echo_strip`.
**Success (replace with allow_multiple):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"replacements": {"edit_1": 3}
}
```
The `replacements` field is only present when `allow_multiple: true` was used, showing the count per replace op.
**Error:**
```python
{
"error": "Edit #1 (set_line): Hash mismatch at line 2: expected 'f1c2', got 'a3b1'. Re-read the file to get current anchors."
}
```
## Error Handling
- Returns an error if the file doesn't exist
- Returns an error if any anchor hash doesn't match (stale read)
- Returns an error if a line number is out of range
- Returns an error if splice ranges overlap within a batch
- Returns an error if a `replace` op matches 0 or 2+ times (unless `allow_multiple: true`)
- Returns an error for unknown op types or invalid JSON
- All edits are validated before any writes occur (atomic): on any error the file is unchanged
## Examples
### Replacing a single line
```python
edits = json.dumps([
{"op": "set_line", "anchor": "5:a3b1", "content": " return result"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {"success": True, "path": "app.py", "edits_applied": 1, "content": "..."}
```
### Replacing a range of lines
```python
edits = json.dumps([{
"op": "replace_lines",
"start_anchor": "10:b1c2",
"end_anchor": "15:c2d3",
"content": " # simplified\n return x + y"
}])
result = hashline_edit(path="math.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines after
```python
edits = json.dumps([
{"op": "insert_after", "anchor": "3:d4e5", "content": "import os\nimport sys"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines before
```python
edits = json.dumps([
{"op": "insert_before", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Batch editing
```python
edits = json.dumps([
{"op": "set_line", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"},
{"op": "insert_after", "anchor": "2:b2c3", "content": "import logging"},
{"op": "set_line", "anchor": "10:c3d4", "content": " logging.info('done')"},
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Replace all occurrences
```python
edits = json.dumps([
{"op": "replace", "old_content": "old_name", "new_content": "new_name", "allow_multiple": True}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {..., "replacements": {"edit_1": 5}}
```
## Notes
- Anchors are generated by `view_file(hashline=True)` and `grep_search(hashline=True)`
- The hash is a CRC32-based 4-char hex digest of the line content (with trailing spaces and tabs stripped; leading whitespace is included so indentation changes invalidate anchors). Collision probability is ~0.0015% per changed line.
- All anchor-based ops are validated before any writes occur; if any op fails validation, the file is left unchanged
- String `replace` ops are applied after all anchor-based splices, so they match against post-splice content
- Original line endings (LF or CRLF) are preserved
- The response includes the updated file content in hashline format, so subsequent edits can use the new anchors without re-reading
## Auto-Cleanup Details
When `auto_cleanup=True` (the default), the tool strips hashline prefixes and echoed context that LLMs frequently include in edit content. Prefix stripping uses a **2+ non-empty line threshold** to avoid false positives. The prefix regex matches the `N:hhhh|` pattern (4-char hex hash).
**Why the threshold matters:** Single-line content matching the `N:hhhh|` pattern is ambiguous. It could be literal content (CSV data, config values, log format strings) that happens to match the pattern. With 2+ lines all matching, the probability of a false positive drops dramatically.
**Single-line example (NOT stripped):**
```python
# set_line with content "5:a3b1|hello" writes literally "5:a3b1|hello"
{"op": "set_line", "anchor": "2:f1c2", "content": "5:a3b1|hello"}
```
**Multi-line example (stripped):**
```python
# replace_lines where all lines match N:hhhh| pattern gets stripped
{"op": "replace_lines", "start_anchor": "2:f1c2", "end_anchor": "3:b2d3",
"content": "2:a3b1|BBB\n3:c4d2|CCC"}
# Writes "BBB\nCCC" (prefixes removed)
```
**Escape hatch:** Set `auto_cleanup=False` to write content exactly as provided, bypassing all cleanup heuristics.
@@ -0,0 +1,3 @@
from .hashline_edit import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,420 @@
import contextlib
import json
import os
import re
import tempfile
from mcp.server.fastmcp import FastMCP
from aden_tools.hashline import (
HASHLINE_MAX_FILE_BYTES,
format_hashlines,
maybe_strip,
parse_anchor,
strip_boundary_echo,
strip_content_prefixes,
strip_insert_echo,
validate_anchor,
)
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register hashline edit tools with the MCP server."""
@mcp.tool()
def hashline_edit(
path: str,
edits: str,
workspace_id: str,
agent_id: str,
session_id: str,
auto_cleanup: bool = True,
encoding: str = "utf-8",
) -> dict:
"""
Purpose
Edit a file using anchor-based line references (N:hash) for precise edits.
When to use
After reading a file with view_file(hashline=True), use the anchors to make
targeted edits without reproducing exact file content.
Rules & Constraints
Anchors must match the current file content (hash validation).
All edits in a batch are validated before any are applied (atomic).
Overlapping line ranges within a single call are rejected.
Args:
path: The path to the file (relative to session root)
edits: JSON string containing a list of edit operations.
Each op is a dict with:
- set_line: anchor, content
- replace_lines: start_anchor, end_anchor, content
- insert_after: anchor, content
- insert_before: anchor, content
- replace: old_content, new_content, allow_multiple
- append: content
workspace_id: The ID of workspace
agent_id: The ID of agent
session_id: The ID of the current session
auto_cleanup: If True (default), automatically strip hashline prefixes and
echoed context from edit content. Set to False to write content exactly
as provided.
encoding: File encoding (default "utf-8"). Must match the file's actual encoding.
Returns:
Dict with success status, updated hashline content, and edit count, or error dict
"""
# 1. Parse JSON
try:
edit_ops = json.loads(edits)
except (json.JSONDecodeError, TypeError) as e:
return {"error": f"Invalid JSON in edits: {e}"}
if not isinstance(edit_ops, list):
return {"error": "edits must be a JSON array of operations"}
if not edit_ops:
return {"error": "edits array is empty"}
if len(edit_ops) > 100:
return {"error": "Too many edits in one call (max 100). Split into multiple calls."}
# 2. Read file
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
if not os.path.isfile(secure_path):
return {"error": f"Path is not a file: {path}"}
with open(secure_path, "rb") as f:
raw_head = f.read(8192)
eol = "\r\n" if b"\r\n" in raw_head else "\n"
with open(secure_path, encoding=encoding) as f:
content = f.read()
except Exception as e:
return {"error": f"Failed to read file: {e}"}
content_bytes = len(content.encode(encoding))
if content_bytes > HASHLINE_MAX_FILE_BYTES:
return {"error": f"File too large for hashline_edit ({content_bytes} bytes, max 10MB)"}
trailing_newline = content.endswith("\n")
lines = content.splitlines()
# 3. Categorize and validate ops
splices = [] # (start_0idx, end_0idx, new_lines, op_index)
replaces = [] # (old_content, new_content, op_index, allow_multiple)
cleanup_actions = []
for i, op in enumerate(edit_ops):
if not isinstance(op, dict):
return {"error": f"Edit #{i + 1}: operation must be a dict"}
match op.get("op"):
case "set_line":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return {"error": f"Edit #{i + 1} (set_line): {err}"}
if "content" not in op:
return {
"error": f"Edit #{i + 1} (set_line): missing required field 'content'"
}
if not isinstance(op["content"], str):
return {"error": f"Edit #{i + 1} (set_line): content must be a string"}
if "\n" in op["content"] or "\r" in op["content"]:
return {
"error": f"Edit #{i + 1} (set_line): content must be a single line. "
f"Use replace_lines for multi-line replacement."
}
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op["content"]
new_lines = [new_content] if new_content else []
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx, idx, new_lines, i))
case "replace_lines":
start_anchor = op.get("start_anchor", "")
end_anchor = op.get("end_anchor", "")
err = validate_anchor(start_anchor, lines)
if err:
return {"error": f"Edit #{i + 1} (replace_lines start): {err}"}
err = validate_anchor(end_anchor, lines)
if err:
return {"error": f"Edit #{i + 1} (replace_lines end): {err}"}
start_num, _ = parse_anchor(start_anchor)
end_num, _ = parse_anchor(end_anchor)
if start_num > end_num:
return {
"error": f"Edit #{i + 1} (replace_lines): "
f"start line {start_num} > end line {end_num}"
}
if "content" not in op:
return {
"error": (
f"Edit #{i + 1} (replace_lines): missing required field 'content'"
)
}
if not isinstance(op["content"], str):
return {"error": f"Edit #{i + 1} (replace_lines): content must be a string"}
new_content = op["content"]
new_lines = new_content.splitlines() if new_content else []
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, s=start_num, e=end_num: strip_boundary_echo(lines, s, e, nl),
"boundary_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((start_num - 1, end_num - 1, new_lines, i))
case "insert_after":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return {"error": f"Edit #{i + 1} (insert_after): {err}"}
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op.get("content", "")
if not isinstance(new_content, str):
return {"error": f"Edit #{i + 1} (insert_after): content must be a string"}
if not new_content:
return {"error": f"Edit #{i + 1} (insert_after): content is empty"}
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl),
"insert_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx + 1, idx, new_lines, i))
case "insert_before":
anchor = op.get("anchor", "")
err = validate_anchor(anchor, lines)
if err:
return {"error": f"Edit #{i + 1} (insert_before): {err}"}
line_num, _ = parse_anchor(anchor)
idx = line_num - 1
new_content = op.get("content", "")
if not isinstance(new_content, str):
return {"error": f"Edit #{i + 1} (insert_before): content must be a string"}
if not new_content:
return {"error": f"Edit #{i + 1} (insert_before): content is empty"}
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
new_lines = maybe_strip(
new_lines,
lambda nl, _idx=idx: strip_insert_echo(lines[_idx], nl, position="last"),
"insert_echo_strip",
auto_cleanup,
cleanup_actions,
)
splices.append((idx, idx - 1, new_lines, i))
case "replace":
old_content = op.get("old_content")
new_content = op.get("new_content")
if old_content is None:
return {"error": f"Edit #{i + 1} (replace): missing old_content"}
if not isinstance(old_content, str):
return {"error": f"Edit #{i + 1} (replace): old_content must be a string"}
if not old_content:
return {"error": f"Edit #{i + 1} (replace): old_content must not be empty"}
if new_content is None:
return {"error": f"Edit #{i + 1} (replace): missing new_content"}
if not isinstance(new_content, str):
return {"error": f"Edit #{i + 1} (replace): new_content must be a string"}
allow_multiple = op.get("allow_multiple", False)
if not isinstance(allow_multiple, bool):
return {
"error": f"Edit #{i + 1} (replace): allow_multiple must be a boolean"
}
replaces.append((old_content, new_content, i, allow_multiple))
case "append":
new_content = op.get("content")
if new_content is None:
return {"error": f"Edit #{i + 1} (append): missing content"}
if not isinstance(new_content, str):
return {"error": f"Edit #{i + 1} (append): content must be a string"}
if not new_content:
return {"error": f"Edit #{i + 1} (append): content must not be empty"}
new_lines = new_content.splitlines()
new_lines = maybe_strip(
new_lines,
strip_content_prefixes,
"prefix_strip",
auto_cleanup,
cleanup_actions,
)
insert_point = len(lines)
splices.append((insert_point, insert_point - 1, new_lines, i))
case unknown:
return {"error": f"Edit #{i + 1}: unknown op '{unknown}'"}
# 4. Check for overlapping splice ranges
for j in range(len(splices)):
for k in range(j + 1, len(splices)):
s_a, e_a, _, idx_a = splices[j]
s_b, e_b, _, idx_b = splices[k]
is_insert_a = s_a > e_a
is_insert_b = s_b > e_b
if is_insert_a and is_insert_b:
continue
if is_insert_a and not is_insert_b:
if s_b <= s_a <= e_b + 1:
return {
"error": (
f"Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
}
continue
if is_insert_b and not is_insert_a:
if s_a <= s_b <= e_a + 1:
return {
"error": (
f"Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
}
continue
if not (e_a < s_b or e_b < s_a):
return {
"error": (
f"Overlapping edits: edit #{idx_a + 1} "
f"and edit #{idx_b + 1} affect overlapping line ranges"
)
}
# 5. Apply splices bottom-up
changes_made = 0
working = list(lines)
for start, end, new_lines, _ in sorted(splices, key=lambda s: (s[0], s[3]), reverse=True):
if start > end:
changes_made += 1
for k, nl in enumerate(new_lines):
working.insert(start + k, nl)
else:
old_slice = working[start : end + 1]
if old_slice != new_lines:
changes_made += 1
working[start : end + 1] = new_lines
# 6. Apply str_replace ops
joined = "\n".join(working)
replace_counts = []
for old_content, new_content, op_idx, allow_multiple in replaces:
count = joined.count(old_content)
if count == 0:
return {
"error": (
f"Edit #{op_idx + 1} (replace): "
f"old_content not found "
f"(note: anchor-based edits in this batch are applied first)"
)
}
if count > 1 and not allow_multiple:
return {
"error": (
f"Edit #{op_idx + 1} (replace): "
f"old_content found {count} times (must be unique). "
f"Include more surrounding context to make it unique, "
f"or use anchor-based ops instead."
)
}
if allow_multiple:
joined = joined.replace(old_content, new_content)
replace_counts.append((op_idx, count))
else:
joined = joined.replace(old_content, new_content, 1)
if count > 0 and old_content != new_content:
changes_made += 1
# 7. Restore trailing newline
if trailing_newline and joined and not joined.endswith("\n"):
joined += "\n"
# 8. Restore original EOL style (only convert bare \n, not existing \r\n)
if eol == "\r\n":
joined = re.sub(r"(?<!\r)\n", "\r\n", joined)
# 9. Atomic write (write-to-tmp + os.replace)
try:
original_mode = os.stat(secure_path).st_mode
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(secure_path))
fd_open = True
try:
if hasattr(os, "fchmod"):
os.fchmod(fd, original_mode)
with os.fdopen(fd, "w", encoding=encoding, newline="") as f:
fd_open = False
f.write(joined)
os.replace(tmp_path, secure_path)
except BaseException:
if fd_open:
os.close(fd)
with contextlib.suppress(OSError):
os.unlink(tmp_path)
raise
except Exception as e:
return {"error": f"Failed to write file: {e}"}
# 10. Build response
updated_lines = joined.splitlines()
hashline_content = format_hashlines(updated_lines)
result = {
"success": True,
"path": path,
"edits_applied": changes_made,
"content": hashline_content,
}
if changes_made == 0:
result["note"] = "Content unchanged after applying edits"
if cleanup_actions:
result["cleanup_applied"] = cleanup_actions
if replace_counts:
result["replacements"] = {
f"edit_{op_idx + 1}": count for op_idx, count in replace_counts
}
return result
@@ -32,12 +32,17 @@ view_file(
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `encoding` | str | No | `"utf-8"` | The encoding to use for reading the file |
| `max_size` | int | No | `10485760` | Maximum size of file content to return in bytes (10 MB) |
| `hashline` | bool | No | `False` | If True, return content with `N:hhhh\|content` anchors for use with `hashline_edit` |
| `offset` | int | No | `1` | 1-indexed start line (only used when `hashline=True`) |
| `limit` | int | No | `0` | Max lines to return, 0 = all (only used when `hashline=True`) |
## Returns
Returns a dictionary with the following structure:
**Success:**
**Success (default mode):**
```python
{
"success": True,
@@ -48,6 +53,21 @@ Returns a dictionary with the following structure:
}
```
**Success (hashline mode):**
```python
{
"success": True,
"path": "app.py",
"content": "1:a3f2|def main():\n2:f1c4| print(\"hello\")",
"hashline": True,
"offset": 1,
"limit": 0,
"total_lines": 2,
"shown_lines": 2,
"size_bytes": 35
}
```
**Error:**
```python
{
@@ -2,6 +2,8 @@ import os
from mcp.server.fastmcp import FastMCP
from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, format_hashlines
from ..security import get_secure_path
@@ -18,7 +20,10 @@ def register_tools(mcp: FastMCP) -> None:
agent_id: str,
session_id: str,
encoding: str = "utf-8",
max_size: int = 10 * 1024 * 1024,
max_size: int = HASHLINE_MAX_FILE_BYTES,
hashline: bool = False,
offset: int = 1,
limit: int = 0,
) -> dict:
"""
Purpose
@@ -41,6 +46,10 @@ def register_tools(mcp: FastMCP) -> None:
session_id: The ID of the current session
encoding: The encoding to use for reading the file (default: "utf-8")
max_size: The maximum size of file content to return in bytes (default: 10MB)
hashline: If True, return content with N:hhhh|content anchors
for use with hashline_edit (default: False)
offset: 1-indexed start line, only used when hashline=True (default: 1)
limit: Max lines to return, 0 = all, only used when hashline=True (default: 0)
Returns:
Dict with file content and metadata, or error dict
@@ -57,8 +66,59 @@ def register_tools(mcp: FastMCP) -> None:
return {"error": f"Path is not a file: {path}"}
with open(secure_path, encoding=encoding) as f:
content = f.read()
content_raw = f.read()
if not hashline and (offset != 1 or limit != 0):
return {
"error": "offset and limit are only supported when hashline=True. "
"Set hashline=True to use paging."
}
if hashline:
if offset < 1:
return {"error": f"offset must be >= 1, got {offset}"}
if limit < 0:
return {"error": f"limit must be >= 0, got {limit}"}
all_lines = content_raw.splitlines()
total_lines = len(all_lines)
raw_size = len(content_raw.encode(encoding))
if offset > max(total_lines, 1):
return {"error": f"offset {offset} is beyond end of file ({total_lines} lines)"}
# Check size after considering offset/limit. When paging
# (offset or limit set), only check the formatted output size.
# When reading the full file, check the raw size.
is_paging = offset > 1 or limit > 0
if not is_paging and raw_size > max_size:
return {
"error": f"File too large for hashline mode ({raw_size} bytes, "
f"max {max_size}). Use offset and limit to read a section at a time."
}
formatted = format_hashlines(all_lines, offset=offset, limit=limit)
shown_lines = len(formatted.splitlines()) if formatted else 0
if is_paging and len(formatted.encode(encoding)) > max_size:
return {
"error": f"Requested section too large ({shown_lines} lines). "
f"Reduce limit to read a smaller section."
}
return {
"success": True,
"path": path,
"content": formatted,
"hashline": True,
"offset": offset,
"limit": limit,
"total_lines": total_lines,
"shown_lines": shown_lines,
"size_bytes": raw_size,
}
content = content_raw
if len(content.encode(encoding)) > max_size:
content = content[:max_size]
content += "\n\n[... Content truncated due to size limit ...]"
@@ -67,7 +127,7 @@ def register_tools(mcp: FastMCP) -> None:
"success": True,
"path": path,
"content": content,
"size_bytes": len(content.encode("utf-8")),
"size_bytes": len(content.encode(encoding)),
"lines": len(content.splitlines()),
}
except Exception as e:
+60 -1
View File
@@ -1,11 +1,18 @@
"""Shared fixtures for tools tests."""
from __future__ import annotations
import logging
import os
from collections.abc import Callable
from pathlib import Path
import pytest
from fastmcp import FastMCP
from aden_tools.credentials import CredentialStoreAdapter
from aden_tools.credentials import CREDENTIAL_SPECS, CredentialStoreAdapter
logger = logging.getLogger(__name__)
@pytest.fixture
@@ -56,3 +63,55 @@ def large_text_file(tmp_path: Path) -> Path:
large_file = tmp_path / "large.txt"
large_file.write_text("x" * 20_000_000) # 20MB
return large_file
@pytest.fixture(scope="session")
def live_credential_resolver() -> Callable[[str], str | None]:
"""Resolve live credentials for integration tests.
Tries two sources in order:
1. Environment variable (spec.env_var)
2. CredentialStoreAdapter.default() (encrypted store + env fallback)
Returns a callable: resolver(credential_name) -> str | None.
Credential values are never logged or exposed in test output.
"""
_adapter: CredentialStoreAdapter | None = None
_adapter_init_failed = False
def _get_adapter() -> CredentialStoreAdapter | None:
nonlocal _adapter, _adapter_init_failed
if _adapter is not None:
return _adapter
if _adapter_init_failed:
return None
try:
_adapter = CredentialStoreAdapter.default()
except Exception as exc:
logger.debug("Could not initialize CredentialStoreAdapter: %s", exc)
_adapter_init_failed = True
return _adapter
def resolve(credential_name: str) -> str | None:
spec = CREDENTIAL_SPECS.get(credential_name)
if spec is None:
return None
# 1. Try env var directly
value = os.environ.get(spec.env_var)
if value:
return value
# 2. Try the adapter (encrypted store + fallback)
adapter = _get_adapter()
if adapter is not None:
try:
value = adapter.get(credential_name)
if value:
return value
except Exception:
pass
return None
return resolve
@@ -23,7 +23,6 @@ from aden_tools.credentials import (
EMAIL_CREDENTIALS,
GITHUB_CREDENTIALS,
HUBSPOT_CREDENTIALS,
LLM_CREDENTIALS,
SEARCH_CREDENTIALS,
SLACK_CREDENTIALS,
)
@@ -206,7 +205,6 @@ class TestSpecsMergedIntoCredentialSpecs:
"""All category credential dicts must be merged into the global CREDENTIAL_SPECS."""
CATEGORY_DICTS = {
"LLM_CREDENTIALS": LLM_CREDENTIALS,
"SEARCH_CREDENTIALS": SEARCH_CREDENTIALS,
"EMAIL_CREDENTIALS": EMAIL_CREDENTIALS,
"GITHUB_CREDENTIALS": GITHUB_CREDENTIALS,
-12
View File
@@ -308,18 +308,6 @@ class TestCredentialSpecs:
assert spec.startup_required is False
assert "brave.com" in spec.help_url
def test_anthropic_spec_exists(self):
"""CREDENTIAL_SPECS includes anthropic with startup_required=True."""
assert "anthropic" in CREDENTIAL_SPECS
spec = CREDENTIAL_SPECS["anthropic"]
assert spec.env_var == "ANTHROPIC_API_KEY"
assert spec.tools == []
assert "event_loop" in spec.node_types
assert spec.required is False
assert spec.startup_required is False
assert "anthropic.com" in spec.help_url
class TestNodeTypeValidation:
"""Tests for node type credential validation."""
+15 -97
View File
@@ -6,7 +6,6 @@ import httpx
from aden_tools.credentials.health_check import (
HEALTH_CHECKERS,
AnthropicHealthChecker,
ApolloHealthChecker,
BrevoHealthChecker,
CalcomHealthChecker,
@@ -36,11 +35,6 @@ class TestHealthCheckerRegistry:
assert "google_search" in HEALTH_CHECKERS
assert isinstance(HEALTH_CHECKERS["google_search"], GoogleSearchHealthChecker)
def test_anthropic_registered(self):
"""AnthropicHealthChecker is registered in HEALTH_CHECKERS."""
assert "anthropic" in HEALTH_CHECKERS
assert isinstance(HEALTH_CHECKERS["anthropic"], AnthropicHealthChecker)
def test_github_registered(self):
"""GitHubHealthChecker is registered in HEALTH_CHECKERS."""
assert "github" in HEALTH_CHECKERS
@@ -74,29 +68,32 @@ class TestHealthCheckerRegistry:
def test_all_expected_checkers_registered(self):
"""All expected health checkers are in the registry."""
expected = {
"anthropic",
"hubspot",
"brave_search",
"google_search",
"google_maps",
"github",
"resend",
"google_calendar_oauth",
"google",
"slack",
"discord",
"stripe",
"exa_search",
"google_docs",
"calcom",
"serpapi",
"apify",
"apollo",
"asana",
"attio",
"brave_search",
"brevo",
"calcom",
"calendly_pat",
"discord",
"docker_hub",
"exa_search",
"finlight",
"github",
"gitlab_token",
"google",
"google_calendar_oauth",
"google_docs",
"google_maps",
"google_search",
"google_search_console",
"greenhouse_token",
"hubspot",
"huggingface",
"intercom",
"linear",
@@ -106,10 +103,6 @@ class TestHealthCheckerRegistry:
"notion_token",
"pinecone",
"pipedrive",
"resend",
"serpapi",
"slack",
"stripe",
"telegram",
"trello_key",
"trello_token",
@@ -120,81 +113,6 @@ class TestHealthCheckerRegistry:
assert set(HEALTH_CHECKERS.keys()) == expected
class TestAnthropicHealthChecker:
"""Tests for AnthropicHealthChecker."""
def _mock_response(self, status_code, json_data=None):
response = MagicMock(spec=httpx.Response)
response.status_code = status_code
if json_data:
response.json.return_value = json_data
return response
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_valid_key_200(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.post.return_value = self._mock_response(200)
checker = AnthropicHealthChecker()
result = checker.check("sk-ant-test-key")
assert result.valid is True
assert "valid" in result.message.lower()
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_invalid_key_401(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.post.return_value = self._mock_response(401)
checker = AnthropicHealthChecker()
result = checker.check("invalid-key")
assert result.valid is False
assert result.details["status_code"] == 401
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_rate_limited_429(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.post.return_value = self._mock_response(429)
checker = AnthropicHealthChecker()
result = checker.check("sk-ant-test-key")
assert result.valid is True
assert result.details.get("rate_limited") is True
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_bad_request_400_still_valid(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.post.return_value = self._mock_response(400)
checker = AnthropicHealthChecker()
result = checker.check("sk-ant-test-key")
assert result.valid is True
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_timeout(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.post.side_effect = httpx.TimeoutException("timed out")
checker = AnthropicHealthChecker()
result = checker.check("sk-ant-test-key")
assert result.valid is False
assert result.details["error"] == "timeout"
class TestGitHubHealthChecker:
"""Tests for GitHubHealthChecker."""
+197
View File
@@ -0,0 +1,197 @@
"""Live integration tests for credential health checkers.
These tests make REAL API calls. They are gated behind the ``live`` marker
and never run in CI. Run them manually::
pytest -m live -s --log-cli-level=INFO # all live tests
pytest -m live -k anthropic -s # just anthropic
pytest -m live -k "not google" -s # skip google variants
pytest -m live --tb=short -q # quick summary
Prerequisites:
- Credentials available via env vars or ~/.hive/credentials/ encrypted store
- Tests skip gracefully when credentials are unavailable
- Rate-limited responses (429) are treated as PASS (credential is valid)
"""
from __future__ import annotations
import logging
import pytest
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.health_check import (
HEALTH_CHECKERS,
check_credential_health,
validate_integration_wiring,
)
logger = logging.getLogger(__name__)
# All credential names that have registered health checkers
CHECKER_NAMES = sorted(HEALTH_CHECKERS.keys())
def _redact(value: str) -> str:
"""Redact a credential for safe logging."""
if len(value) <= 8:
return "****"
return f"{value[:4]}...{value[-2:]}"
# ---------------------------------------------------------------------------
# 1. Direct checker tests
# ---------------------------------------------------------------------------
@pytest.mark.live
class TestLiveHealthCheckers:
"""Call each health checker against the real API."""
@pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
def test_checker_returns_valid(self, credential_name, live_credential_resolver):
"""Health checker returns valid=True with a real credential."""
credential_value = live_credential_resolver(credential_name)
if credential_value is None:
spec = CREDENTIAL_SPECS.get(credential_name)
env_var = spec.env_var if spec else "???"
pytest.skip(f"No credential available ({env_var})")
checker = HEALTH_CHECKERS[credential_name]
result = checker.check(credential_value)
logger.info(
"Live check %s: valid=%s message=%r",
credential_name,
result.valid,
result.message,
)
assert result.valid is True, (
f"Health check for '{credential_name}' returned valid=False: "
f"{result.message} (details: {result.details})"
)
assert result.message
@pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
def test_checker_extracts_identity(self, credential_name, live_credential_resolver):
"""Identity metadata (when present) contains non-empty strings."""
credential_value = live_credential_resolver(credential_name)
if credential_value is None:
pytest.skip(f"No credential available for '{credential_name}'")
checker = HEALTH_CHECKERS[credential_name]
result = checker.check(credential_value)
assert result.valid is True, (
f"Cannot verify identity -- health check failed: {result.message}"
)
identity = result.details.get("identity", {})
if identity:
logger.info("Identity for %s: %s", credential_name, identity)
for key, value in identity.items():
assert isinstance(value, str), (
f"Identity key '{key}' is not a string: {type(value)}"
)
assert value, f"Identity key '{key}' is empty"
else:
logger.info("No identity metadata for %s (OK for some APIs)", credential_name)
# ---------------------------------------------------------------------------
# 2. Dispatcher path (check_credential_health)
# ---------------------------------------------------------------------------
@pytest.mark.live
class TestLiveDispatcher:
"""Verify the full check_credential_health() dispatch path."""
@pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
def test_dispatcher_returns_valid(self, credential_name, live_credential_resolver):
"""check_credential_health() returns valid=True via dispatcher."""
credential_value = live_credential_resolver(credential_name)
if credential_value is None:
pytest.skip(f"No credential available for '{credential_name}'")
result = check_credential_health(credential_name, credential_value)
logger.info(
"Dispatcher check %s: valid=%s message=%r",
credential_name,
result.valid,
result.message,
)
assert result.valid is True, (
f"Dispatcher check for '{credential_name}' returned valid=False: "
f"{result.message} (details: {result.details})"
)
# ---------------------------------------------------------------------------
# 3. Integration wiring verification
# ---------------------------------------------------------------------------
@pytest.mark.live
class TestLiveIntegrationWiring:
"""validate_integration_wiring() passes for every registered checker."""
@pytest.mark.parametrize("credential_name", CHECKER_NAMES, ids=CHECKER_NAMES)
def test_wiring_valid(self, credential_name):
"""No wiring issues for credentials with health checkers."""
issues = validate_integration_wiring(credential_name)
assert not issues, f"Wiring issues for '{credential_name}':\n" + "\n".join(
f" - {i}" for i in issues
)
# ---------------------------------------------------------------------------
# 4. Summary reporter
# ---------------------------------------------------------------------------
@pytest.mark.live
class TestLiveCredentialSummary:
"""Print a human-readable summary of tested vs skipped credentials."""
def test_credential_availability_summary(self, live_credential_resolver):
"""Report which credentials were available for live testing."""
available = []
skipped = []
for name in CHECKER_NAMES:
value = live_credential_resolver(name)
spec = CREDENTIAL_SPECS.get(name)
env_var = spec.env_var if spec else "???"
if value:
available.append((name, env_var))
else:
skipped.append((name, env_var))
lines = [
"",
"=" * 60,
"LIVE CREDENTIAL TEST SUMMARY",
"=" * 60,
f" Available: {len(available)} / {len(CHECKER_NAMES)}",
f" Skipped: {len(skipped)} / {len(CHECKER_NAMES)}",
"",
]
if available:
lines.append(" TESTED:")
for name, env_var in available:
lines.append(f" [PASS] {name} ({env_var})")
if skipped:
lines.append("")
lines.append(" SKIPPED (no credential):")
for name, env_var in skipped:
lines.append(f" [SKIP] {name} ({env_var})")
lines.append("=" * 60)
summary = "\n".join(lines)
logger.info(summary)
print(summary) # noqa: T201 -- visible with pytest -s
+143
View File
@@ -0,0 +1,143 @@
"""Tests for aden_tools.file_ops (shared file tools).
These tests cover Windows compatibility concerns: path relativization
in search_files (ripgrep and Python fallback) and cross-platform behavior.
"""
import os
from unittest.mock import patch
import pytest
from fastmcp import FastMCP
from aden_tools.file_ops import register_file_tools
@pytest.fixture
def file_ops_mcp(tmp_path):
"""Create FastMCP with file_ops registered, sandboxed to tmp_path."""
def resolve_path(p: str) -> str:
if os.path.isabs(p):
return os.path.normpath(p)
return str((tmp_path / p).resolve())
mcp = FastMCP("test-file-ops")
register_file_tools(
mcp,
resolve_path=resolve_path,
project_root=str(tmp_path),
)
return mcp
def _get_tool_fn(mcp, name):
"""Extract the raw function for a registered tool."""
return mcp._tool_manager._tools[name].fn
class TestSearchFilesPathRelativization:
"""Tests for search_files path handling (Windows path separator fix)."""
def test_ripgrep_output_with_backslash_relativized(self, file_ops_mcp, tmp_path):
"""Ripgrep output with backslashes (Windows) relativized when project_root set.
Simulates: rg outputs 'C:\\Users\\...\\proj\\src\\foo.py:1:needle'
Expected: output should show 'src\\foo.py:1:needle' or 'src/foo.py:1:needle'
(relativized, not full path).
"""
# Create a file so the search has something to find
(tmp_path / "src").mkdir()
(tmp_path / "src" / "foo.py").write_text("needle\n")
project_root = str(tmp_path)
# Ripgrep on Windows outputs backslash-separated paths
# Format: path:line_num:content
rg_output = f"{project_root}{os.sep}src{os.sep}foo.py:1:needle"
search_fn = _get_tool_fn(file_ops_mcp, "search_files")
with patch("aden_tools.file_ops.subprocess.run") as mock_run:
mock_run.return_value = type(
"Result", (), {"returncode": 0, "stdout": rg_output, "stderr": ""}
)()
result = search_fn(
pattern="needle",
path=str(tmp_path),
)
# Output should be relativized (no full project_root in the line)
assert project_root not in result, (
f"Output should not contain full project_root. Got: {result!r}"
)
# Should contain the relative path part
assert "foo.py" in result
assert "1:" in result or ":1:" in result
def test_ripgrep_output_with_forward_slash_relativized(self, file_ops_mcp, tmp_path):
"""Ripgrep output using forward slashes (Unix/rg default) should be relativized."""
(tmp_path / "src").mkdir()
(tmp_path / "src" / "bar.py").write_text("pattern_match\n")
project_root = str(tmp_path)
# Some ripgrep builds output forward slashes even on Windows
rg_output = f"{project_root}/src/bar.py:1:pattern_match"
search_fn = _get_tool_fn(file_ops_mcp, "search_files")
with patch("aden_tools.file_ops.subprocess.run") as mock_run:
mock_run.return_value = type(
"Result", (), {"returncode": 0, "stdout": rg_output, "stderr": ""}
)()
result = search_fn(
pattern="pattern_match",
path=str(tmp_path),
)
assert project_root not in result or "src/bar.py" in result
assert "bar.py" in result
def test_python_fallback_relativizes_paths(self, file_ops_mcp, tmp_path):
"""Python fallback (no ripgrep) uses os.path.relpath - should work on all platforms."""
(tmp_path / "subdir").mkdir()
(tmp_path / "subdir" / "baz.txt").write_text("find_me\n")
search_fn = _get_tool_fn(file_ops_mcp, "search_files")
# Ensure ripgrep is not used
with patch("aden_tools.file_ops.subprocess.run", side_effect=FileNotFoundError()):
result = search_fn(
pattern="find_me",
path=str(tmp_path),
)
# Python fallback uses os.path.relpath - should produce relative path
project_root = str(tmp_path)
assert project_root not in result or "subdir" in result
assert "baz.txt" in result
assert "1:" in result or ":1:" in result
class TestSearchFilesBasic:
"""Basic search_files behavior (no path mocking)."""
def test_search_finds_content(self, file_ops_mcp, tmp_path):
"""search_files finds matching content via Python fallback when rg absent."""
(tmp_path / "hello.txt").write_text("world\n")
search_fn = _get_tool_fn(file_ops_mcp, "search_files")
with patch("aden_tools.file_ops.subprocess.run", side_effect=FileNotFoundError()):
result = search_fn(pattern="world", path=str(tmp_path))
assert "world" in result
assert "hello.txt" in result
def test_search_nonexistent_dir_returns_error(self, file_ops_mcp, tmp_path):
"""search_files on non-existent directory returns error."""
search_fn = _get_tool_fn(file_ops_mcp, "search_files")
result = search_fn(pattern="x", path=str(tmp_path / "nonexistent"))
assert "Error" in result
assert "not found" in result.lower()
+459
View File
@@ -0,0 +1,459 @@
"""Tests for hashline support in file_ops (coder tools)."""
import json
import os
import pytest
from fastmcp import FastMCP
from aden_tools.hashline import compute_line_hash
def _anchor(line_num, line_text):
"""Build an anchor string N:hhhh."""
return f"{line_num}:{compute_line_hash(line_text)}"
@pytest.fixture
def tools(tmp_path):
"""Register file_ops tools with tmp_path as project root."""
from aden_tools.file_ops import register_file_tools
mcp = FastMCP("test-server")
write_calls = []
def _resolve(p):
return str(tmp_path / p)
def _before_write():
write_calls.append(1)
register_file_tools(
mcp,
resolve_path=_resolve,
before_write=_before_write,
project_root=str(tmp_path),
)
tool_map = {name: t.fn for name, t in mcp._tool_manager._tools.items()}
return tool_map, write_calls
# ── read_file hashline ────────────────────────────────────────────────────
class TestReadFileHashline:
def test_hashline_format(self, tools, tmp_path):
"""hashline=True returns N:hhhh|content format."""
read_file = tools[0]["read_file"]
(tmp_path / "f.txt").write_text("hello\nworld\n")
result = read_file(path="f.txt", hashline=True)
lines = result.strip().split("\n")
# First two lines should be hashline formatted
h1 = compute_line_hash("hello")
h2 = compute_line_hash("world")
assert lines[0] == f"1:{h1}|hello"
assert lines[1] == f"2:{h2}|world"
def test_hashline_false_unchanged(self, tools, tmp_path):
"""Default (hashline=False) returns standard line-number format."""
read_file = tools[0]["read_file"]
(tmp_path / "f.txt").write_text("hello\n")
result = read_file(path="f.txt", hashline=False)
# Standard format uses tab-separated line numbers
assert "\t" in result
assert "hello" in result
def test_hashline_offset_limit(self, tools, tmp_path):
"""offset and limit work in hashline mode."""
read_file = tools[0]["read_file"]
lines = [f"line{i}" for i in range(1, 11)]
(tmp_path / "f.txt").write_text("\n".join(lines) + "\n")
result = read_file(path="f.txt", offset=3, limit=2, hashline=True)
output_lines = [ln for ln in result.split("\n") if ln and not ln.startswith("(")]
assert len(output_lines) == 2
h3 = compute_line_hash("line3")
assert output_lines[0] == f"3:{h3}|line3"
def test_hashline_no_line_truncation(self, tools, tmp_path):
"""hashline mode doesn't truncate long lines (would corrupt hashes)."""
read_file = tools[0]["read_file"]
long_line = "x" * 3000
(tmp_path / "f.txt").write_text(long_line + "\n")
result = read_file(path="f.txt", hashline=True)
h = compute_line_hash(long_line)
assert f"1:{h}|{long_line}" in result
# ── search_files hashline ─────────────────────────────────────────────────
class TestSearchFilesHashline:
def test_hashline_in_results(self, tools, tmp_path):
"""hashline=True adds hash anchors to search results."""
search_files = tools[0]["search_files"]
(tmp_path / "f.py").write_text("def foo():\n pass\n")
result = search_files(pattern="def foo", path=".", hashline=True)
# Result should contain hash anchor
h = compute_line_hash("def foo():")
assert h in result
assert f":{h}|" in result
def test_hashline_false_unchanged(self, tools, tmp_path):
"""Default search has no hash anchors."""
search_files = tools[0]["search_files"]
(tmp_path / "f.py").write_text("def foo():\n pass\n")
result = search_files(pattern="def foo", path=".", hashline=False)
h = compute_line_hash("def foo():")
assert f":{h}|" not in result
# ── hashline_edit ─────────────────────────────────────────────────────────
class TestHashlineEditBasic:
def test_returns_string(self, tools, tmp_path):
"""hashline_edit returns a string, not a dict."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert isinstance(result, str)
assert "Applied" in result
def test_calls_before_write(self, tools, tmp_path):
"""hashline_edit calls the before_write hook."""
hashline_edit = tools[0]["hashline_edit"]
write_calls = tools[1]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
hashline_edit(path="f.txt", edits=edits)
assert len(write_calls) == 1
def test_invalid_json(self, tools, tmp_path):
"""Invalid JSON returns error string."""
hashline_edit = tools[0]["hashline_edit"]
(tmp_path / "f.txt").write_text("aaa\n")
result = hashline_edit(path="f.txt", edits="not json")
assert "Error" in result
assert "Invalid JSON" in result
def test_empty_edits(self, tools, tmp_path):
"""Empty edits array returns error."""
hashline_edit = tools[0]["hashline_edit"]
(tmp_path / "f.txt").write_text("aaa\n")
result = hashline_edit(path="f.txt", edits="[]")
assert "Error" in result
assert "empty" in result
def test_file_not_found(self, tools, tmp_path):
"""Missing file returns error."""
hashline_edit = tools[0]["hashline_edit"]
edits = json.dumps([{"op": "set_line", "anchor": "1:abcd", "content": "x"}])
result = hashline_edit(path="nope.txt", edits=edits)
assert "Error" in result
assert "not found" in result
class TestHashlineEditSetLine:
def test_set_line(self, tools, tmp_path):
"""set_line replaces a single line."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nBBB\nccc\n"
def test_set_line_hash_mismatch(self, tools, tmp_path):
"""set_line with wrong hash returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": "2:ffff", "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "mismatch" in result.lower()
def test_set_line_delete(self, tools, tmp_path):
"""set_line with empty content deletes the line."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": ""}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nccc\n"
class TestHashlineEditReplaceLines:
def test_replace_lines(self, tools, tmp_path):
"""replace_lines replaces a range."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
edits = json.dumps(
[
{
"op": "replace_lines",
"start_anchor": _anchor(2, "bbb"),
"end_anchor": _anchor(3, "ccc"),
"content": "XXX\nYYY\nZZZ",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nXXX\nYYY\nZZZ\nddd\n"
class TestHashlineEditInsert:
def test_insert_after(self, tools, tmp_path):
"""insert_after adds lines after the anchor."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "insert_after",
"anchor": _anchor(1, "aaa"),
"content": "NEW",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"
def test_insert_before(self, tools, tmp_path):
"""insert_before adds lines before the anchor."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "insert_before",
"anchor": _anchor(2, "bbb"),
"content": "NEW",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nNEW\nbbb\nccc\n"
class TestHashlineEditReplace:
def test_replace(self, tools, tmp_path):
"""replace does string replacement."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "replace",
"old_content": "bbb",
"new_content": "BBB",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nBBB\nccc\n"
def test_replace_not_found(self, tools, tmp_path):
"""replace with missing old_content returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps(
[
{
"op": "replace",
"old_content": "zzz",
"new_content": "ZZZ",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "not found" in result
class TestHashlineEditAppend:
def test_append(self, tools, tmp_path):
"""append adds content at end of file."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
edits = json.dumps([{"op": "append", "content": "ccc\nddd"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
assert f.read_text() == "aaa\nbbb\nccc\nddd\n"
class TestHashlineEditOverlap:
def test_overlapping_edits_rejected(self, tools, tmp_path):
"""Overlapping splice ranges are rejected."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
edits = json.dumps(
[
{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"},
{
"op": "replace_lines",
"start_anchor": _anchor(1, "aaa"),
"end_anchor": _anchor(3, "ccc"),
"content": "XXX",
},
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "Overlapping" in result
class TestHashlineEditAutoCleanup:
def test_strips_hashline_prefix_multiline(self, tools, tmp_path):
"""auto_cleanup strips N:hhhh| prefixes from multi-line content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\nddd\n")
h_bbb = compute_line_hash("bbb")
h_ccc = compute_line_hash("ccc")
# LLM echoes hashline prefixes in replace_lines content
edits = json.dumps(
[
{
"op": "replace_lines",
"start_anchor": _anchor(2, "bbb"),
"end_anchor": _anchor(3, "ccc"),
"content": f"2:{h_bbb}|BBB\n3:{h_ccc}|CCC",
}
]
)
result = hashline_edit(path="f.txt", edits=edits)
assert "Applied 1 edit" in result
# Should have stripped the prefixes
assert f.read_text() == "aaa\nBBB\nCCC\nddd\n"
assert "cleanup" in result.lower()
def test_no_cleanup_when_disabled(self, tools, tmp_path):
"""auto_cleanup=False writes content as-is."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
h = compute_line_hash("bbb")
raw_content = f"2:{h}|BBB"
edits = json.dumps(
[
{
"op": "set_line",
"anchor": _anchor(2, "bbb"),
"content": raw_content,
}
]
)
result = hashline_edit(path="f.txt", edits=edits, auto_cleanup=False)
assert "Applied 1 edit" in result
assert f.read_text() == f"aaa\n{raw_content}\nccc\n"
class TestHashlineEditAtomicWrite:
def test_preserves_permissions(self, tools, tmp_path):
"""Atomic write preserves original file permissions."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
os.chmod(f, 0o755)
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
assert os.stat(f).st_mode & 0o777 == 0o755
def test_preserves_trailing_newline(self, tools, tmp_path):
"""Files with trailing newline keep it after edit."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "aaa"), "content": "AAA"}])
hashline_edit(path="f.txt", edits=edits)
assert f.read_text().endswith("\n")
def test_unknown_op(self, tools, tmp_path):
"""Unknown op returns error."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\n")
edits = json.dumps([{"op": "delete_line", "anchor": "1:abcd"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" in result
assert "unknown op" in result
def test_crlf_replace_op_no_double_conversion(self, tools, tmp_path):
"""Replace op on a CRLF file should not corrupt \\r\\n in new_content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_bytes(b"aaa\r\nbbb\r\nccc\r\n")
edits = json.dumps([{"op": "replace", "old_content": "aaa", "new_content": "x\r\ny"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Error" not in result
raw = f.read_bytes()
assert b"\r\r\n" not in raw
assert raw == b"x\r\ny\r\nbbb\r\nccc\r\n"
class TestHashlineEditResponseFormat:
def test_shows_updated_content(self, tools, tmp_path):
"""Response includes updated hashline content."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
f.write_text("aaa\nbbb\nccc\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(2, "bbb"), "content": "BBB"}])
result = hashline_edit(path="f.txt", edits=edits)
# Should show updated content in hashline format
h_new = compute_line_hash("BBB")
assert f"2:{h_new}|BBB" in result
def test_pagination_hint_for_large_files(self, tools, tmp_path):
"""Response includes pagination hint when file > 200 lines."""
hashline_edit = tools[0]["hashline_edit"]
f = tmp_path / "f.txt"
lines = [f"line{i}" for i in range(300)]
f.write_text("\n".join(lines) + "\n")
edits = json.dumps([{"op": "set_line", "anchor": _anchor(1, "line0"), "content": "FIRST"}])
result = hashline_edit(path="f.txt", edits=edits)
assert "Showing first 200" in result
assert "300 lines" in result
+336 -1
View File
@@ -1,5 +1,6 @@
"""Tests for file_system_toolkits tools (FastMCP)."""
import json
import os
from unittest.mock import patch
@@ -70,7 +71,11 @@ def mock_secure_path(tmp_path):
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR",
str(tmp_path),
):
yield
with patch(
"aden_tools.tools.file_system_toolkits.hashline_edit.hashline_edit.get_secure_path",
side_effect=_get_secure_path,
):
yield
class TestViewFileTool:
@@ -203,6 +208,42 @@ class TestViewFileTool:
assert "error" in result
assert "Failed to read file" in result["error"]
def test_offset_without_hashline_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Passing offset without hashline=True returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\n")
result = view_file_fn(path="test.txt", offset=5, **mock_workspace)
assert "error" in result
assert "hashline=True" in result["error"]
def test_limit_without_hashline_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Passing limit without hashline=True returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\n")
result = view_file_fn(path="test.txt", limit=10, **mock_workspace)
assert "error" in result
assert "hashline=True" in result["error"]
def test_offset_and_limit_without_hashline_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Passing both offset and limit without hashline=True returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\n")
result = view_file_fn(path="test.txt", offset=2, limit=5, **mock_workspace)
assert "error" in result
assert "hashline=True" in result["error"]
class TestWriteToFileTool:
"""Tests for write_to_file tool."""
@@ -762,3 +803,297 @@ class TestApplyPatchTool:
assert result["success"] is True
assert result["all_successful"] is True
assert test_file.read_text(encoding="utf-8") == modified
class TestViewFileHashlineMode:
"""Tests for view_file hashline mode."""
@pytest.fixture
def view_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.view_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["view_file"].fn
def test_hashline_format(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline=True returns N:hhhh|content format."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello\nworld\n")
result = view_file_fn(path="test.txt", hashline=True, **mock_workspace)
assert result["success"] is True
assert result["hashline"] is True
lines = result["content"].split("\n")
assert lines[0].startswith("1:")
assert "|hello" in lines[0]
assert lines[1].startswith("2:")
assert "|world" in lines[1]
def test_hashline_offset(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline with offset skips initial lines."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\n")
result = view_file_fn(path="test.txt", hashline=True, offset=2, **mock_workspace)
assert result["success"] is True
assert result["offset"] == 2
lines = result["content"].split("\n")
assert lines[0].startswith("2:")
assert "|bbb" in lines[0]
def test_hashline_limit(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline with limit restricts number of lines."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\nddd\n")
result = view_file_fn(path="test.txt", hashline=True, limit=2, **mock_workspace)
assert result["success"] is True
assert result["limit"] == 2
assert result["shown_lines"] == 2
assert result["total_lines"] == 4
def test_hashline_total_and_shown_lines(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""total_lines and shown_lines are reported correctly."""
test_file = tmp_path / "test.txt"
test_file.write_text("a\nb\nc\nd\ne\n")
result = view_file_fn(path="test.txt", hashline=True, offset=2, limit=2, **mock_workspace)
assert result["total_lines"] == 5
assert result["shown_lines"] == 2
def test_default_mode_unchanged(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Default mode (hashline=False) returns the same format as before."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello\n")
result = view_file_fn(path="test.txt", **mock_workspace)
assert result["success"] is True
assert "hashline" not in result
assert result["content"] == "hello\n"
assert result["lines"] == 1
def test_hashline_offset_zero_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with offset=0 returns error (must be >= 1)."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, offset=0, **mock_workspace)
assert "error" in result
assert "offset" in result["error"].lower()
def test_hashline_negative_offset_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with negative offset returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, offset=-1, **mock_workspace)
assert "error" in result
assert "offset" in result["error"].lower()
def test_hashline_negative_limit_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with negative limit returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, limit=-1, **mock_workspace)
assert "error" in result
assert "limit" in result["error"].lower()
def test_hashline_truncated_file_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Large file with hashline=True and no offset/limit returns error directing to paginate."""
test_file = tmp_path / "large.txt"
# Create a file larger than the max_size we'll pass
content = "line\n" * 100 # 500 bytes
test_file.write_text(content)
result = view_file_fn(path="large.txt", hashline=True, max_size=50, **mock_workspace)
assert "error" in result
assert "too large" in result["error"].lower()
assert "offset" in result["error"].lower()
assert "limit" in result["error"].lower()
def test_hashline_offset_beyond_end_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with offset beyond total lines returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, offset=50, **mock_workspace)
assert "error" in result
assert "beyond" in result["error"].lower()
assert "2 lines" in result["error"]
def test_hashline_large_file_with_offset_limit_works(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Large file using offset/limit bypasses full-file size check."""
test_file = tmp_path / "large.txt"
lines = [f"line {i}" for i in range(1, 101)]
test_file.write_text("\n".join(lines) + "\n")
# File is large (> max_size=200), but offset/limit lets us page through it
result = view_file_fn(
path="large.txt", hashline=True, offset=10, limit=5, max_size=200, **mock_workspace
)
assert result["success"] is True
assert result["shown_lines"] == 5
assert result["total_lines"] == 100
# First shown line should be line 10
first_line = result["content"].split("\n")[0]
assert first_line.startswith("10:")
assert "|line 10" in first_line
class TestGrepSearchHashlineMode:
"""Tests for grep_search hashline mode."""
@pytest.fixture
def grep_search_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.grep_search import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["grep_search"].fn
def test_hashline_anchor_present(
self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline=True includes anchor field in matches."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello world\ngoodbye world\n")
result = grep_search_fn(path="test.txt", pattern="hello", hashline=True, **mock_workspace)
assert result["success"] is True
assert result["total_matches"] == 1
match = result["matches"][0]
assert "anchor" in match
# Anchor format: N:hhhh (4-char hash)
assert match["anchor"].startswith("1:")
assert len(match["anchor"]) == 6 # "1:hhhh"
def test_hashline_anchor_absent_by_default(
self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline=False (default) does not include anchor field."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello world\n")
result = grep_search_fn(path="test.txt", pattern="hello", **mock_workspace)
assert result["success"] is True
assert result["total_matches"] == 1
assert "anchor" not in result["matches"][0]
def test_grep_hashline_preserves_indentation(
self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline=True preserves leading whitespace in line_content."""
test_file = tmp_path / "test.txt"
test_file.write_text(" hello world\n")
result = grep_search_fn(path="test.txt", pattern="hello", hashline=True, **mock_workspace)
assert result["success"] is True
assert result["total_matches"] == 1
assert result["matches"][0]["line_content"] == " hello world"
def test_hashline_skips_large_files_with_notice(
self, grep_search_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline=True skips files > 10MB and reports them in the response."""
search_dir = tmp_path / "search_dir"
search_dir.mkdir()
small_file = search_dir / "small.txt"
small_file.write_text("hello world\n")
large_file = search_dir / "large.txt"
# Write just over 10MB
large_file.write_bytes(b"hello large\n" * (1024 * 1024))
result = grep_search_fn(
path="search_dir", pattern="hello", hashline=True, recursive=True, **mock_workspace
)
assert result["success"] is True
assert "skipped_large_files" in result
assert any("large.txt" in f for f in result["skipped_large_files"])
# Small file should still have matches
assert result["total_matches"] >= 1
class TestHashlineCrossToolConsistency:
"""Cross-tool consistency tests for hashline workflows."""
@pytest.fixture
def view_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.view_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["view_file"].fn
@pytest.fixture
def grep_search_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.grep_search import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["grep_search"].fn
@pytest.fixture
def hashline_edit_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.hashline_edit import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["hashline_edit"].fn
def test_unicode_line_separator_anchor_roundtrip(
self,
view_file_fn,
grep_search_fn,
hashline_edit_fn,
mock_workspace,
mock_secure_path,
tmp_path,
):
"""Anchors from grep hashline mode should be consumable by hashline_edit."""
test_file = tmp_path / "test.txt"
test_file.write_text("A\u2028B\nC\n", encoding="utf-8")
# Hashline view sees U+2028 as a line boundary via splitlines()
view_res = view_file_fn(path="test.txt", hashline=True, **mock_workspace)
assert view_res["success"] is True
assert view_res["total_lines"] == 3
# grep_search line iteration treats U+2028 as in-line content
grep_res = grep_search_fn(path="test.txt", pattern="B", hashline=True, **mock_workspace)
assert grep_res["success"] is True
assert grep_res["total_matches"] == 1
anchor = grep_res["matches"][0]["anchor"]
edits = json.dumps([{"op": "set_line", "anchor": anchor, "content": "X"}])
edit_res = hashline_edit_fn(path="test.txt", edits=edits, **mock_workspace)
assert "error" not in edit_res, edit_res.get("error")
assert edit_res["success"] is True
+186
View File
@@ -0,0 +1,186 @@
"""Unit tests for the hashline utility module."""
import pytest
from aden_tools.hashline import (
compute_line_hash,
format_hashlines,
parse_anchor,
validate_anchor,
)
class TestComputeLineHash:
"""Tests for compute_line_hash."""
def test_basic_output_format(self):
"""Hash is a 4-char lowercase hex string."""
h = compute_line_hash("hello world")
assert len(h) == 4
assert all(c in "0123456789abcdef" for c in h)
def test_space_stripping(self):
"""Trailing spaces are stripped before hashing."""
assert compute_line_hash("hello ") == compute_line_hash("hello")
assert compute_line_hash(" hello") != compute_line_hash("hello")
def test_tab_stripping(self):
"""Trailing tabs are stripped before hashing."""
assert compute_line_hash("hello\t") == compute_line_hash("hello")
assert compute_line_hash("\thello") != compute_line_hash("hello")
def test_empty_line(self):
"""Empty line produces a valid 4-char hash."""
h = compute_line_hash("")
assert len(h) == 4
assert all(c in "0123456789abcdef" for c in h)
def test_different_lines_different_hashes(self):
"""Different lines produce different hashes (most of the time)."""
h1 = compute_line_hash("def foo():")
h2 = compute_line_hash("def bar():")
# These specific strings should produce different hashes
assert h1 != h2
def test_whitespace_only_equals_empty(self):
"""A line of only spaces/tabs hashes the same as empty."""
assert compute_line_hash(" \t ") == compute_line_hash("")
def test_formatter_resilience(self):
"""Trailing whitespace-only variants stay stable across formatting noise."""
assert compute_line_hash("if x:") == compute_line_hash("if x: ")
assert compute_line_hash("return 0") == compute_line_hash("return 0\t\t")
def test_leading_whitespace_changes_hash(self):
"""Leading whitespace changes the hash (indentation is semantic)."""
assert compute_line_hash(" x") != compute_line_hash(" x")
def test_trailing_whitespace_ignored(self):
"""Trailing spaces are ignored in hashing."""
assert compute_line_hash("x ") == compute_line_hash("x")
class TestFormatHashlines:
"""Tests for format_hashlines."""
def test_basic_format(self):
"""Lines are formatted as N:hhhh|content."""
lines = ["hello", "world"]
result = format_hashlines(lines)
output_lines = result.split("\n")
assert len(output_lines) == 2
# Check format: N:hhhh|content
assert output_lines[0].startswith("1:")
assert "|hello" in output_lines[0]
assert output_lines[1].startswith("2:")
assert "|world" in output_lines[1]
def test_offset(self):
"""Offset skips initial lines."""
lines = ["a", "b", "c", "d"]
result = format_hashlines(lines, offset=3)
output_lines = result.split("\n")
assert len(output_lines) == 2
assert output_lines[0].startswith("3:")
assert "|c" in output_lines[0]
def test_limit(self):
"""Limit restricts number of lines returned."""
lines = ["a", "b", "c", "d"]
result = format_hashlines(lines, limit=2)
output_lines = result.split("\n")
assert len(output_lines) == 2
assert "|a" in output_lines[0]
assert "|b" in output_lines[1]
def test_offset_and_limit(self):
"""Offset and limit work together."""
lines = ["a", "b", "c", "d", "e"]
result = format_hashlines(lines, offset=2, limit=2)
output_lines = result.split("\n")
assert len(output_lines) == 2
assert output_lines[0].startswith("2:")
assert "|b" in output_lines[0]
assert output_lines[1].startswith("3:")
assert "|c" in output_lines[1]
def test_empty_input(self):
"""Empty input produces empty output."""
result = format_hashlines([])
assert result == ""
class TestParseAnchor:
"""Tests for parse_anchor."""
def test_valid_anchor(self):
"""Valid anchor is parsed correctly."""
line_num, hash_str = parse_anchor("5:a3b1")
assert line_num == 5
assert hash_str == "a3b1"
def test_valid_anchor_with_zeros(self):
"""Anchor with zero-padded hash works."""
line_num, hash_str = parse_anchor("1:0000")
assert line_num == 1
assert hash_str == "0000"
def test_no_colon(self):
"""Missing colon raises ValueError."""
with pytest.raises(ValueError, match="no colon"):
parse_anchor("5a3")
@pytest.mark.parametrize("bad_anchor", ["5:abc", "5:a", "5:abcd1234"])
def test_wrong_hash_length(self, bad_anchor):
"""Hash with wrong length raises ValueError."""
with pytest.raises(ValueError, match="4 chars"):
parse_anchor(bad_anchor)
def test_uppercase_hash(self):
"""Uppercase hex raises ValueError."""
with pytest.raises(ValueError, match="lowercase hex"):
parse_anchor("5:A3B1")
def test_non_hex_hash(self):
"""Non-hex chars in hash raises ValueError."""
with pytest.raises(ValueError, match="lowercase hex"):
parse_anchor("5:zzxx")
def test_non_integer_line(self):
"""Non-integer line number raises ValueError."""
with pytest.raises(ValueError, match="not an integer"):
parse_anchor("abc:a3b1")
class TestValidateAnchor:
"""Tests for validate_anchor."""
def test_valid_match(self):
"""Valid anchor returns None."""
lines = ["hello", "world"]
h = compute_line_hash("hello")
assert validate_anchor(f"1:{h}", lines) is None
def test_hash_mismatch(self):
"""Mismatched hash returns error with re-read hint and current content."""
lines = ["hello", "world"]
err = validate_anchor("1:ffff", lines)
assert err is not None
assert "mismatch" in err.lower()
assert "re-read" in err.lower()
assert "hello" in err
@pytest.mark.parametrize("anchor", ["5:abcd", "0:0000"])
def test_out_of_range(self, anchor):
"""Line number beyond file length or zero returns error."""
lines = ["hello"]
err = validate_anchor(anchor, lines)
assert err is not None
assert "out of range" in err.lower()
def test_invalid_format(self):
"""Invalid anchor format returns error."""
lines = ["hello"]
err = validate_anchor("bad", lines)
assert err is not None
assert "no colon" in err.lower()
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,114 @@
"""Tests for run_command PYTHONPATH handling (Windows compatibility).
On Windows, PYTHONPATH must use semicolon (;) as separator, not colon (:).
These tests verify the correct behavior. They are Windows-only because
the bug only manifests on Windows.
"""
import os
import subprocess
import sys
import pytest
# Skip entire module on non-Windows (tests will pass when fixes are applied)
pytestmark = pytest.mark.skipif(
sys.platform != "win32",
reason="Windows-only: PYTHONPATH separator behavior",
)
def _build_pythonpath_buggy(project_root: str) -> str:
"""Replicate current (buggy) PYTHONPATH construction in run_command."""
return f"{project_root}/core:{project_root}/exports:{project_root}/core/framework/agents"
def _build_pythonpath_fixed(project_root: str) -> str:
"""Correct PYTHONPATH construction using os.pathsep."""
return os.pathsep.join(
[
os.path.join(project_root, "core"),
os.path.join(project_root, "exports"),
os.path.join(project_root, "core", "framework", "agents"),
]
)
class TestPythonpathSeparatorWindows:
"""Verify PYTHONPATH uses correct separator on Windows."""
def test_pythonpath_with_semicolons_parses_multiple_paths(self, tmp_path):
"""PYTHONPATH built with os.pathsep allows Python to find modules in multiple dirs."""
# Create two dirs, each with a module
core_dir = tmp_path / "core"
core_dir.mkdir()
(core_dir / "mod_a.py").write_text("x = 1\n")
exports_dir = tmp_path / "exports"
exports_dir.mkdir()
(exports_dir / "mod_b.py").write_text("y = 2\n")
pythonpath = os.pathsep.join([str(core_dir), str(exports_dir)])
env = {**os.environ, "PYTHONPATH": pythonpath}
# Python should find both when we add them to path
result = subprocess.run(
[
sys.executable,
"-c",
"import sys; "
"sys.path = [p for p in sys.path if 'mod_a' not in p and 'mod_b' not in p]; "
"import mod_a; import mod_b; print('ok')",
],
env=env,
capture_output=True,
text=True,
cwd=str(tmp_path),
timeout=10,
)
assert result.returncode == 0, f"Stdout: {result.stdout} Stderr: {result.stderr}"
assert "ok" in result.stdout
def test_pythonpath_with_colons_fails_on_windows(self, tmp_path):
"""PYTHONPATH built with colons (Unix style) fails on Windows - single path parsed."""
core_dir = tmp_path / "core"
core_dir.mkdir()
(core_dir / "mod_c.py").write_text("z = 3\n")
exports_dir = tmp_path / "exports"
exports_dir.mkdir()
(exports_dir / "mod_d.py").write_text("w = 4\n")
# Buggy: colon-separated (Unix style)
pythonpath = f"{tmp_path}/core:{tmp_path}/exports"
env = {**os.environ, "PYTHONPATH": pythonpath}
# On Windows, Python splits by ; only. The colon string is one invalid path.
result = subprocess.run(
[
sys.executable,
"-c",
"import sys; "
"pp = [p for p in sys.path if 'core' in p or 'exports' in p]; "
"import mod_c; import mod_d; print('ok')",
],
env=env,
capture_output=True,
text=True,
cwd=str(tmp_path),
timeout=10,
)
# Should fail: Python won't parse multiple paths from colon-separated string
assert result.returncode != 0 or "ok" not in result.stdout
def test_fixed_pythonpath_construction_uses_pathsep(self, tmp_path):
"""The fix pattern (os.pathsep.join) produces valid multi-path PYTHONPATH."""
project_root = str(tmp_path)
fixed = _build_pythonpath_fixed(project_root)
# On Windows, os.pathsep is ';'
assert os.pathsep in fixed, "Fixed PYTHONPATH must use os.pathsep on Windows"
# Three paths => two separators
assert fixed.count(os.pathsep) == 2