fix: data disclosure gaps

Timothy
2026-03-17 17:50:08 -07:00
parent 65c8e1653c
commit c167002754
14 changed files with 136 additions and 625 deletions
+6 -2
View File
@@ -612,6 +612,11 @@ class NodeConversation:
continue # never prune errors
if msg.content.startswith("[Pruned tool result"):
continue # already pruned
# Tiny results (set_output acks, confirmations) — pruning
# saves negligible space but makes the LLM think the call
# failed, causing costly retries.
if len(msg.content) < 100:
continue
# Phase-aware: protect current phase messages
if self._current_phase and msg.phase_id == self._current_phase:
@@ -901,8 +906,7 @@ class NodeConversation:
full_path = str((spill_path / conv_filename).resolve())
ref_parts.append(
f"[Previous conversation saved to '{full_path}'. "
f"Use load_data('{conv_filename}'), read_file('{full_path}'), "
f"or run_command('cat \"{full_path}\"') to review if needed.]"
f"Use load_data('{conv_filename}') to review if needed.]"
)
elif not collapsed_msgs:
ref_parts.append("[Previous freeform messages compacted.]")
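Read together, the skip rules in this pruning pass amount to a single keep/prune predicate. A hedged restatement as a standalone function, for orientation only: the error-flag attribute name is an assumption (the hunk shows only its comment), and the real loop uses `continue` rather than a helper.

```python
def should_keep_tool_result(msg, current_phase: str | None) -> bool:
    """Restate the skip conditions from the pruning loop above (illustrative)."""
    if getattr(msg, "is_error", False):   # attribute name assumed from the comment
        return True   # never prune errors
    if msg.content.startswith("[Pruned tool result"):
        return True   # already pruned
    if len(msg.content) < 100:
        return True   # tiny acks: pruning saves little and provokes retries
    if current_phase and msg.phase_id == current_phase:
        return True   # protect messages from the current phase
    return False
```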
+91 -58
View File
@@ -293,13 +293,26 @@ class OutputAccumulator:
Values are stored in memory and optionally written through to a
ConversationStore's cursor data for crash recovery.
When *spillover_dir* and *max_value_chars* are set, large values are
automatically saved to files and replaced with lightweight file
references. This guarantees auto-spill fires on **every** ``set()``
call regardless of code path (resume, checkpoint restore, etc.).
"""
values: dict[str, Any] = field(default_factory=dict)
store: ConversationStore | None = None
spillover_dir: str | None = None
max_value_chars: int = 0 # 0 = disabled
async def set(self, key: str, value: Any) -> None:
"""Set a key-value pair, persisting immediately if store is available."""
"""Set a key-value pair, auto-spilling large values to files.
When the serialised value exceeds *max_value_chars*, the data is
saved to ``<spillover_dir>/output_<key>.<ext>`` and *value* is
replaced with a compact file-reference string.
"""
value = self._auto_spill(key, value)
self.values[key] = value
if self.store:
cursor = await self.store.read_cursor() or {}
@@ -308,6 +321,44 @@ class OutputAccumulator:
cursor["outputs"] = outputs
await self.store.write_cursor(cursor)
def _auto_spill(self, key: str, value: Any) -> Any:
"""Save large values to a file and return a reference string."""
if self.max_value_chars <= 0 or not self.spillover_dir:
return value
val_str = (
json.dumps(value, ensure_ascii=False)
if not isinstance(value, str)
else value
)
if len(val_str) <= self.max_value_chars:
return value
spill_path = Path(self.spillover_dir)
spill_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
write_content = (
json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(spill_path / filename).write_text(write_content, encoding="utf-8")
file_size = (spill_path / filename).stat().st_size
logger.info(
"set_output value auto-spilled: key=%s, "
"%d chars → %s (%d bytes)",
key,
len(val_str),
filename,
file_size,
)
return (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') "
f"to access full data.]"
)
def get(self, key: str) -> Any | None:
"""Get a value by key, or None if not present."""
return self.values.get(key)
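A usage sketch of the behaviour the docstring describes: with `spillover_dir` and `max_value_chars` set, a large value passed to `set()` is written to `output_<key>.json` (or `.txt`) and the stored value becomes the file-reference string. Field names come from the hunk above; `store=None` is assumed here so the cursor write-through path is skipped, and `OutputAccumulator` is the class defined in this diff.

```python
import asyncio

async def demo() -> None:
    acc = OutputAccumulator(
        store=None,                 # no cursor persistence in this sketch
        spillover_dir="/tmp/spill",
        max_value_chars=1_000,
    )
    await acc.set("summary", "short value")                 # kept inline
    await acc.set("report", {"rows": list(range(5_000))})   # exceeds the limit

    print(acc.get("summary"))   # -> "short value"
    print(acc.get("report"))    # -> "[Saved to 'output_report.json' (NN,NNN bytes). ..."

asyncio.run(demo())
```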
@@ -467,7 +518,11 @@ class EventLoopNode(NodeProtocol):
conversation._output_keys = (
ctx.cumulative_output_keys or ctx.node_spec.output_keys or None
)
accumulator = OutputAccumulator(store=self._conversation_store)
accumulator = OutputAccumulator(
store=self._conversation_store,
spillover_dir=self._config.spillover_dir,
max_value_chars=self._config.max_output_value_chars,
)
start_iteration = 0
_restored_recent_responses: list[str] = []
_restored_tool_fingerprints: list[list[tuple[str, str]]] = []
@@ -585,7 +640,11 @@ class EventLoopNode(NodeProtocol):
# Stamp phase for first node in continuous mode
if _is_continuous:
conversation.set_current_phase(ctx.node_id)
accumulator = OutputAccumulator(store=self._conversation_store)
accumulator = OutputAccumulator(
store=self._conversation_store,
spillover_dir=self._config.spillover_dir,
max_value_chars=self._config.max_output_value_chars,
)
start_iteration = 0
# Add initial user message from input data
@@ -2209,58 +2268,24 @@ class EventLoopNode(NodeProtocol):
pass
key = tc.tool_input.get("key", "")
# Auto-spill: save large values to data files and
# replace with a lightweight file reference so shared
# memory / adapt.md / transition markers stay small.
spill_dir = self._config.spillover_dir
max_val = self._config.max_output_value_chars
if max_val > 0 and spill_dir:
val_str = (
json.dumps(value, ensure_ascii=False)
if not isinstance(value, str)
else value
)
if len(val_str) > max_val:
spill_path = Path(spill_dir)
spill_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
write_content = (
json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(spill_path / filename).write_text(write_content, encoding="utf-8")
file_size = (spill_path / filename).stat().st_size
logger.info(
"set_output value auto-spilled: key=%s, "
"%d chars → %s (%d bytes)",
key,
len(val_str),
filename,
file_size,
)
# Replace value with reference
value = (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') "
f"to access full data.]"
)
# Update tool result to inform the LLM
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Output '{key}' was large "
f"({len(val_str):,} chars) — data saved "
f"to '{filename}' ({file_size:,} bytes). "
f"The next phase will see the file "
f"reference and can load full data."
),
is_error=False,
)
# Auto-spill happens inside accumulator.set()
# — it fires on every code path (fresh, resume,
# restore) and prevents overwrite regression.
await accumulator.set(key, value)
self._record_learning(key, value)
stored = accumulator.get(key)
# If the accumulator spilled, update the tool
# result so the LLM knows data was saved to a file.
if isinstance(stored, str) and stored.startswith("[Saved to '"):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Output '{key}' auto-saved to file "
f"(value was too large for inline). "
f"{stored}"
),
is_error=False,
)
self._record_learning(key, stored)
outputs_set_this_turn.append(key)
await self._publish_output_key_set(stream_id, node_id, key, execution_id)
logged_tool_calls.append(
@@ -2639,6 +2664,13 @@ class EventLoopNode(NodeProtocol):
content=raw.content,
is_error=raw.is_error,
)
# Route through _truncate_tool_result so large
# subagent results are saved to spillover files
# and survive pruning (instead of being "cleared
# from context" with no recovery path).
result = self._truncate_tool_result(
result, "delegate_to_sub_agent"
)
results_by_id[tc.tool_use_id] = result
logged_tool_calls.append(
{
@@ -4299,8 +4331,7 @@ class EventLoopNode(NodeProtocol):
)
parts.append(
"CONVERSATION HISTORY (freeform messages saved during compaction — "
"use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to review earlier dialogue):\n"
"use load_data('<filename>') to review earlier dialogue):\n"
+ conv_list
)
if data_files:
@@ -4308,8 +4339,8 @@ class EventLoopNode(NodeProtocol):
f" - {f} (full path: {data_dir / f})" for f in data_files[:30]
)
parts.append(
"DATA FILES (use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to read):\n" + file_list
"DATA FILES (use load_data('<filename>') to read):\n"
+ file_list
)
if not all_files:
parts.append(
@@ -4375,6 +4406,8 @@ class EventLoopNode(NodeProtocol):
return None
accumulator = await OutputAccumulator.restore(self._conversation_store)
accumulator.spillover_dir = self._config.spillover_dir
accumulator.max_value_chars = self._config.max_output_value_chars
cursor = await self._conversation_store.read_cursor()
start_iteration = cursor.get("iteration", 0) + 1 if cursor else 0
+30 -3
View File
@@ -277,7 +277,9 @@ def build_transition_marker(
sections.append(f"\nCompleted: {previous_node.name}")
sections.append(f" {previous_node.description}")
# Outputs in memory
# Outputs in memory — use file references for large values so the
# next node loads full data from disk instead of seeing truncated
# inline previews that look deceptively complete.
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
@@ -285,7 +287,31 @@ def build_transition_marker(
if value is None:
continue
val_str = str(value)
if len(val_str) > 300:
if len(val_str) > 300 and data_dir:
# Auto-spill large transition values to data files
import json as _json
data_path = Path(data_dir)
data_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
try:
write_content = (
_json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(data_path / filename).write_text(
write_content, encoding="utf-8"
)
file_size = (data_path / filename).stat().st_size
val_str = (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') to access.]"
)
except Exception:
val_str = val_str[:300] + "..."
elif len(val_str) > 300:
val_str = val_str[:300] + "..."
memory_lines.append(f" {key}: {val_str}")
if memory_lines:
@@ -302,7 +328,8 @@ def build_transition_marker(
]
if file_lines:
sections.append(
"\nData files (use read_file to access):\n" + "\n".join(file_lines)
"\nData files (use load_data to access):\n"
+ "\n".join(file_lines)
)
# Agent working memory
+1 -1
View File
@@ -12,7 +12,7 @@ import zlib
# Files beyond this size are skipped/rejected in hashline mode because
# hashline anchors are not practical on files this large (minified
# bundles, logs, data dumps). Shared by view_file, grep_search, and
# bundles, logs, data dumps). Shared by read_file, grep_search, and
# hashline_edit.
HASHLINE_MAX_FILE_BYTES = 10 * 1024 * 1024 # 10 MB
+1 -5
View File
@@ -70,8 +70,6 @@ from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import (
register_tools as register_replace_file_content,
)
from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .github_tool import register_tools as register_github
from .gitlab_tool import register_tools as register_gitlab
from .gmail_tool import register_tools as register_gmail
@@ -186,14 +184,12 @@ def _register_verified(
register_account_info(mcp, credentials=credentials)
# --- File system toolkits ---
register_view_file(mcp)
register_write_to_file(mcp)
register_list_dir(mcp)
register_replace_file_content(mcp)
register_apply_diff(mcp)
register_apply_patch(mcp)
register_grep_search(mcp)
# hashline_edit: anchor-based editing, pairs with view_file/grep_search hashline mode
# hashline_edit: anchor-based editing, pairs with read_file/grep_search hashline mode
register_hashline_edit(mcp)
register_execute_command(mcp)
register_data_tools(mcp)
@@ -75,7 +75,7 @@ def register_tools(mcp: FastMCP) -> None:
try:
if hashline:
# Use splitlines() for anchor consistency with
# view_file/hashline_edit (handles Unicode line
# read_file/hashline_edit (handles Unicode line
# separators like \u2028, \x85).
# Skip files > 10MB to avoid excessive memory use.
file_size = os.path.getsize(file_path)
@@ -6,11 +6,11 @@ Edit files using anchor-based line references for precise, hash-validated edits.
The `hashline_edit` tool enables file editing using short content-hash anchors (`N:hhhh`) instead of requiring exact text reproduction. Each line's anchor includes a 4-character hash of its content. If the file has changed since the model last read it, the hash won't match and the edit is cleanly rejected.
Use this tool together with `view_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.
Use this tool together with `read_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.
## Use Cases
- Making targeted edits after reading a file with `view_file(hashline=True)`
- Making targeted edits after reading a file with `read_file(hashline=True)`
- Replacing single lines, line ranges, or inserting new lines by anchor
- Batch editing multiple locations in a single atomic call
- Falling back to string replacement when anchors are not available
@@ -21,7 +21,7 @@ Use this tool together with `view_file(hashline=True)` and `grep_search(hashline
import json
# First, read the file with hashline mode to get anchors
content = view_file(path="app.py", hashline=True, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
content = read_file(path="app.py", hashline=True)
# Returns lines like: 1:a3b1|def main(): 2:f1c2| print("hello") ...
# Then edit using the anchors
@@ -29,25 +29,10 @@ hashline_edit(
path="app.py",
edits=json.dumps([
{"op": "set_line", "anchor": "2:f1c2", "content": ' print("goodbye")'}
]),
workspace_id="ws-1",
agent_id="a-1",
session_id="s-1"
])
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `edits` | str | Yes | - | JSON string containing a list of edit operations (see Operations below) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `auto_cleanup` | bool | No | `True` | Strip hashline prefixes and echoed context from content. Set to `False` to write content exactly as provided. |
| `encoding` | str | No | `"utf-8"` | File encoding. Must match the file's actual encoding. |
## Operations
The `edits` parameter is a JSON array of operation objects. Each object must have an `"op"` field:
@@ -61,62 +46,6 @@ The `edits` parameter is a JSON array of operation objects. Each object must hav
| `replace` | `old_content`, `new_content`, `allow_multiple` (optional) | Fallback string replacement; errors if 0 or 2+ matches (unless `allow_multiple: true`) |
| `append` | `content` | Append new lines to end of file (works for empty files too) |
## Returns
**Success:**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 2,
"content": "1:b2c4|def main():\n2:c4a1| print(\"goodbye\")\n..."
}
```
**Success (noop, content unchanged after applying edits):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 0,
"note": "Content unchanged after applying edits",
"content": "1:b2c4|def main():\n..."
}
```
**Success (with auto-cleanup applied):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"cleanup_applied": ["prefix_strip"]
}
```
The `cleanup_applied` field is only present when cleanup actually modified content. Possible values: `prefix_strip`, `boundary_echo_strip`, `insert_echo_strip`.
**Success (replace with allow_multiple):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"replacements": {"edit_1": 3}
}
```
The `replacements` field is only present when `allow_multiple: true` was used, showing the count per replace op.
**Error:**
```python
{
"error": "Edit #1 (set_line): Hash mismatch at line 2: expected 'f1c2', got 'a3b1'. Re-read the file to get current anchors."
}
```
## Error Handling
- Returns an error if the file doesn't exist
@@ -127,90 +56,11 @@ The `replacements` field is only present when `allow_multiple: true` was used, s
- Returns an error for unknown op types or invalid JSON
- All edits are validated before any writes occur (atomic): on any error the file is unchanged
## Examples
### Replacing a single line
```python
edits = json.dumps([
{"op": "set_line", "anchor": "5:a3b1", "content": " return result"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {"success": True, "path": "app.py", "edits_applied": 1, "content": "..."}
```
### Replacing a range of lines
```python
edits = json.dumps([{
"op": "replace_lines",
"start_anchor": "10:b1c2",
"end_anchor": "15:c2d3",
"content": " # simplified\n return x + y"
}])
result = hashline_edit(path="math.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines after
```python
edits = json.dumps([
{"op": "insert_after", "anchor": "3:d4e5", "content": "import os\nimport sys"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines before
```python
edits = json.dumps([
{"op": "insert_before", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Batch editing
```python
edits = json.dumps([
{"op": "set_line", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"},
{"op": "insert_after", "anchor": "2:b2c3", "content": "import logging"},
{"op": "set_line", "anchor": "10:c3d4", "content": " logging.info('done')"},
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Replace all occurrences
```python
edits = json.dumps([
{"op": "replace", "old_content": "old_name", "new_content": "new_name", "allow_multiple": True}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {..., "replacements": {"edit_1": 5}}
```
## Notes
- Anchors are generated by `view_file(hashline=True)` and `grep_search(hashline=True)`
- Anchors are generated by `read_file(hashline=True)` and `grep_search(hashline=True)`
- The hash is a CRC32-based 4-char hex digest of the line content (trailing spaces and tabs are stripped; leading whitespace is kept, so indentation changes invalidate anchors); a sketch of the scheme follows this list. Collision probability is ~0.0015% per changed line.
- All anchor-based ops are validated before any writes occur; if any op fails validation, the file is left unchanged
- String `replace` ops are applied after all anchor-based splices, so they match against post-splice content
- Original line endings (LF or CRLF) are preserved
- The response includes the updated file content in hashline format, so subsequent edits can use the new anchors without re-reading
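A minimal sketch of how such an anchor could be derived. The exact digest construction is an assumption (low 16 bits of CRC32, suggested by the `import zlib` in the hashline module), so treat it as illustrative rather than the canonical implementation.

```python
import zlib

def line_anchor(line_no: int, content: str) -> str:
    """Build an 'N:hhhh' anchor for one line (illustrative sketch)."""
    # Trailing spaces/tabs are stripped; leading whitespace is kept so
    # indentation changes invalidate the anchor, per the notes above.
    digest = zlib.crc32(content.rstrip(" \t").encode("utf-8")) & 0xFFFF
    return f"{line_no}:{digest:04x}"

# line_anchor(2, '    print("hello")') -> something like "2:f1c2"
```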
## Auto-Cleanup Details
When `auto_cleanup=True` (the default), the tool strips hashline prefixes and echoed context that LLMs frequently include in edit content. Prefix stripping uses a **2+ non-empty line threshold** to avoid false positives. The prefix regex matches the `N:hhhh|` pattern (4-char hex hash).
**Why the threshold matters:** Single-line content matching the `N:hhhh|` pattern is ambiguous. It could be literal content (CSV data, config values, log format strings) that happens to match the pattern. With 2+ lines all matching, the probability of a false positive drops dramatically.
**Single-line example (NOT stripped):**
```python
# set_line with content "5:a3b1|hello" writes literally "5:a3b1|hello"
{"op": "set_line", "anchor": "2:f1c2", "content": "5:a3b1|hello"}
```
**Multi-line example (stripped):**
```python
# replace_lines where all lines match N:hhhh| pattern gets stripped
{"op": "replace_lines", "start_anchor": "2:f1c2", "end_anchor": "3:b2d3",
"content": "2:a3b1|BBB\n3:c4d2|CCC"}
# Writes "BBB\nCCC" (prefixes removed)
```
**Escape hatch:** Set `auto_cleanup=False` to write content exactly as provided, bypassing all cleanup heuristics.
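The 2+ line threshold can be expressed as a small helper. A sketch assuming the `N:hhhh|` prefix pattern described above; the real cleanup also handles boundary and insert echo stripping, which is omitted here.

```python
import re

_PREFIX = re.compile(r"^\d+:[0-9a-f]{4}\|")

def strip_hashline_prefixes(content: str) -> str:
    """Strip 'N:hhhh|' prefixes only when the strip is unambiguous (sketch)."""
    lines = content.splitlines()
    non_empty = [line for line in lines if line.strip()]
    # Require 2+ non-empty lines, all prefixed, to avoid mangling literal
    # content (CSV, config, log formats) that happens to match the pattern.
    if len(non_empty) >= 2 and all(_PREFIX.match(line) for line in non_empty):
        return "\n".join(_PREFIX.sub("", line) for line in lines)
    return content

# strip_hashline_prefixes("2:a3b1|BBB\n3:c4d2|CCC") -> "BBB\nCCC"
# strip_hashline_prefixes("5:a3b1|hello")           -> "5:a3b1|hello" (single line, untouched)
```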
@@ -39,7 +39,7 @@ def register_tools(mcp: FastMCP) -> None:
Edit a file using anchor-based line references (N:hash) for precise edits.
When to use
After reading a file with view_file(hashline=True), use the anchors to make
After reading a file with read_file(hashline=True), use the anchors to make
targeted edits without reproducing exact file content.
Rules & Constraints
@@ -1,106 +0,0 @@
# View File Tool
Reads the content of a file within the secure session sandbox.
## Description
The `view_file` tool allows you to read and retrieve the complete content of files within a sandboxed session environment. It provides metadata about the file along with its content.
## Use Cases
- Reading configuration files
- Viewing source code
- Inspecting log files
- Retrieving data files for processing
## Usage
```python
view_file(
path="config/settings.json",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `encoding` | str | No | `"utf-8"` | The encoding to use for reading the file |
| `max_size` | int | No | `10485760` | Maximum size of file content to return in bytes (10 MB) |
| `hashline` | bool | No | `False` | If True, return content with `N:hhhh\|content` anchors for use with `hashline_edit` |
| `offset` | int | No | `1` | 1-indexed start line (only used when `hashline=True`) |
| `limit` | int | No | `0` | Max lines to return, 0 = all (only used when `hashline=True`) |
## Returns
Returns a dictionary with the following structure:
**Success (default mode):**
```python
{
"success": True,
"path": "config/settings.json",
"content": "{\"debug\": true}",
"size_bytes": 16,
"lines": 1
}
```
**Success (hashline mode):**
```python
{
"success": True,
"path": "app.py",
"content": "1:a3f2|def main():\n2:f1c4| print(\"hello\")",
"hashline": True,
"offset": 1,
"limit": 0,
"total_lines": 2,
"shown_lines": 2,
"size_bytes": 35
}
```
**Error:**
```python
{
"error": "File not found at config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the file cannot be read (permission issues, encoding errors, etc.)
- Handles binary files gracefully by returning appropriate error messages
## Examples
### Reading a text file
```python
result = view_file(
path="README.md",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "README.md", "content": "# My Project\n...", "size_bytes": 1024, "lines": 42}
```
### Handling missing files
```python
result = view_file(
path="nonexistent.txt",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "File not found at nonexistent.txt"}
```
@@ -1,3 +0,0 @@
from .view_file import register_tools
__all__ = ["register_tools"]
@@ -1,134 +0,0 @@
import os
from mcp.server.fastmcp import FastMCP
from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, format_hashlines
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file view tools with the MCP server."""
if getattr(mcp, "_file_tools_registered", False):
return
mcp._file_tools_registered = True
@mcp.tool()
def view_file(
path: str,
workspace_id: str,
agent_id: str,
session_id: str,
encoding: str = "utf-8",
max_size: int = HASHLINE_MAX_FILE_BYTES,
hashline: bool = False,
offset: int = 1,
limit: int = 0,
) -> dict:
"""
Purpose
Read the content of a file within the session sandbox.
When to use
Inspect file contents before making changes
Retrieve stored data or configuration
Review logs or artifacts
Rules & Constraints
File must exist at the specified path
Returns full content with size and line count
Always read before patching or modifying
Args:
path: The path to the file (relative to session root)
workspace_id: The ID of workspace
agent_id: The ID of agent
session_id: The ID of the current session
encoding: The encoding to use for reading the file (default: "utf-8")
max_size: The maximum size of file content to return in bytes (default: 10MB)
hashline: If True, return content with N:hhhh|content anchors
for use with hashline_edit (default: False)
offset: 1-indexed start line, only used when hashline=True (default: 1)
limit: Max lines to return, 0 = all, only used when hashline=True (default: 0)
Returns:
Dict with file content and metadata, or error dict
"""
try:
if max_size < 0:
return {"error": f"max_size must be non-negative, got {max_size}"}
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
if not os.path.exists(secure_path):
return {"error": f"File not found at {path}"}
if not os.path.isfile(secure_path):
return {"error": f"Path is not a file: {path}"}
with open(secure_path, encoding=encoding) as f:
content_raw = f.read()
if not hashline and (offset != 1 or limit != 0):
return {
"error": "offset and limit are only supported when hashline=True. "
"Set hashline=True to use paging."
}
if hashline:
if offset < 1:
return {"error": f"offset must be >= 1, got {offset}"}
if limit < 0:
return {"error": f"limit must be >= 0, got {limit}"}
all_lines = content_raw.splitlines()
total_lines = len(all_lines)
raw_size = len(content_raw.encode(encoding))
if offset > max(total_lines, 1):
return {"error": f"offset {offset} is beyond end of file ({total_lines} lines)"}
# Check size after considering offset/limit. When paging
# (offset or limit set), only check the formatted output size.
# When reading the full file, check the raw size.
is_paging = offset > 1 or limit > 0
if not is_paging and raw_size > max_size:
return {
"error": f"File too large for hashline mode ({raw_size} bytes, "
f"max {max_size}). Use offset and limit to read a section at a time."
}
formatted = format_hashlines(all_lines, offset=offset, limit=limit)
shown_lines = len(formatted.splitlines()) if formatted else 0
if is_paging and len(formatted.encode(encoding)) > max_size:
return {
"error": f"Requested section too large ({shown_lines} lines). "
f"Reduce limit to read a smaller section."
}
return {
"success": True,
"path": path,
"content": formatted,
"hashline": True,
"offset": offset,
"limit": limit,
"total_lines": total_lines,
"shown_lines": shown_lines,
"size_bytes": raw_size,
}
content = content_raw
if len(content.encode(encoding)) > max_size:
content = content[:max_size]
content += "\n\n[... Content truncated due to size limit ...]"
return {
"success": True,
"path": path,
"content": content,
"size_bytes": len(content.encode(encoding)),
"lines": len(content.splitlines()),
}
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
@@ -1,92 +0,0 @@
# Write to File Tool
Writes content to a file within the secure session sandbox. Supports both overwriting and appending modes.
## Description
The `write_to_file` tool allows you to create new files or modify existing files within a sandboxed session environment. It automatically creates parent directories if they don't exist and provides flexible write modes.
## Use Cases
- Creating new configuration files
- Writing generated code or data
- Appending logs or output to existing files
- Saving processed results to disk
## Usage
```python
write_to_file(
path="config/settings.json",
content='{"debug": true}',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
append=False
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `content` | str | Yes | - | The content to write to the file |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `append` | bool | No | False | Whether to append to the file instead of overwriting |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"mode": "written", # or "appended"
"bytes_written": 18
}
```
**Error:**
```python
{
"error": "Failed to write to file: [error message]"
}
```
## Error Handling
- Returns an error dict if the file cannot be written (permission issues, invalid path, etc.)
- Automatically creates parent directories if they don't exist
- Handles encoding errors gracefully
## Examples
### Creating a new file
```python
result = write_to_file(
path="data/output.txt",
content="Hello, world!",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "data/output.txt", "mode": "written", "bytes_written": 13}
```
### Appending to a file
```python
result = write_to_file(
path="logs/activity.log",
content="\n[INFO] Task completed",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
append=True
)
# Returns: {"success": True, "path": "logs/activity.log", "mode": "appended", "bytes_written": 24}
```
@@ -1,3 +0,0 @@
from .write_to_file import register_tools
__all__ = ["register_tools"]
@@ -1,61 +0,0 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
"""Register file write tools with the MCP server."""
@mcp.tool()
def write_to_file(
path: str,
content: str,
workspace_id: str,
agent_id: str,
session_id: str,
append: bool = False,
) -> dict:
"""
Purpose
Create a new file or append content to an existing file.
When to use
Append new events to append-only logs
Create new artifacts or summaries
Initialize new canonical memory files
Rules & Constraints
Must not overwrite canonical memory unless explicitly allowed
Should include structured data (JSON, Markdown with headers)
Every write must be intentional and minimal
Anti-pattern
Do NOT dump raw conversation transcripts without structure or reason.
Args:
path: The path to the file (relative to session root)
content: The content to write to the file
workspace_id: The ID of the workspace
agent_id: The ID of the agent
session_id: The ID of the current session
append: Whether to append to the file instead of overwriting (default: False)
Returns:
Dict with success status and path, or error dict
"""
try:
secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
os.makedirs(os.path.dirname(secure_path), exist_ok=True)
mode = "a" if append else "w"
with open(secure_path, mode, encoding="utf-8") as f:
f.write(content)
return {
"success": True,
"path": path,
"mode": "appended" if append else "written",
"bytes_written": len(content.encode("utf-8")),
}
except Exception as e:
return {"error": f"Failed to write to file: {str(e)}"}