Merge remote-tracking branch 'origin/main' into feat/email-tool

This commit is contained in:
Timothy
2026-01-30 13:56:15 -08:00
6 changed files with 562 additions and 2 deletions
+63 -1
View File
@@ -47,6 +47,23 @@ class ExecutionResult:
paused_at: str | None = None # Node ID where execution paused for HITL
session_state: dict[str, Any] = field(default_factory=dict) # State to resume from
# Execution quality metrics
total_retries: int = 0 # Total number of retries across all nodes
nodes_with_failures: list[str] = field(default_factory=list) # Node IDs that failed but recovered
retry_details: dict[str, int] = field(default_factory=dict) # {node_id: retry_count}
had_partial_failures: bool = False # True if any node failed but eventually succeeded
execution_quality: str = "clean" # "clean", "degraded", or "failed"
@property
def is_clean_success(self) -> bool:
    """Whether the run both succeeded and carried no quality degradation."""
    quality_is_clean = self.execution_quality == "clean"
    return quality_is_clean and self.success
@property
def is_degraded_success(self) -> bool:
    """Whether the run succeeded despite retries or partial failures."""
    quality_is_degraded = self.execution_quality == "degraded"
    return quality_is_degraded and self.success
@dataclass
class ParallelBranch:
@@ -399,6 +416,11 @@ class GraphExecutor:
f"{max_retries} retries: {result.error}"
),
)
# Calculate quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = list(node_retry_counts.keys())
return ExecutionResult(
success=False,
error=(
@@ -410,6 +432,11 @@ class GraphExecutor:
total_tokens=total_tokens,
total_latency_ms=total_latency,
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
had_partial_failures=len(nodes_failed) > 0,
execution_quality="failed",
)
# Check if we just executed a pause node - if so, save state and return
@@ -430,6 +457,11 @@ class GraphExecutor:
narrative=f"Paused at {node_spec.name} after {steps} steps",
)
# Calculate quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0]
exec_quality = "degraded" if total_retries_count > 0 else "clean"
return ExecutionResult(
success=True,
output=saved_memory,
@@ -439,6 +471,11 @@ class GraphExecutor:
path=path,
paused_at=node_spec.id,
session_state=session_state_out,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
)
# Check if this is a terminal node - if so, we're done
@@ -527,10 +564,20 @@ class GraphExecutor:
self.logger.info(f" Total tokens: {total_tokens}")
self.logger.info(f" Total latency: {total_latency}ms")
# Calculate execution quality metrics
total_retries_count = sum(node_retry_counts.values())
nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0]
exec_quality = "degraded" if total_retries_count > 0 else "clean"
# Update narrative to reflect execution quality
quality_suffix = ""
if exec_quality == "degraded":
quality_suffix = f" (⚠️ {total_retries_count} retries across {len(nodes_failed)} nodes)"
self.runtime.end_run(
success=True,
output_data=output,
narrative=f"Executed {steps} steps through path: {' -> '.join(path)}",
narrative=f"Executed {steps} steps through path: {' -> '.join(path)}{quality_suffix}",
)
return ExecutionResult(
@@ -540,6 +587,11 @@ class GraphExecutor:
total_tokens=total_tokens,
total_latency_ms=total_latency,
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
)
except Exception as e:
@@ -551,11 +603,21 @@ class GraphExecutor:
success=False,
narrative=f"Failed at step {steps}: {e}",
)
# Calculate quality metrics even for exceptions
total_retries_count = sum(node_retry_counts.values())
nodes_failed = list(node_retry_counts.keys())
return ExecutionResult(
success=False,
error=str(e),
steps_executed=steps,
path=path,
total_retries=total_retries_count,
nodes_with_failures=nodes_failed,
retry_details=dict(node_retry_counts),
had_partial_failures=len(nodes_failed) > 0,
execution_quality="failed",
)
def _build_context(
+20 -1
View File
@@ -158,7 +158,26 @@ class ToolRegistry:
)
result = executor_func(tool_use)
if isinstance(result, ToolResult):
return json.loads(result.content) if result.content else {}
# ToolResult.content is expected to be JSON, but tools may
# sometimes return invalid JSON. Guard against crashes here
# and surface a structured error instead.
if not result.content:
return {}
try:
return json.loads(result.content)
except json.JSONDecodeError as e:
logger.warning(
"Tool '%s' returned invalid JSON: %s",
tool_name,
str(e),
)
return {
"error": (
f"Invalid JSON response from tool '{tool_name}': "
f"{str(e)}"
),
"raw_content": result.content,
}
return result
return executor
+364
View File
@@ -0,0 +1,364 @@
"""
Tests for execution quality tracking.
Verifies that ExecutionResult properly tracks retries, partial failures,
and execution quality to ensure observability reflects semantic correctness.
"""
import pytest
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.goal import Goal, SuccessCriterion
from framework.graph.node import NodeContext, NodeProtocol, NodeResult, NodeSpec
from framework.runtime.core import Runtime
class FlakyNode(NodeProtocol):
    """Test double that fails a configurable number of times, then succeeds."""

    def __init__(self, fail_count: int = 2):
        # How many initial attempts should report failure before success.
        self.fail_count = fail_count
        # Running count of execute() invocations across retries.
        self.attempt = 0

    async def execute(self, ctx: NodeContext) -> NodeResult:
        """Fail until `fail_count` attempts have occurred, then succeed."""
        self.attempt += 1
        if self.attempt <= self.fail_count:
            return NodeResult(
                success=False,
                error=f"Simulated failure {self.attempt}/{self.fail_count}",
            )
        # On success, populate every output key declared on the node spec.
        message = f"succeeded after {self.attempt} attempts"
        payload = {key: message for key in ctx.node_spec.output_keys}
        return NodeResult(success=True, output=payload)

    def validate_input(self, ctx: NodeContext) -> list[str]:
        """No input validation for this test double."""
        return []
class AlwaysSucceedsNode(NodeProtocol):
    """Test double whose execute() succeeds immediately on the first attempt."""

    async def execute(self, ctx: NodeContext) -> NodeResult:
        """Succeed unconditionally, filling every declared output key."""
        payload = {key: "success" for key in ctx.node_spec.output_keys}
        return NodeResult(success=True, output=payload)

    def validate_input(self, ctx: NodeContext) -> list[str]:
        """No input validation for this test double."""
        return []
class AlwaysFailsNode(NodeProtocol):
    """Test double whose execute() always fails (exercises max-retry paths)."""

    async def execute(self, ctx: NodeContext) -> NodeResult:
        """Report a permanent, non-recoverable failure every time."""
        return NodeResult(success=False, error="Permanent failure")

    def validate_input(self, ctx: NodeContext) -> list[str]:
        """No input validation for this test double."""
        return []
@pytest.mark.asyncio
class TestExecutionQuality:
    """Test execution quality tracking.

    Each test builds a minimal one- or multi-node graph, runs it through
    GraphExecutor, and asserts on the quality fields of ExecutionResult
    (execution_quality, total_retries, retry_details, etc.).
    """

    async def test_clean_success_no_retries(self, tmp_path):
        """Test clean success when no retries occur."""
        # Setup
        runtime = Runtime(tmp_path)
        goal = Goal(
            id="test",
            name="Test",
            description="Test clean execution",
            success_criteria=[
                SuccessCriterion(
                    id="works",
                    description="Works",
                    metric="output_equals",
                    target="success",
                )
            ],
        )

        # Create simple graph with always-succeeding node
        graph = GraphSpec(
            id="test-graph",
            goal_id=goal.id,
            nodes=[
                NodeSpec(
                    id="node1",
                    name="Always Succeeds",
                    description="Never fails",
                    node_type="function",
                    output_keys=["result"],
                ),
            ],
            edges=[],
            entry_node="node1",
            terminal_nodes=["node1"],
        )

        executor = GraphExecutor(
            runtime=runtime,
            node_registry={"node1": AlwaysSucceedsNode()},
        )

        # Execute
        result = await executor.execute(graph, goal)

        # Verify - this should be clean success
        assert result.success is True
        assert result.execution_quality == "clean"
        assert result.total_retries == 0
        assert result.nodes_with_failures == []
        assert result.had_partial_failures is False
        assert result.is_clean_success is True
        assert result.is_degraded_success is False

    async def test_degraded_success_with_retries(self, tmp_path):
        """Test degraded success when retries occur but eventually succeeds."""
        # Setup
        runtime = Runtime(tmp_path)
        goal = Goal(
            id="test",
            name="Test",
            description="Test execution with retries",
            success_criteria=[
                SuccessCriterion(
                    id="works",
                    description="Works eventually",
                    metric="output_equals",
                    target="success",
                )
            ],
        )

        # Create graph with flaky node (fails 2 times before succeeding)
        graph = GraphSpec(
            id="test-graph",
            goal_id=goal.id,
            nodes=[
                NodeSpec(
                    id="flaky",
                    name="Flaky Node",
                    description="Fails then succeeds",
                    node_type="function",
                    output_keys=["result"],
                    max_retries=3,  # Allow retries
                ),
            ],
            edges=[],
            entry_node="flaky",
            terminal_nodes=["flaky"],
        )

        executor = GraphExecutor(
            runtime=runtime,
            node_registry={"flaky": FlakyNode(fail_count=2)},
        )

        # Execute
        result = await executor.execute(graph, goal)

        # Verify - this should be degraded success: overall success is True
        # but the retries must be visible in the quality metrics.
        assert result.success is True
        assert result.execution_quality == "degraded"
        assert result.total_retries == 2
        assert "flaky" in result.nodes_with_failures
        assert result.retry_details["flaky"] == 2
        assert result.had_partial_failures is True
        assert result.is_clean_success is False
        assert result.is_degraded_success is True

    async def test_failed_execution_max_retries_exceeded(self, tmp_path):
        """Test failed execution when max retries are exceeded."""
        # Setup
        runtime = Runtime(tmp_path)
        goal = Goal(
            id="test",
            name="Test",
            description="Test execution failure",
            success_criteria=[
                SuccessCriterion(
                    id="works",
                    description="Should work",
                    metric="output_equals",
                    target="success",
                )
            ],
        )

        # Create graph with always-failing node
        graph = GraphSpec(
            id="test-graph",
            goal_id=goal.id,
            nodes=[
                NodeSpec(
                    id="fails",
                    name="Always Fails",
                    description="Never succeeds",
                    node_type="function",
                    output_keys=["result"],
                    max_retries=2,  # Will retry twice then fail
                ),
            ],
            edges=[],
            entry_node="fails",
            terminal_nodes=["fails"],
        )

        executor = GraphExecutor(
            runtime=runtime,
            node_registry={"fails": AlwaysFailsNode()},
        )

        # Execute
        result = await executor.execute(graph, goal)

        # Verify - this should be failed
        assert result.success is False
        assert result.execution_quality == "failed"
        assert result.total_retries == 2
        assert "fails" in result.nodes_with_failures
        assert result.retry_details["fails"] == 2
        assert result.had_partial_failures is True
        assert result.error is not None
        assert "failed after 2 attempts" in result.error

    async def test_multi_node_partial_failures(self, tmp_path):
        """Test tracking failures across multiple nodes."""
        # Setup
        runtime = Runtime(tmp_path)
        goal = Goal(
            id="test",
            name="Test",
            description="Test multi-node execution",
            success_criteria=[
                SuccessCriterion(
                    id="works",
                    description="All nodes succeed",
                    metric="output_equals",
                    target="success",
                )
            ],
        )

        # Create graph with multiple flaky nodes chained via ON_SUCCESS
        # edges: flaky1 -> flaky2 -> success.
        graph = GraphSpec(
            id="test-graph",
            goal_id=goal.id,
            nodes=[
                NodeSpec(
                    id="flaky1",
                    name="Flaky Node 1",
                    description="Fails once",
                    node_type="function",
                    output_keys=["result1"],
                    max_retries=3,
                ),
                NodeSpec(
                    id="flaky2",
                    name="Flaky Node 2",
                    description="Fails twice",
                    node_type="function",
                    input_keys=["result1"],
                    output_keys=["result2"],
                    max_retries=3,
                ),
                NodeSpec(
                    id="success",
                    name="Success Node",
                    description="Always succeeds",
                    node_type="function",
                    input_keys=["result2"],
                    output_keys=["final"],
                ),
            ],
            edges=[
                EdgeSpec(
                    id="e1",
                    source="flaky1",
                    target="flaky2",
                    condition=EdgeCondition.ON_SUCCESS,
                ),
                EdgeSpec(
                    id="e2",
                    source="flaky2",
                    target="success",
                    condition=EdgeCondition.ON_SUCCESS,
                ),
            ],
            entry_node="flaky1",
            terminal_nodes=["success"],
        )

        executor = GraphExecutor(
            runtime=runtime,
            node_registry={
                "flaky1": FlakyNode(fail_count=1),  # Fails once
                "flaky2": FlakyNode(fail_count=2),  # Fails twice
                "success": AlwaysSucceedsNode(),
            },
        )

        # Execute
        result = await executor.execute(graph, goal)

        # Verify - should succeed but be degraded; retry counts must be
        # attributed to the correct node IDs.
        assert result.success is True
        assert result.execution_quality == "degraded"
        assert result.total_retries == 3  # 1 + 2 retries
        assert set(result.nodes_with_failures) == {"flaky1", "flaky2"}
        assert result.retry_details["flaky1"] == 1
        assert result.retry_details["flaky2"] == 2
        assert result.had_partial_failures is True
        assert result.is_clean_success is False
        assert result.is_degraded_success is True

    async def test_execution_result_properties(self, tmp_path):
        """Test ExecutionResult helper properties directly (no executor run)."""
        # Clean success
        clean = ExecutionResult(
            success=True,
            execution_quality="clean",
        )
        assert clean.is_clean_success is True
        assert clean.is_degraded_success is False

        # Degraded success
        degraded = ExecutionResult(
            success=True,
            execution_quality="degraded",
            total_retries=2,
        )
        assert degraded.is_clean_success is False
        assert degraded.is_degraded_success is True

        # Failed
        failed = ExecutionResult(
            success=False,
            execution_quality="failed",
        )
        assert failed.is_clean_success is False
        assert failed.is_degraded_success is False
# Allow running this test module directly, outside the pytest CLI.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
+95
View File
@@ -0,0 +1,95 @@
"""Tests for ToolRegistry JSON handling when tools return invalid JSON.
These tests exercise the discover_from_module() path, where tools are
registered via a TOOLS dict and a unified tool_executor that returns
ToolResult instances. Historically, invalid JSON in ToolResult.content
could cause a json.JSONDecodeError and crash execution.
"""
from pathlib import Path
import textwrap
from framework.llm.provider import Tool, ToolResult
from framework.runner.tool_registry import ToolRegistry
def _write_tool_module(tmp_path: Path, content: str) -> Path:
"""Helper to write a temporary tools module."""
module_path = tmp_path / "agent_tools.py"
module_path.write_text(textwrap.dedent(content))
return module_path
def test_discover_from_module_handles_invalid_json(tmp_path):
    """ToolRegistry should not crash when tool_executor returns invalid JSON."""
    # The embedded module mimics a real agent tools module: a TOOLS dict plus
    # a unified tool_executor whose ToolResult.content is deliberately not
    # parseable JSON.
    module_src = """
    from framework.llm.provider import Tool, ToolUse, ToolResult

    TOOLS = {
        "bad_tool": Tool(
            name="bad_tool",
            description="Returns malformed JSON",
            parameters={"type": "object", "properties": {}},
        ),
    }

    def tool_executor(tool_use: ToolUse) -> ToolResult:
        # Intentionally malformed JSON
        return ToolResult(
            tool_use_id=tool_use.id,
            content="not {valid json",
            is_error=False,
        )
    """
    module_path = _write_tool_module(tmp_path, module_src)

    registry = ToolRegistry()
    count = registry.discover_from_module(module_path)
    assert count == 1

    # Access the registered executor for "bad_tool"
    assert "bad_tool" in registry._tools  # noqa: SLF001 - testing internal registry
    registered = registry._tools["bad_tool"]

    # Should not raise, and should return a structured error dict
    # (an "error" message plus the raw, unparsed content for debugging).
    result = registered.executor({})
    assert isinstance(result, dict)
    assert "error" in result
    assert "raw_content" in result
    assert result["raw_content"] == "not {valid json"
def test_discover_from_module_handles_empty_content(tmp_path):
    """ToolRegistry should handle empty ToolResult.content gracefully."""
    # Same discovery path as the invalid-JSON test, but the executor returns
    # an empty content string instead of malformed JSON.
    module_src = """
    from framework.llm.provider import Tool, ToolUse, ToolResult

    TOOLS = {
        "empty_tool": Tool(
            name="empty_tool",
            description="Returns empty content",
            parameters={"type": "object", "properties": {}},
        ),
    }

    def tool_executor(tool_use: ToolUse) -> ToolResult:
        return ToolResult(
            tool_use_id=tool_use.id,
            content="",
            is_error=False,
        )
    """
    module_path = _write_tool_module(tmp_path, module_src)

    registry = ToolRegistry()
    count = registry.discover_from_module(module_path)
    assert count == 1

    assert "empty_tool" in registry._tools  # noqa: SLF001 - testing internal registry
    registered = registry._tools["empty_tool"]

    # Empty content should return an empty dict rather than crashing
    result = registered.executor({})
    assert isinstance(result, dict)
    assert result == {}
@@ -364,6 +364,8 @@ class AgentRequest(BaseModel):
```
### Output Sanitization
> **Note:** The following snippet is illustrative and shows a simplified example
> of output sanitization logic. Actual implementations may differ.
```python
def sanitize_output(result):
# Remove any leaked secrets
+18
View File
@@ -195,6 +195,24 @@ else
fi
echo ""
# Ensure exports directory exists
echo "=================================================="
echo "Checking Directory Structure"
echo "=================================================="
echo ""

if [ ! -d "$PROJECT_ROOT/exports" ]; then
    echo "Creating exports directory..."
    mkdir -p "$PROJECT_ROOT/exports"
    # Seed the new directory with a short README describing its purpose.
    echo "# Agent Exports" > "$PROJECT_ROOT/exports/README.md"
    echo "" >> "$PROJECT_ROOT/exports/README.md"
    echo "This directory is the default location for generated agent packages." >> "$PROJECT_ROOT/exports/README.md"
    # Fix: the original printed "${GREEN}${NC}" with nothing between the
    # color-on/color-off codes, so no status marker was visible. Restore
    # the checkmark glyph the rest of the script's status lines use.
    echo -e "${GREEN}✓${NC} Created exports directory"
else
    echo -e "${GREEN}✓${NC} exports directory exists"
fi
echo ""
# Verify installations
echo "=================================================="
echo "Verifying Installation"