refactor: graph executer cleanup

This commit is contained in:
Richard Tang
2026-04-02 10:20:46 -07:00
parent 0e5f571b09
commit 332311b49b
3 changed files with 18 additions and 81 deletions
@@ -476,13 +476,6 @@ async def execute_tool(
return result
def next_spill_filename(tool_name: str, counter: int) -> str:
"""Return a short, monotonic filename for a tool result spill."""
# Shorten common tool name prefixes to save tokens
short = tool_name.removeprefix("tool_").removeprefix("mcp_")
return f"{short}_{counter}.txt"
def restore_spill_counter(spillover_dir: str | None) -> int:
"""Scan spillover_dir for existing spill files and return the max counter.
+14 -70
View File
@@ -30,62 +30,6 @@ from framework.runtime.core import Runtime
logger = logging.getLogger(__name__)
def _fix_unescaped_newlines_in_json(json_str: str) -> str:
"""Fix unescaped newlines inside JSON string values.
LLMs sometimes output actual newlines inside JSON strings instead of \\n.
This function fixes that by properly escaping newlines within string values.
"""
result = []
in_string = False
escape_next = False
i = 0
while i < len(json_str):
char = json_str[i]
if escape_next:
result.append(char)
escape_next = False
i += 1
continue
if char == "\\" and in_string:
escape_next = True
result.append(char)
i += 1
continue
if char == '"' and not escape_next:
in_string = not in_string
result.append(char)
i += 1
continue
# Fix unescaped newlines inside strings
if in_string and char == "\n":
result.append("\\n")
i += 1
continue
# Fix unescaped carriage returns inside strings
if in_string and char == "\r":
result.append("\\r")
i += 1
continue
# Fix unescaped tabs inside strings
if in_string and char == "\t":
result.append("\\t")
i += 1
continue
result.append(char)
i += 1
return "".join(result)
def find_json_object(text: str) -> str | None:
"""Find the first valid JSON object in text using balanced brace matching.
@@ -275,19 +219,19 @@ class NodeSpec(BaseModel):
model_config = {"extra": "allow", "arbitrary_types_allowed": True}
class MemoryWriteError(Exception):
"""Raised when an invalid value is written to memory."""
class DataBufferWriteError(Exception):
"""Raised when an invalid value is written to the data buffer."""
pass
@dataclass
class SharedMemory:
class DataBuffer:
"""
Shared state between nodes in a graph execution.
Shared data buffer between nodes in a graph execution.
Nodes read and write to shared memory using typed keys.
The memory is scoped to a single run.
Nodes read and write to the data buffer using typed keys.
The buffer is scoped to a single run.
For parallel execution, use write_async() which provides per-key locking
to prevent race conditions when multiple nodes write concurrently.
@@ -306,23 +250,23 @@ class SharedMemory:
self._lock = asyncio.Lock()
def read(self, key: str) -> Any:
"""Read a value from shared memory."""
"""Read a value from the data buffer."""
if self._allowed_read and key not in self._allowed_read:
raise PermissionError(f"Node not allowed to read key: {key}")
return self._data.get(key)
def write(self, key: str, value: Any, validate: bool = True) -> None:
"""
Write a value to shared memory.
Write a value to the data buffer.
Args:
key: The memory key to write to
key: The buffer key to write to
value: The value to write
validate: If True, check for suspicious content (default True)
Raises:
PermissionError: If node doesn't have write permission
MemoryWriteError: If value appears to be hallucinated content
DataBufferWriteError: If value appears to be hallucinated content
"""
if self._allowed_write and key not in self._allowed_write:
raise PermissionError(f"Node not allowed to write key: {key}")
@@ -336,7 +280,7 @@ class SharedMemory:
f"⚠ Suspicious write to key '{key}': appears to be code "
f"({len(value)} chars). Consider using validate=False if intended."
)
raise MemoryWriteError(
raise DataBufferWriteError(
f"Rejected suspicious content for key '{key}': "
f"appears to be hallucinated code ({len(value)} chars). "
"If this is intentional, use validate=False."
@@ -352,13 +296,13 @@ class SharedMemory:
parallel execution. Each key has its own lock to minimize contention.
Args:
key: The memory key to write to
key: The buffer key to write to
value: The value to write
validate: If True, check for suspicious content (default True)
Raises:
PermissionError: If node doesn't have write permission
MemoryWriteError: If value appears to be hallucinated content
DataBufferWriteError: If value appears to be hallucinated content
"""
# Check permissions first (no lock needed)
if self._allowed_write and key not in self._allowed_write:
@@ -379,7 +323,7 @@ class SharedMemory:
f"⚠ Suspicious write to key '{key}': appears to be code "
f"({len(value)} chars). Consider using validate=False if intended."
)
raise MemoryWriteError(
raise DataBufferWriteError(
f"Rejected suspicious content for key '{key}': "
f"appears to be hallucinated code ({len(value)} chars). "
"If this is intentional, use validate=False."
+4 -4
View File
@@ -1,10 +1,10 @@
"""
Shared State Manager - Manages state across concurrent executions.
Shared Buffer Manager - Manages state across concurrent executions.
Provides different isolation levels:
- ISOLATED: Each execution has its own memory copy
- SHARED: All executions read/write same memory (eventual consistency)
- SYNCHRONIZED: Shared memory with write locks (strong consistency)
- ISOLATED: Each execution has its own state copy
- SHARED: All executions read/write same state (eventual consistency)
- SYNCHRONIZED: Shared state with write locks (strong consistency)
"""
import asyncio