Compare commits

...

3 Commits

Author SHA1 Message Date
Richard T 38ba872bf1 chore: lint code quality 2026-01-23 18:04:54 -08:00
Richard T 0dbd1f7896 chore lint fixes 2026-01-23 18:03:50 -08:00
Richard T 364fb57d3c chore: lint fixes for ruff -part 1 2026-01-23 17:21:59 -08:00
59 changed files with 1934 additions and 1770 deletions
+5 -9
View File
@@ -78,6 +78,7 @@ async def example_3_config_file():
# Copy example config (in practice, you'd place this in your agent folder)
import shutil
shutil.copy(
"examples/mcp_servers.json",
test_agent_path / "mcp_servers.json"
@@ -110,18 +111,12 @@ async def example_4_custom_agent_with_mcp_tools():
builder.set_goal(
goal_id="web-researcher",
name="Web Research Agent",
description="Search the web and summarize findings"
description="Search the web and summarize findings",
)
# Add success criteria
builder.add_success_criterion(
"search-results",
"Successfully retrieve at least 3 web search results"
)
builder.add_success_criterion(
"summary",
"Provide a clear, concise summary of the findings"
)
builder.add_success_criterion("search-results", "Successfully retrieve at least 3 web search results")
builder.add_success_criterion("summary", "Provide a clear, concise summary of the findings")
# Add nodes that will use MCP tools
builder.add_node(
@@ -192,6 +187,7 @@ async def main():
except Exception as e:
print(f"\nError running example: {e}")
import traceback
traceback.print_exc()
+11 -11
View File
@@ -22,24 +22,24 @@ The framework includes a Goal-Based Testing system (Goal → Agent → Eval):
See `framework.testing` for details.
"""
from framework.schemas.decision import Decision, Option, Outcome, DecisionEvaluation
from framework.schemas.run import Run, RunSummary, Problem
from framework.runtime.core import Runtime
from framework.builder.query import BuilderQuery
from framework.llm import LLMProvider, AnthropicProvider
from framework.runner import AgentRunner, AgentOrchestrator
from framework.llm import AnthropicProvider, LLMProvider
from framework.runner import AgentOrchestrator, AgentRunner
from framework.runtime.core import Runtime
from framework.schemas.decision import Decision, DecisionEvaluation, Option, Outcome
from framework.schemas.run import Problem, Run, RunSummary
# Testing framework
from framework.testing import (
ApprovalStatus,
ConstraintTestGenerator,
DebugTool,
ErrorCategory,
SuccessCriteriaTestGenerator,
Test,
TestResult,
TestSuiteResult,
TestStorage,
ApprovalStatus,
ErrorCategory,
ConstraintTestGenerator,
SuccessCriteriaTestGenerator,
DebugTool,
TestSuiteResult,
)
__all__ = [
+3 -3
View File
@@ -2,12 +2,12 @@
from framework.builder.query import BuilderQuery
from framework.builder.workflow import (
GraphBuilder,
BuildSession,
BuildPhase,
ValidationResult,
BuildSession,
GraphBuilder,
TestCase,
TestResult,
ValidationResult,
)
__all__ = [
+26 -29
View File
@@ -8,12 +8,12 @@ This is designed around the questions I need to answer:
4. What should we change? (suggestions)
"""
from typing import Any
from collections import defaultdict
from pathlib import Path
from typing import Any
from framework.schemas.decision import Decision
from framework.schemas.run import Run, RunSummary, RunStatus
from framework.schemas.run import Run, RunStatus, RunSummary
from framework.storage.backend import FileStorage
@@ -196,10 +196,7 @@ class BuilderQuery:
break
# Extract problems
problems = [
f"[{p.severity}] {p.description}"
for p in run.problems
]
problems = [f"[{p.severity}] {p.description}" for p in run.problems]
# Generate suggestions based on the failure
suggestions = self._generate_suggestions(run, failed_decisions)
@@ -253,11 +250,7 @@ class BuilderQuery:
error = decision.outcome.error or "Unknown error"
failure_counts[error] += 1
common_failures = sorted(
failure_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5]
common_failures = sorted(failure_counts.items(), key=lambda x: x[1], reverse=True)[:5]
# Find problematic nodes
node_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"total": 0, "failed": 0})
@@ -328,34 +321,42 @@ class BuilderQuery:
# Suggestion: Fix problematic nodes
for node_id, failure_rate in patterns.problematic_nodes:
suggestions.append({
suggestions.append(
{
"type": "node_improvement",
"target": node_id,
"reason": f"Node has {failure_rate:.1%} failure rate",
"recommendation": f"Review and improve node '{node_id}' - high failure rate suggests prompt or tool issues",
"recommendation": (
f"Review and improve node '{node_id}' - high failure rate suggests prompt or tool issues"
),
"priority": "high" if failure_rate > 0.3 else "medium",
})
}
)
# Suggestion: Address common failures
for failure, count in patterns.common_failures:
if count >= 2:
suggestions.append({
suggestions.append(
{
"type": "error_handling",
"target": failure,
"reason": f"Error occurred {count} times",
"recommendation": f"Add handling for: {failure}",
"priority": "high" if count >= 5 else "medium",
})
}
)
# Suggestion: Overall success rate
if patterns.success_rate < 0.8:
suggestions.append({
suggestions.append(
{
"type": "architecture",
"target": goal_id,
"reason": f"Goal success rate is only {patterns.success_rate:.1%}",
"recommendation": "Consider restructuring the agent graph or improving goal definition",
"recommendation": ("Consider restructuring the agent graph or improving goal definition"),
"priority": "high",
})
}
)
return suggestions
@@ -408,9 +409,8 @@ class BuilderQuery:
alternatives = [o for o in decision.options if o.id != decision.chosen_option_id]
if alternatives:
alt_desc = alternatives[0].description
suggestions.append(
f"Consider alternative: '{alt_desc}' instead of '{chosen.description if chosen else 'unknown'}'"
)
chosen_desc = chosen.description if chosen else "unknown"
suggestions.append(f"Consider alternative: '{alt_desc}' instead of '{chosen_desc}'")
# Check for missing context
if not decision.input_context:
@@ -420,9 +420,8 @@ class BuilderQuery:
# Check for constraint issues
if decision.active_constraints:
suggestions.append(
f"Review constraints: {', '.join(decision.active_constraints)} - may be too restrictive"
)
constraints = ", ".join(decision.active_constraints)
suggestions.append(f"Review constraints: {constraints} - may be too restrictive")
# Check for reported problems with suggestions
for problem in run.problems:
@@ -471,12 +470,10 @@ class BuilderQuery:
# Decision count difference
if len(run1.decisions) != len(run2.decisions):
differences.append(
f"Decision count: {len(run1.decisions)} vs {len(run2.decisions)}"
)
differences.append(f"Decision count: {len(run1.decisions)} vs {len(run2.decisions)}")
# Find first divergence point
for i, (d1, d2) in enumerate(zip(run1.decisions, run2.decisions)):
for i, (d1, d2) in enumerate(zip(run1.decisions, run2.decisions, strict=False)):
if d1.chosen_option_id != d2.chosen_option_id:
differences.append(
f"Diverged at decision {i}: chose '{d1.chosen_option_id}' vs '{d2.chosen_option_id}'"
+70 -49
View File
@@ -13,20 +13,22 @@ Each step requires validation and human approval before proceeding.
You cannot skip steps or bypass validation.
"""
from collections.abc import Callable
from datetime import datetime
from enum import Enum
from pathlib import Path
from datetime import datetime
from typing import Any, Callable
from typing import Any
from pydantic import BaseModel, Field
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.goal import Goal
from framework.graph.node import NodeSpec
from framework.graph.edge import EdgeSpec, EdgeCondition, GraphSpec
class BuildPhase(str, Enum):
"""Current phase of the build process."""
INIT = "init" # Just started
GOAL_DRAFT = "goal_draft" # Drafting goal
GOAL_APPROVED = "goal_approved" # Goal approved
@@ -39,6 +41,7 @@ class BuildPhase(str, Enum):
class ValidationResult(BaseModel):
"""Result of a validation check."""
valid: bool
errors: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
@@ -47,6 +50,7 @@ class ValidationResult(BaseModel):
class TestCase(BaseModel):
"""A test case for validating agent behavior."""
id: str
description: str
input: dict[str, Any]
@@ -56,6 +60,7 @@ class TestCase(BaseModel):
class TestResult(BaseModel):
"""Result of running a test case."""
test_id: str
passed: bool
actual_output: Any = None
@@ -69,6 +74,7 @@ class BuildSession(BaseModel):
Saved after each approved step so you can resume later.
"""
id: str
name: str
phase: BuildPhase = BuildPhase.INIT
@@ -457,11 +463,14 @@ class GraphBuilder:
# Run the test
import asyncio
result = asyncio.run(executor.execute(
result = asyncio.run(
executor.execute(
graph=graph,
goal=self.session.goal,
input_data=test.input,
))
)
)
# Check result
passed = result.success
@@ -515,12 +524,14 @@ class GraphBuilder:
if not self._pending_validation.valid:
return False
self.session.approvals.append({
self.session.approvals.append(
{
"phase": self.session.phase.value,
"comment": comment,
"timestamp": datetime.now().isoformat(),
"validation": self._pending_validation.model_dump(),
})
}
)
# Advance phase if appropriate
if self.session.phase == BuildPhase.GOAL_DRAFT:
@@ -554,11 +565,13 @@ class GraphBuilder:
return False
self.session.phase = BuildPhase.APPROVED
self.session.approvals.append({
self.session.approvals.append(
{
"phase": "final",
"comment": comment,
"timestamp": datetime.now().isoformat(),
})
}
)
self._save_session()
return True
@@ -630,69 +643,75 @@ class GraphBuilder:
"""Generate Python code for the graph."""
lines = [
'"""',
f'Generated agent: {self.session.name}',
f'Generated at: {datetime.now().isoformat()}',
f"Generated agent: {self.session.name}",
f"Generated at: {datetime.now().isoformat()}",
'"""',
'',
'from framework.graph import (',
' Goal, SuccessCriterion, Constraint,',
' NodeSpec, EdgeSpec, EdgeCondition,',
')',
'from framework.graph.edge import GraphSpec',
'from framework.graph.goal import GoalStatus',
'',
'',
'# Goal',
"",
"from framework.graph import (",
" Goal, SuccessCriterion, Constraint,",
" NodeSpec, EdgeSpec, EdgeCondition,",
")",
"from framework.graph.edge import GraphSpec",
"from framework.graph.goal import GoalStatus",
"",
"",
"# Goal",
]
if self.session.goal:
goal_json = self.session.goal.model_dump_json(indent=4)
lines.append('GOAL = Goal.model_validate_json(\'\'\'')
lines.append("GOAL = Goal.model_validate_json('''")
lines.append(goal_json)
lines.append("''')")
else:
lines.append('GOAL = None')
lines.append("GOAL = None")
lines.extend([
'',
'',
'# Nodes',
'NODES = [',
])
lines.extend(
[
"",
"",
"# Nodes",
"NODES = [",
]
)
for node in self.session.nodes:
node_json = node.model_dump_json(indent=4)
lines.append(' NodeSpec.model_validate_json(\'\'\'')
lines.append(" NodeSpec.model_validate_json('''")
lines.append(node_json)
lines.append(" '''),")
lines.extend([
']',
'',
'',
'# Edges',
'EDGES = [',
])
lines.extend(
[
"]",
"",
"",
"# Edges",
"EDGES = [",
]
)
for edge in self.session.edges:
edge_json = edge.model_dump_json(indent=4)
lines.append(' EdgeSpec.model_validate_json(\'\'\'')
lines.append(" EdgeSpec.model_validate_json('''")
lines.append(edge_json)
lines.append(" '''),")
lines.extend([
']',
'',
'',
'# Graph',
])
lines.extend(
[
"]",
"",
"",
"# Graph",
]
)
graph_json = graph.model_dump_json(indent=4)
lines.append('GRAPH = GraphSpec.model_validate_json(\'\'\'')
lines.append("GRAPH = GraphSpec.model_validate_json('''")
lines.append(graph_json)
lines.append("''')")
return '\n'.join(lines)
return "\n".join(lines)
# =========================================================================
# SESSION MANAGEMENT
@@ -743,7 +762,7 @@ class GraphBuilder:
"tests": len(self.session.test_cases),
"tests_passed": sum(1 for t in self.session.test_results if t.passed),
"approvals": len(self.session.approvals),
"pending_validation": self._pending_validation.model_dump() if self._pending_validation else None,
"pending_validation": (self._pending_validation.model_dump() if self._pending_validation else None),
}
def show(self) -> str:
@@ -755,11 +774,13 @@ class GraphBuilder:
]
if self.session.goal:
lines.extend([
lines.extend(
[
f"Goal: {self.session.goal.name}",
f" {self.session.goal.description}",
"",
])
]
)
if self.session.nodes:
lines.append("Nodes:")
+3 -3
View File
@@ -23,9 +23,7 @@ import sys
def main():
parser = argparse.ArgumentParser(
description="Goal Agent - Build and run goal-driven agents"
)
parser = argparse.ArgumentParser(description="Goal Agent - Build and run goal-driven agents")
parser.add_argument(
"--model",
default="claude-haiku-4-5-20251001",
@@ -36,10 +34,12 @@ def main():
# Register runner commands (run, info, validate, list, dispatch, shell)
from framework.runner.cli import register_commands
register_commands(subparsers)
# Register testing commands (test-generate, test-approve, test-run, test-debug, etc.)
from framework.testing.cli import register_testing_commands
register_testing_commands(subparsers)
args = parser.parse_args()
+16 -16
View File
@@ -1,32 +1,32 @@
"""Graph structures: Goals, Nodes, Edges, and Flexible Execution."""
from framework.graph.goal import Goal, SuccessCriterion, Constraint, GoalStatus
from framework.graph.node import NodeSpec, NodeContext, NodeResult, NodeProtocol
from framework.graph.edge import EdgeSpec, EdgeCondition
from framework.graph.code_sandbox import CodeSandbox, safe_eval, safe_exec
from framework.graph.edge import EdgeCondition, EdgeSpec
from framework.graph.executor import GraphExecutor
from framework.graph.flexible_executor import ExecutorConfig, FlexibleGraphExecutor
from framework.graph.goal import Constraint, Goal, GoalStatus, SuccessCriterion
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.node import NodeContext, NodeProtocol, NodeResult, NodeSpec
# Flexible execution (Worker-Judge pattern)
from framework.graph.plan import (
Plan,
PlanStep,
ActionSpec,
ActionType,
StepStatus,
Judgment,
JudgmentAction,
EvaluationRule,
PlanExecutionResult,
ExecutionStatus,
load_export,
# HITL (Human-in-the-loop)
ApprovalDecision,
ApprovalRequest,
ApprovalResult,
EvaluationRule,
ExecutionStatus,
Judgment,
JudgmentAction,
Plan,
PlanExecutionResult,
PlanStep,
StepStatus,
load_export,
)
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.worker_node import WorkerNode, StepExecutionResult
from framework.graph.flexible_executor import FlexibleGraphExecutor, ExecutorConfig
from framework.graph.code_sandbox import CodeSandbox, safe_exec, safe_eval
from framework.graph.worker_node import StepExecutionResult, WorkerNode
__all__ = [
# Goal
+16 -20
View File
@@ -13,11 +13,11 @@ Security measures:
"""
import ast
import sys
import signal
from typing import Any
from dataclasses import dataclass, field
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any
# Safe builtins whitelist
SAFE_BUILTINS = {
@@ -25,7 +25,6 @@ SAFE_BUILTINS = {
"True": True,
"False": False,
"None": None,
# Type constructors
"bool": bool,
"int": int,
@@ -36,7 +35,6 @@ SAFE_BUILTINS = {
"set": set,
"tuple": tuple,
"frozenset": frozenset,
# Basic functions
"abs": abs,
"all": all,
@@ -97,22 +95,26 @@ BLOCKED_AST_NODES = {
class CodeSandboxError(Exception):
"""Error during sandboxed code execution."""
pass
class TimeoutError(CodeSandboxError):
"""Code execution timed out."""
pass
class SecurityError(CodeSandboxError):
"""Code contains potentially dangerous operations."""
pass
@dataclass
class SandboxResult:
"""Result of sandboxed code execution."""
success: bool
result: Any = None
error: str | None = None
@@ -134,6 +136,7 @@ class RestrictedImporter:
if name not in self._cache:
import importlib
self._cache[name] = importlib.import_module(name)
return self._cache[name]
@@ -161,24 +164,19 @@ class CodeValidator:
for node in ast.walk(tree):
# Check for blocked node types
if type(node) in self.blocked_nodes:
issues.append(
f"Blocked operation: {type(node).__name__} at line {getattr(node, 'lineno', '?')}"
)
lineno = getattr(node, "lineno", "?")
issues.append(f"Blocked operation: {type(node).__name__} at line {lineno}")
# Check for dangerous attribute access
if isinstance(node, ast.Attribute):
if node.attr.startswith("_"):
issues.append(
f"Access to private attribute '{node.attr}' at line {node.lineno}"
)
issues.append(f"Access to private attribute '{node.attr}' at line {node.lineno}")
# Check for exec/eval calls
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in ("exec", "eval", "compile", "__import__"):
issues.append(
f"Blocked function call: {node.func.id} at line {node.lineno}"
)
issues.append(f"Blocked function call: {node.func.id} at line {node.lineno}")
return issues
@@ -212,11 +210,12 @@ class CodeSandbox:
@contextmanager
def _timeout_context(self, seconds: int):
"""Context manager for timeout enforcement."""
def handler(signum, frame):
raise TimeoutError(f"Code execution timed out after {seconds} seconds")
# Only works on Unix-like systems
if hasattr(signal, 'SIGALRM'):
if hasattr(signal, "SIGALRM"):
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
@@ -275,6 +274,7 @@ class CodeSandbox:
# Capture stdout
import io
old_stdout = sys.stdout
sys.stdout = captured_stdout = io.StringIO()
@@ -296,11 +296,7 @@ class CodeSandbox:
# Also extract any new variables (not in inputs or builtins)
for key, value in namespace.items():
if (
key not in inputs
and key not in self.safe_builtins
and not key.startswith("_")
):
if key not in inputs and key not in self.safe_builtins and not key.startswith("_"):
extracted[key] = value
return SandboxResult(
+19 -31
View File
@@ -21,14 +21,15 @@ allowing the LLM to evaluate whether proceeding along an edge makes sense
given the current goal, context, and execution state.
"""
from typing import Any
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class EdgeCondition(str, Enum):
"""When an edge should be traversed."""
ALWAYS = "always" # Always after source completes
ON_SUCCESS = "on_success" # Only if source succeeds
ON_FAILURE = "on_failure" # Only if source fails
@@ -68,6 +69,7 @@ class EdgeSpec(BaseModel):
description="Only filter if results need refinement to meet goal",
)
"""
id: str
source: str = Field(description="Source node ID")
target: str = Field(description="Target node ID")
@@ -76,20 +78,17 @@ class EdgeSpec(BaseModel):
condition: EdgeCondition = EdgeCondition.ALWAYS
condition_expr: str | None = Field(
default=None,
description="Expression for CONDITIONAL edges, e.g., 'output.confidence > 0.8'"
description="Expression for CONDITIONAL edges, e.g., 'output.confidence > 0.8'",
)
# Data flow
input_mapping: dict[str, str] = Field(
default_factory=dict,
description="Map source outputs to target inputs: {target_key: source_key}"
description="Map source outputs to target inputs: {target_key: source_key}",
)
# Priority for multiple outgoing edges
priority: int = Field(
default=0,
description="Higher priority edges are evaluated first"
)
priority: int = Field(default=0, description="Higher priority edges are evaluated first")
# Metadata
description: str = ""
@@ -175,6 +174,7 @@ class EdgeSpec(BaseModel):
except Exception as e:
# Log the error for debugging
import logging
logger = logging.getLogger(__name__)
logger.warning(f" ⚠ Condition evaluation failed: {self.condition_expr}")
logger.warning(f" Error: {e}")
@@ -235,7 +235,8 @@ Respond with ONLY a JSON object:
# Parse response
import re
json_match = re.search(r'\{[^{}]*\}', response.content, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
proceed = data.get("proceed", False)
@@ -243,6 +244,7 @@ Respond with ONLY a JSON object:
# Log the decision (using basic print for now)
import logging
logger = logging.getLogger(__name__)
logger.info(f" 🤔 LLM routing decision: {'PROCEED' if proceed else 'SKIP'}")
logger.info(f" Reason: {reasoning}")
@@ -252,6 +254,7 @@ Respond with ONLY a JSON object:
except Exception as e:
# Fallback: proceed on success
import logging
logger = logging.getLogger(__name__)
logger.warning(f" ⚠ LLM routing failed, defaulting to on_success: {e}")
return source_success
@@ -304,6 +307,7 @@ class GraphSpec(BaseModel):
edges=[...],
)
"""
id: str
goal_id: str
version: str = "1.0.0"
@@ -312,42 +316,26 @@ class GraphSpec(BaseModel):
entry_node: str = Field(description="ID of the first node to execute")
entry_points: dict[str, str] = Field(
default_factory=dict,
description="Named entry points for resuming execution. Format: {name: node_id}"
)
terminal_nodes: list[str] = Field(
default_factory=list,
description="IDs of nodes that end execution"
)
pause_nodes: list[str] = Field(
default_factory=list,
description="IDs of nodes that pause execution for HITL input"
description="Named entry points for resuming execution. Format: {name: node_id}",
)
terminal_nodes: list[str] = Field(default_factory=list, description="IDs of nodes that end execution")
pause_nodes: list[str] = Field(default_factory=list, description="IDs of nodes that pause execution for HITL input")
# Components
nodes: list[Any] = Field( # NodeSpec, but avoiding circular import
default_factory=list,
description="All node specifications"
)
edges: list[EdgeSpec] = Field(
default_factory=list,
description="All edge specifications"
default_factory=list, description="All node specifications"
)
edges: list[EdgeSpec] = Field(default_factory=list, description="All edge specifications")
# Shared memory keys
memory_keys: list[str] = Field(
default_factory=list,
description="Keys available in shared memory"
)
memory_keys: list[str] = Field(default_factory=list, description="Keys available in shared memory")
# Default LLM settings
default_model: str = "claude-haiku-4-5-20251001"
max_tokens: int = 1024
# Execution limits
max_steps: int = Field(
default=100,
description="Maximum node executions before timeout"
)
max_steps: int = Field(default=100, description="Maximum node executions before timeout")
max_retries_per_node: int = 3
# Metadata
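
For orientation, a minimal sketch of wiring an edge with a conditional expression, using only fields visible in this diff; the ids, node names, and EdgeCondition.CONDITIONAL member are assumptions (the member is implied by the condition_expr description, not shown directly), and the expression string is the example from the field description above.

from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec

# Conditional hop: only traverse when the source node's output clears a threshold.
refine_edge = EdgeSpec(
    id="search-to-filter",                     # illustrative id
    source="search",
    target="filter",
    condition=EdgeCondition.CONDITIONAL,       # member assumed from the condition_expr description
    condition_expr="output.confidence > 0.8",  # example expression from the field description above
    input_mapping={"results": "search_results"},
    priority=1,
    description="Only filter if results need refinement to meet goal",
)

graph = GraphSpec(
    id="web-research",
    goal_id="web-researcher",
    entry_node="search",
    edges=[refine_edge],
)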
+18 -16
View File
@@ -10,28 +10,30 @@ The executor:
"""
import logging
from typing import Any, Callable
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from framework.runtime.core import Runtime
from framework.graph.edge import GraphSpec
from framework.graph.goal import Goal
from framework.graph.node import (
NodeSpec,
NodeContext,
NodeResult,
NodeProtocol,
SharedMemory,
LLMNode,
RouterNode,
FunctionNode,
LLMNode,
NodeContext,
NodeProtocol,
NodeResult,
NodeSpec,
RouterNode,
SharedMemory,
)
from framework.graph.edge import GraphSpec
from framework.llm.provider import LLMProvider, Tool
from framework.runtime.core import Runtime
@dataclass
class ExecutionResult:
"""Result of executing a graph."""
success: bool
output: dict[str, Any] = field(default_factory=dict)
error: str | None = None
@@ -125,7 +127,8 @@ class GraphExecutor:
# Restore memory from previous session
for key, value in session_state["memory"].items():
memory.write(key, value)
self.logger.info(f"📥 Restored session state with {len(session_state['memory'])} memory keys")
num_keys = len(session_state["memory"])
self.logger.info(f"📥 Restored session state with {num_keys} memory keys")
# Write new input data to memory (each key individually)
if input_data:
@@ -212,7 +215,9 @@ class GraphExecutor:
result = await node_impl.execute(ctx)
if result.success:
self.logger.info(f" ✓ Success (tokens: {result.tokens_used}, latency: {result.latency_ms}ms)")
tokens = result.tokens_used
latency = result.latency_ms
self.logger.info(f" ✓ Success (tokens: {tokens}, latency: {latency}ms)")
# Generate and log human-readable summary
summary = result.to_summary(node_spec)
@@ -393,10 +398,7 @@ class GraphExecutor:
if node_spec.node_type == "function":
# Function nodes need explicit registration
raise RuntimeError(
f"Function node '{node_spec.id}' not registered. "
"Register with node_registry."
)
raise RuntimeError(f"Function node '{node_spec.id}' not registered. Register with node_registry.")
# Default to LLM node
return LLMNode(tool_executor=self.tool_executor)
+24 -17
View File
@@ -15,28 +15,29 @@ using a Worker-Judge loop:
This keeps planning external while execution/evaluation is internal.
"""
from typing import Any, Callable
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from framework.runtime.core import Runtime
from framework.graph.code_sandbox import CodeSandbox
from framework.graph.goal import Goal
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.plan import (
Plan,
PlanStep,
PlanExecutionResult,
ExecutionStatus,
StepStatus,
Judgment,
JudgmentAction,
ApprovalDecision,
ApprovalRequest,
ApprovalResult,
ApprovalDecision,
ExecutionStatus,
Judgment,
JudgmentAction,
Plan,
PlanExecutionResult,
PlanStep,
StepStatus,
)
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.worker_node import WorkerNode, StepExecutionResult
from framework.graph.code_sandbox import CodeSandbox
from framework.graph.worker_node import StepExecutionResult, WorkerNode
from framework.llm.provider import LLMProvider, Tool
from framework.runtime.core import Runtime
# Type alias for approval callback
ApprovalCallback = Callable[[ApprovalRequest], ApprovalResult]
@@ -45,6 +46,7 @@ ApprovalCallback = Callable[[ApprovalRequest], ApprovalResult]
@dataclass
class ExecutorConfig:
"""Configuration for FlexibleGraphExecutor."""
max_retries_per_step: int = 3
max_total_steps: int = 100
timeout_seconds: int = 300
@@ -165,7 +167,7 @@ class FlexibleGraphExecutor:
status=ExecutionStatus.NEEDS_REPLAN,
plan=plan,
context=context,
feedback="No executable steps available but plan not complete. Check dependencies.",
feedback=("No executable steps available but plan not complete. Check dependencies."),
steps_executed=steps_executed,
total_tokens=total_tokens,
total_latency=total_latency,
@@ -174,7 +176,8 @@ class FlexibleGraphExecutor:
# Execute next step (for now, sequential; could be parallel)
step = ready_steps[0]
# Debug: show ready steps
# print(f" [DEBUG] Ready steps: {[s.id for s in ready_steps]}, executing: {step.id}")
# ready_ids = [s.id for s in ready_steps]
# print(f" [DEBUG] Ready steps: {ready_ids}, executing: {step.id}")
# APPROVAL CHECK - before execution
if step.requires_approval:
@@ -360,7 +363,7 @@ class FlexibleGraphExecutor:
status=ExecutionStatus.NEEDS_REPLAN,
plan=plan,
context=context,
feedback=f"Step '{step.id}' failed after {step.attempts} attempts: {judgment.feedback}",
feedback=(f"Step '{step.id}' failed after {step.attempts} attempts: {judgment.feedback}"),
steps_executed=steps_executed,
total_tokens=total_tokens,
total_latency=total_latency,
@@ -450,12 +453,16 @@ class FlexibleGraphExecutor:
preview_parts.append(f"Tool: {step.action.tool_name}")
if step.action.tool_args:
import json
args_preview = json.dumps(step.action.tool_args, indent=2, default=str)
if len(args_preview) > 500:
args_preview = args_preview[:500] + "..."
preview_parts.append(f"Args: {args_preview}")
elif step.action.prompt:
prompt_preview = step.action.prompt[:300] + "..." if len(step.action.prompt) > 300 else step.action.prompt
if len(step.action.prompt) > 300:
prompt_preview = step.action.prompt[:300] + "..."
else:
prompt_preview = step.action.prompt
preview_parts.append(f"Prompt: {prompt_preview}")
# Include step inputs resolved from context (what will be sent/used)
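
A sketch of a human approval callback matching the ApprovalCallback alias above; the decision values and request fields are the ones shown in this diff, while the console prompt, the modification payload, and the executor wiring are illustrative assumptions.

from framework.graph.plan import ApprovalDecision, ApprovalRequest, ApprovalResult

def console_approval(request: ApprovalRequest) -> ApprovalResult:
    """Tiny HITL hook: show the step preview and ask for a decision on stdin."""
    print(f"Step {request.step_id}: {request.step_description} ({request.action_type})")
    answer = input("approve / reject / modify? ").strip().lower()
    if answer == "reject":
        return ApprovalResult(decision=ApprovalDecision.REJECT, reason="rejected at console")
    if answer == "modify":
        # Modifications are free-form overrides merged into the step (assumed semantics).
        return ApprovalResult(decision=ApprovalDecision.MODIFY, modifications={"max_tokens": 256})
    return ApprovalResult(decision=ApprovalDecision.APPROVE)

# config = ExecutorConfig(max_retries_per_step=3, max_total_steps=100, timeout_seconds=300)
# executor = FlexibleGraphExecutor(..., approval_callback=console_approval)  # constructor wiring assumed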
+16 -37
View File
@@ -12,14 +12,15 @@ Goals are:
"""
from datetime import datetime
from typing import Any
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class GoalStatus(str, Enum):
"""Lifecycle status of a goal."""
DRAFT = "draft" # Being defined
READY = "ready" # Ready for agent creation
ACTIVE = "active" # Has an agent graph, can execute
@@ -37,22 +38,12 @@ class SuccessCriterion(BaseModel):
- Measurable: Can be evaluated programmatically or by LLM
- Achievable: Within the agent's capabilities
"""
id: str
description: str = Field(
description="Human-readable description of what success looks like"
)
metric: str = Field(
description="How to measure: 'output_contains', 'output_equals', 'llm_judge', 'custom'"
)
target: Any = Field(
description="The target value or condition"
)
weight: float = Field(
default=1.0,
ge=0.0,
le=1.0,
description="Relative importance (0-1)"
)
description: str = Field(description="Human-readable description of what success looks like")
metric: str = Field(description="How to measure: 'output_contains', 'output_equals', 'llm_judge', 'custom'")
target: Any = Field(description="The target value or condition")
weight: float = Field(default=1.0, ge=0.0, le=1.0, description="Relative importance (0-1)")
met: bool = False
model_config = {"extra": "allow"}
@@ -66,19 +57,12 @@ class Constraint(BaseModel):
- Hard: Violation means failure
- Soft: Violation is discouraged but allowed
"""
id: str
description: str
constraint_type: str = Field(
description="Type: 'hard' (must not violate) or 'soft' (prefer not to violate)"
)
category: str = Field(
default="general",
description="Category: 'time', 'cost', 'safety', 'scope', 'quality'"
)
check: str = Field(
default="",
description="How to check: expression, function name, or 'llm_judge'"
)
constraint_type: str = Field(description="Type: 'hard' (must not violate) or 'soft' (prefer not to violate)")
category: str = Field(default="general", description="Category: 'time', 'cost', 'safety', 'scope', 'quality'")
check: str = Field(default="", description="How to check: expression, function name, or 'llm_judge'")
model_config = {"extra": "allow"}
@@ -119,6 +103,7 @@ class Goal(BaseModel):
]
)
"""
id: str
name: str
description: str
@@ -133,24 +118,18 @@ class Goal(BaseModel):
# Context for the agent
context: dict[str, Any] = Field(
default_factory=dict,
description="Additional context: domain knowledge, user preferences, etc."
description="Additional context: domain knowledge, user preferences, etc.",
)
# Capabilities required
required_capabilities: list[str] = Field(
default_factory=list,
description="What the agent needs: 'llm', 'web_search', 'code_execution', etc."
description="What the agent needs: 'llm', 'web_search', 'code_execution', etc.",
)
# Input/output schema
input_schema: dict[str, Any] = Field(
default_factory=dict,
description="Expected input format"
)
output_schema: dict[str, Any] = Field(
default_factory=dict,
description="Expected output format"
)
input_schema: dict[str, Any] = Field(default_factory=dict, description="Expected input format")
output_schema: dict[str, Any] = Field(default_factory=dict, description="Expected output format")
# Versioning for evolution
version: str = "1.0.0"
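
A compact Goal definition sketch built from the fields this diff touches; the ids and values are made up, and the success_criteria/constraints field names are assumptions (only the criterion and constraint classes themselves appear in the hunks).

from framework.graph.goal import Constraint, Goal, SuccessCriterion

goal = Goal(
    id="web-researcher",
    name="Web Research Agent",
    description="Search the web and summarize findings",
    success_criteria=[  # field name assumed
        SuccessCriterion(
            id="summary",
            description="Provide a clear, concise summary of the findings",
            metric="llm_judge",
            target="summary covers the top search results",
            weight=1.0,
        ),
    ],
    constraints=[  # field name assumed
        Constraint(
            id="no-paid-apis",
            description="Do not call paid third-party APIs",
            constraint_type="hard",
            category="cost",
        ),
    ],
    required_capabilities=["llm", "web_search"],
)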
+10 -7
View File
@@ -12,6 +12,7 @@ from typing import Any
class HITLInputType(str, Enum):
"""Type of input expected from human."""
FREE_TEXT = "free_text" # Open-ended text response
STRUCTURED = "structured" # Specific fields to fill
SELECTION = "selection" # Choose from options
@@ -22,6 +23,7 @@ class HITLInputType(str, Enum):
@dataclass
class HITLQuestion:
"""A single question to ask the human."""
id: str
question: str
input_type: HITLInputType = HITLInputType.FREE_TEXT
@@ -44,6 +46,7 @@ class HITLRequest:
This is what the agent produces when it needs human input.
"""
# Context
objective: str # What we're trying to accomplish
current_state: str # Where we are in the process
@@ -92,6 +95,7 @@ class HITLResponse:
This is what gets passed back when resuming from a pause.
"""
# Original request reference
request_id: str
@@ -170,13 +174,11 @@ class HITLProtocol:
# Use Haiku to extract answers
try:
import anthropic
import json
questions_str = "\n".join([
f"{i+1}. {q.question} (id: {q.id})"
for i, q in enumerate(request.questions)
])
import anthropic
questions_str = "\n".join([f"{i + 1}. {q.question} (id: {q.id})" for i, q in enumerate(request.questions)])
prompt = f"""Parse the user's response and extract answers for each question.
@@ -195,13 +197,14 @@ Example format:
message = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
messages=[{"role": "user", "content": prompt}],
)
# Parse Haiku's response
import re
response_text = message.content[0].text.strip()
json_match = re.search(r'\{[^{}]*\}', response_text, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response_text, re.DOTALL)
if json_match:
parsed = json.loads(json_match.group())
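
A small sketch of the HITL request shape shown above; objective, current_state, questions, and the input types appear in this diff, while the module path and all values are assumptions.

from framework.graph.hitl import HITLInputType, HITLQuestion, HITLRequest  # module path assumed

request = HITLRequest(
    objective="Summarize the latest search results",
    current_state="Search complete; summary draft needs a human steer",
    questions=[
        HITLQuestion(id="tone", question="Formal or casual tone?", input_type=HITLInputType.SELECTION),
        HITLQuestion(id="length", question="Roughly how many words?"),  # defaults to FREE_TEXT
    ],
)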
+34 -19
View File
@@ -8,23 +8,24 @@ The HybridJudge evaluates step execution results using:
Escalation path: rules → LLM → human
"""
from typing import Any
from dataclasses import dataclass, field
from typing import Any
from framework.graph.code_sandbox import safe_eval
from framework.graph.goal import Goal
from framework.graph.plan import (
PlanStep,
EvaluationRule,
Judgment,
JudgmentAction,
EvaluationRule,
PlanStep,
)
from framework.graph.goal import Goal
from framework.graph.code_sandbox import safe_eval
from framework.llm.provider import LLMProvider
@dataclass
class RuleEvaluationResult:
"""Result of rule-based evaluation."""
is_definitive: bool # True if a rule matched definitively
judgment: Judgment | None = None
context: dict[str, Any] = field(default_factory=dict)
@@ -136,9 +137,9 @@ class HybridJudge:
# Build evaluation context
eval_context = {
"step": step.model_dump() if hasattr(step, 'model_dump') else step,
"step": step.model_dump() if hasattr(step, "model_dump") else step,
"result": result,
"goal": goal.model_dump() if hasattr(goal, 'model_dump') else goal,
"goal": goal.model_dump() if hasattr(goal, "model_dump") else goal,
"context": context,
"success": isinstance(result, dict) and result.get("success", False),
"error": isinstance(result, dict) and result.get("error"),
@@ -216,7 +217,9 @@ class HybridJudge:
# Low confidence - escalate
return Judgment(
action=JudgmentAction.ESCALATE,
reasoning=f"LLM confidence ({judgment.confidence:.2f}) below threshold ({self.llm_confidence_threshold})",
reasoning=(
f"LLM confidence ({judgment.confidence:.2f}) below threshold ({self.llm_confidence_threshold})"
),
feedback=judgment.feedback,
confidence=judgment.confidence,
llm_used=True,
@@ -338,52 +341,64 @@ def create_default_judge(llm: LLMProvider | None = None) -> HybridJudge:
judge = HybridJudge(llm=llm)
# Rule: Accept on explicit success flag
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="explicit_success",
description="Step explicitly marked as successful",
condition="isinstance(result, dict) and result.get('success') == True",
action=JudgmentAction.ACCEPT,
priority=100,
))
)
)
# Rule: Retry on transient errors
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="transient_error_retry",
description="Transient error that can be retried",
condition="isinstance(result, dict) and result.get('error_type') in ['timeout', 'rate_limit', 'connection_error']",
condition=(
"isinstance(result, dict) and result.get('error_type') in ['timeout', 'rate_limit', 'connection_error']"
),
action=JudgmentAction.RETRY,
feedback_template="Transient error: {result[error]}. Please retry.",
priority=90,
))
)
)
# Rule: Replan on missing data
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="missing_data_replan",
description="Required data not available",
condition="isinstance(result, dict) and result.get('error_type') == 'missing_data'",
action=JudgmentAction.REPLAN,
feedback_template="Missing required data: {result[error]}. Plan needs adjustment.",
priority=80,
))
)
)
# Rule: Escalate on security issues
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="security_escalate",
description="Security issue detected",
condition="isinstance(result, dict) and result.get('error_type') == 'security'",
action=JudgmentAction.ESCALATE,
feedback_template="Security issue detected: {result[error]}",
priority=200,
))
)
)
# Rule: Fail on max retries exceeded
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="max_retries_fail",
description="Maximum retries exceeded",
condition="step.get('attempts', 0) >= step.get('max_retries', 3)",
action=JudgmentAction.REPLAN,
feedback_template="Step '{step[id]}' failed after {step[attempts]} attempts",
priority=150,
))
)
)
return judge
+96 -91
View File
@@ -17,13 +17,14 @@ Protocol:
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from pydantic import BaseModel, Field
from framework.runtime.core import Runtime
from framework.llm.provider import LLMProvider, Tool
from framework.runtime.core import Runtime
logger = logging.getLogger(__name__)
@@ -48,6 +49,7 @@ class NodeSpec(BaseModel):
system_prompt="You are a calculator..."
)
"""
id: str
name: str
description: str
@@ -55,51 +57,27 @@ class NodeSpec(BaseModel):
# Node behavior type
node_type: str = Field(
default="llm_tool_use",
description="Type: 'llm_tool_use', 'llm_generate', 'function', 'router', 'human_input'"
description="Type: 'llm_tool_use', 'llm_generate', 'function', 'router', 'human_input'",
)
# Data flow
input_keys: list[str] = Field(
default_factory=list,
description="Keys this node reads from shared memory or input"
)
output_keys: list[str] = Field(
default_factory=list,
description="Keys this node writes to shared memory or output"
)
input_keys: list[str] = Field(default_factory=list, description="Keys this node reads from shared memory or input")
output_keys: list[str] = Field(default_factory=list, description="Keys this node writes to shared memory or output")
# For LLM nodes
system_prompt: str | None = Field(
default=None,
description="System prompt for LLM nodes"
)
tools: list[str] = Field(
default_factory=list,
description="Tool names this node can use"
)
model: str | None = Field(
default=None,
description="Specific model to use (defaults to graph default)"
)
system_prompt: str | None = Field(default=None, description="System prompt for LLM nodes")
tools: list[str] = Field(default_factory=list, description="Tool names this node can use")
model: str | None = Field(default=None, description="Specific model to use (defaults to graph default)")
# For function nodes
function: str | None = Field(
default=None,
description="Function name or path for function nodes"
)
function: str | None = Field(default=None, description="Function name or path for function nodes")
# For router nodes
routes: dict[str, str] = Field(
default_factory=dict,
description="Condition -> target_node_id mapping for routers"
)
routes: dict[str, str] = Field(default_factory=dict, description="Condition -> target_node_id mapping for routers")
# Retry behavior
max_retries: int = Field(default=3)
retry_on: list[str] = Field(
default_factory=list,
description="Error types to retry on"
)
retry_on: list[str] = Field(default_factory=list, description="Error types to retry on")
model_config = {"extra": "allow"}
@@ -112,6 +90,7 @@ class SharedMemory:
Nodes read and write to shared memory using typed keys.
The memory is scoped to a single run.
"""
_data: dict[str, Any] = field(default_factory=dict)
_allowed_read: set[str] = field(default_factory=set)
_allowed_write: set[str] = field(default_factory=set)
@@ -159,6 +138,7 @@ class NodeContext:
- Access to tools (for actions)
- The goal context (for guidance)
"""
# Core runtime
runtime: Runtime
@@ -194,6 +174,7 @@ class NodeResult:
- State changes made
- Route decision (for routers)
"""
success: bool
output: dict[str, Any] = field(default_factory=dict)
error: str | None = None
@@ -221,6 +202,7 @@ class NodeResult:
# Use Haiku to generate intelligent summary
import os
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
@@ -235,25 +217,28 @@ class NodeResult:
# Use Haiku to generate intelligent summary
try:
import anthropic
import json
import anthropic
node_context = ""
if node_spec:
node_context = f"\nNode: {node_spec.name}\nPurpose: {node_spec.description}"
prompt = f"""Generate a 1-2 sentence human-readable summary of what this node produced.{node_context}
Node output:
{json.dumps(self.output, indent=2, default=str)[:2000]}
Provide a concise, clear summary that a human can quickly understand. Focus on the key information produced."""
output_json = json.dumps(self.output, indent=2, default=str)[:2000]
prompt = (
"Generate a 1-2 sentence human-readable summary of what this "
f"node produced.{node_context}\n\n"
f"Node output:\n{output_json}\n\n"
"Provide a concise, clear summary that a human can quickly "
"understand. Focus on the key information produced."
)
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=200,
messages=[{"role": "user", "content": prompt}]
messages=[{"role": "user", "content": prompt}],
)
summary = message.content[0].text.strip()
@@ -381,17 +366,21 @@ class LLMNode(NodeProtocol):
# Log the LLM call details
logger.info(" 🤖 LLM Call:")
logger.info(f" System: {system[:150]}..." if len(system) > 150 else f" System: {system}")
logger.info(f" User message: {messages[-1]['content'][:150]}..." if len(messages[-1]['content']) > 150 else f" User message: {messages[-1]['content']}")
sys_preview = system[:150] + "..." if len(system) > 150 else system
logger.info(f" System: {sys_preview}")
user_msg = messages[-1]["content"]
msg_preview = user_msg[:150] + "..." if len(user_msg) > 150 else user_msg
logger.info(f" User message: {msg_preview}")
if ctx.available_tools:
logger.info(f" Tools available: {[t.name for t in ctx.available_tools]}")
# Call LLM
if ctx.available_tools and self.tool_executor:
from framework.llm.provider import ToolUse, ToolResult
from framework.llm.provider import ToolResult, ToolUse
def executor(tool_use: ToolUse) -> ToolResult:
logger.info(f" 🔧 Tool call: {tool_use.name}({', '.join(f'{k}={v}' for k, v in tool_use.input.items())})")
args_str = ", ".join(f"{k}={v}" for k, v in tool_use.input.items())
logger.info(f" 🔧 Tool call: {tool_use.name}({args_str})")
result = self.tool_executor(tool_use)
# Truncate long results
result_str = str(result.content)[:150]
@@ -413,9 +402,10 @@ class LLMNode(NodeProtocol):
)
# Log the response
response_preview = response.content[:200] if len(response.content) > 200 else response.content
if len(response.content) > 200:
response_preview += "..."
response_preview = response.content[:200] + "..."
else:
response_preview = response.content
logger.info(f" ← Response: {response_preview}")
latency_ms = int((time.time() - start) * 1000)
@@ -432,7 +422,9 @@ class LLMNode(NodeProtocol):
output = self._parse_output(response.content, ctx.node_spec)
# For llm_generate and llm_tool_use nodes, try to parse JSON and extract fields
if ctx.node_spec.node_type in ("llm_generate", "llm_tool_use") and len(ctx.node_spec.output_keys) > 1:
is_llm_node = ctx.node_spec.node_type in ("llm_generate", "llm_tool_use")
has_multiple_outputs = len(ctx.node_spec.output_keys) > 1
if is_llm_node and has_multiple_outputs:
try:
import json
@@ -446,7 +438,8 @@ class LLMNode(NodeProtocol):
ctx.memory.write(key, parsed[key])
output[key] = parsed[key]
elif key in ctx.input_data:
# Key not in parsed JSON but exists in input - pass through input value
# Key not in parsed JSON but exists in input
# Pass through input value
ctx.memory.write(key, ctx.input_data[key])
output[key] = ctx.input_data[key]
else:
@@ -508,7 +501,7 @@ class LLMNode(NodeProtocol):
content = raw_response.strip()
# Remove markdown code blocks if present
if content.startswith("```"):
match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', content, re.DOTALL)
match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", content, re.DOTALL)
if match:
content = match.group(1).strip()
@@ -520,15 +513,16 @@ class LLMNode(NodeProtocol):
# JSON parse failed - use Haiku to extract clean JSON
import os
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
# No API key, try one more simple extraction
try:
# Find first { and last }
start = raw_response.find('{')
end = raw_response.rfind('}')
start = raw_response.find("{")
end = raw_response.rfind("}")
if start != -1 and end != -1:
json_str = raw_response[start:end+1]
json_str = raw_response[start : end + 1]
return json.loads(json_str)
except (ValueError, json.JSONDecodeError):
pass
@@ -536,31 +530,31 @@ class LLMNode(NodeProtocol):
# Use Haiku to clean the response
from framework.llm.anthropic import AnthropicProvider
haiku = AnthropicProvider(model="claude-3-5-haiku-20241022")
prompt = f"""Extract the JSON object from this LLM response. Extract ONLY the values that the LLM actually generated.
Expected output keys: {output_keys}
LLM Response:
{raw_response}
IMPORTANT:
- Only extract keys that the LLM explicitly output in its response
- Do NOT include keys that were just mentioned or passed through from input
- If the LLM output multiple pieces of text/JSON, extract the LAST JSON object only
- Output ONLY valid JSON with no extra text, no markdown, no explanations"""
prompt = (
"Extract the JSON object from this LLM response. "
"Extract ONLY the values that the LLM actually generated.\n\n"
f"Expected output keys: {output_keys}\n\n"
f"LLM Response:\n{raw_response}\n\n"
"IMPORTANT:\n"
"- Only extract keys that the LLM explicitly output in its response\n"
"- Do NOT include keys that were just mentioned or passed through from input\n"
"- If the LLM output multiple pieces of text/JSON, extract the LAST JSON object only\n"
"- Output ONLY valid JSON with no extra text, no markdown, no explanations"
)
try:
result = haiku.complete(
messages=[{"role": "user", "content": prompt}],
system="You extract clean JSON from messy responses. Output only valid JSON, nothing else.",
system=("You extract clean JSON from messy responses. Output only valid JSON, nothing else."),
)
cleaned = result.content.strip()
# Remove markdown if Haiku added it
if cleaned.startswith("```"):
match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned, re.DOTALL)
match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", cleaned, re.DOTALL)
if match:
cleaned = match.group(1).strip()
@@ -598,6 +592,7 @@ IMPORTANT:
# Use Haiku to intelligently extract relevant data
import os
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
# Fallback to simple formatting if no API key
@@ -610,24 +605,27 @@ IMPORTANT:
# Build prompt for Haiku to extract clean values
import json
prompt = f"""Extract the following information from the memory context:
Required fields: {', '.join(ctx.node_spec.input_keys)}
Memory context (may contain nested data, JSON strings, or extra information):
{json.dumps(memory_data, indent=2, default=str)[:3000]}
Extract ONLY the clean values for the required fields. Ignore nested structures, JSON wrappers, and irrelevant data.
Output as JSON with the exact field names requested."""
fields = ", ".join(ctx.node_spec.input_keys)
memory_json = json.dumps(memory_data, indent=2, default=str)[:3000]
prompt = (
"Extract the following information from the memory context:\n\n"
f"Required fields: {fields}\n\n"
"Memory context (may contain nested data, JSON strings, "
f"or extra information):\n{memory_json}\n\n"
"Extract ONLY the clean values for the required fields. "
"Ignore nested structures, JSON wrappers, and irrelevant data.\n\n"
"Output as JSON with the exact field names requested."
)
try:
import anthropic
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=1000,
messages=[{"role": "user", "content": prompt}]
messages=[{"role": "user", "content": prompt}],
)
# Parse Haiku's response
@@ -635,7 +633,8 @@ Output as JSON with the exact field names requested."""
# Try to extract JSON
import re
json_match = re.search(r'\{[^{}]*\}', response_text, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response_text, re.DOTALL)
if json_match:
extracted = json.loads(json_match.group())
# Format as key: value pairs
@@ -708,11 +707,13 @@ class RouterNode(NodeProtocol):
# Build options from routes
options = []
for condition, target in ctx.node_spec.routes.items():
options.append({
options.append(
{
"id": condition,
"description": f"Route to {target} when condition '{condition}' is met",
"target": target,
})
}
)
# Check if we should use LLM-based routing
if ctx.node_spec.system_prompt and ctx.llm:
@@ -765,10 +766,9 @@ class RouterNode(NodeProtocol):
import json
# Build routing options description
options_desc = "\n".join([
f"- {opt['id']}: {opt['description']} → goes to '{opt['target']}'"
for opt in options
])
options_desc = "\n".join(
[f"- {opt['id']}: {opt['description']} → goes to '{opt['target']}'" for opt in options]
)
# Build context
context_data = {
@@ -795,15 +795,17 @@ Respond with ONLY a JSON object:
logger.info(" 🤔 Router using LLM to choose path...")
try:
default_system = "You are a routing agent. Respond with JSON only."
response = ctx.llm.complete(
messages=[{"role": "user", "content": prompt}],
system=ctx.node_spec.system_prompt or "You are a routing agent. Respond with JSON only.",
system=ctx.node_spec.system_prompt or default_system,
max_tokens=150,
)
# Parse response
import re
json_match = re.search(r'\{[^{}]*\}', response.content, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
chosen = data.get("chosen", "default")
@@ -813,7 +815,8 @@ Respond with ONLY a JSON object:
logger.info(f" Reason: {reasoning}")
# Find the target for this choice
target = ctx.node_spec.routes.get(chosen, ctx.node_spec.routes.get("default", "end"))
default_target = ctx.node_spec.routes.get("default", "end")
target = ctx.node_spec.routes.get(chosen, default_target)
return (chosen, target)
except Exception as e:
@@ -864,10 +867,12 @@ class FunctionNode(NodeProtocol):
decision_id = ctx.runtime.decide(
intent=f"Execute function {ctx.node_spec.function or 'unknown'}",
options=[{
options=[
{
"id": "execute",
"description": f"Run function with inputs: {list(ctx.input_data.keys())}",
}],
}
],
chosen="execute",
reasoning="Deterministic function execution",
)
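
For reference, a NodeSpec sketch using the fields consolidated in this diff; the ids, prompt, tool names, and route targets are illustrative only.

from framework.graph.node import NodeSpec

search_node = NodeSpec(
    id="search",
    name="Web Search",
    description="Query the web and collect raw results",
    node_type="llm_tool_use",
    input_keys=["query"],
    output_keys=["search_results"],
    system_prompt="You are a careful research assistant. Use the search tool, then list the results.",
    tools=["web_search"],  # tool name is illustrative
    max_retries=3,
    retry_on=["timeout", "rate_limit"],
)

router_node = NodeSpec(
    id="route-on-quality",
    name="Quality Router",
    description="Decide whether results need refinement",
    node_type="router",
    routes={"needs_refinement": "filter", "good_enough": "summarize", "default": "end"},
)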
+21 -19
View File
@@ -10,15 +10,16 @@ The Plan is the contract between the external planner and the executor:
- If replanning needed, returns feedback to external planner
"""
from typing import Any
from enum import Enum
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class ActionType(str, Enum):
"""Types of actions a PlanStep can perform."""
LLM_CALL = "llm_call" # Call LLM for generation
TOOL_USE = "tool_use" # Use a registered tool
SUB_GRAPH = "sub_graph" # Execute a sub-graph
@@ -28,6 +29,7 @@ class ActionType(str, Enum):
class StepStatus(str, Enum):
"""Status of a plan step."""
PENDING = "pending"
AWAITING_APPROVAL = "awaiting_approval" # Waiting for human approval
IN_PROGRESS = "in_progress"
@@ -39,6 +41,7 @@ class StepStatus(str, Enum):
class ApprovalDecision(str, Enum):
"""Human decision on a step requiring approval."""
APPROVE = "approve" # Execute as planned
REJECT = "reject" # Skip this step
MODIFY = "modify" # Execute with modifications
@@ -47,6 +50,7 @@ class ApprovalDecision(str, Enum):
class ApprovalRequest(BaseModel):
"""Request for human approval before executing a step."""
step_id: str
step_description: str
action_type: str
@@ -62,6 +66,7 @@ class ApprovalRequest(BaseModel):
class ApprovalResult(BaseModel):
"""Result of human approval decision."""
decision: ApprovalDecision
reason: str | None = None
modifications: dict[str, Any] = Field(default_factory=dict)
@@ -71,6 +76,7 @@ class ApprovalResult(BaseModel):
class JudgmentAction(str, Enum):
"""Actions the judge can take after evaluating a step."""
ACCEPT = "accept" # Step completed successfully, continue
RETRY = "retry" # Retry the step with feedback
REPLAN = "replan" # Return to external planner for new plan
@@ -83,6 +89,7 @@ class ActionSpec(BaseModel):
This is the "what to do" part of a PlanStep.
"""
action_type: ActionType
# For LLM_CALL
@@ -114,6 +121,7 @@ class PlanStep(BaseModel):
Created by external planner, executed by Worker, evaluated by Judge.
"""
id: str
description: str
action: ActionSpec
@@ -121,28 +129,16 @@ class PlanStep(BaseModel):
# Data flow
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Input data for this step (can reference previous step outputs)"
)
expected_outputs: list[str] = Field(
default_factory=list,
description="Keys this step should produce"
description="Input data for this step (can reference previous step outputs)",
)
expected_outputs: list[str] = Field(default_factory=list, description="Keys this step should produce")
# Dependencies
dependencies: list[str] = Field(
default_factory=list,
description="IDs of steps that must complete before this one"
)
dependencies: list[str] = Field(default_factory=list, description="IDs of steps that must complete before this one")
# Human-in-the-loop (HITL)
requires_approval: bool = Field(
default=False,
description="If True, requires human approval before execution"
)
approval_message: str | None = Field(
default=None,
description="Message to show human when requesting approval"
)
requires_approval: bool = Field(default=False, description="If True, requires human approval before execution")
approval_message: str | None = Field(default=None, description="Message to show human when requesting approval")
# Execution state
status: StepStatus = StepStatus.PENDING
@@ -170,6 +166,7 @@ class Judgment(BaseModel):
The Judge evaluates step results and decides what to do next.
"""
action: JudgmentAction
reasoning: str
feedback: str | None = None # For retry/replan - what went wrong
@@ -193,6 +190,7 @@ class EvaluationRule(BaseModel):
Rules are checked before falling back to LLM evaluation.
"""
id: str
description: str
@@ -216,6 +214,7 @@ class Plan(BaseModel):
Created by external planner (Claude Code, etc).
Executed by FlexibleGraphExecutor.
"""
id: str
goal_id: str
description: str
@@ -361,6 +360,7 @@ class Plan(BaseModel):
class ExecutionStatus(str, Enum):
"""Status of plan execution."""
COMPLETED = "completed"
AWAITING_APPROVAL = "awaiting_approval" # Paused for human approval
NEEDS_REPLAN = "needs_replan"
@@ -376,6 +376,7 @@ class PlanExecutionResult(BaseModel):
Returned to external planner with status and feedback.
"""
status: ExecutionStatus
# Results from completed steps
@@ -421,6 +422,7 @@ def load_export(data: str | dict) -> tuple["Plan", Any]:
result = await executor.execute_plan(plan, goal, context)
"""
import json as json_module
from framework.graph.goal import Goal
if isinstance(data, str):
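
A minimal Plan sketch using the step and action fields visible above (tool_name, tool_args, and prompt are grounded in the step.action usages shown earlier in this compare); the ids, tool name, and the Plan's steps field name are assumptions.

from framework.graph.plan import ActionSpec, ActionType, Plan, PlanStep

fetch = PlanStep(
    id="fetch",
    description="Fetch results with the web_search tool",
    action=ActionSpec(
        action_type=ActionType.TOOL_USE,
        tool_name="web_search",                    # seen as step.action.tool_name above
        tool_args={"query": "goal-driven agents"},  # seen as step.action.tool_args above
    ),
    expected_outputs=["search_results"],
)

summarize = PlanStep(
    id="summarize",
    description="Summarize the fetched results",
    action=ActionSpec(
        action_type=ActionType.LLM_CALL,
        prompt="Summarize these results in three bullet points: {search_results}",  # seen as step.action.prompt above
    ),
    dependencies=["fetch"],
    requires_approval=True,
    approval_message="Review the summary prompt before it runs",
)

plan = Plan(
    id="research-plan",
    goal_id="web-researcher",
    description="Fetch then summarize",
    steps=[fetch, summarize],  # field name assumed; not shown in this diff
)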
+15 -10
View File
@@ -10,20 +10,21 @@ appropriate executor based on action type:
- Code execution (sandboxed)
"""
from typing import Any, Callable
from dataclasses import dataclass, field
import time
import json
import re
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from framework.graph.code_sandbox import CodeSandbox
from framework.graph.plan import (
PlanStep,
ActionSpec,
ActionType,
PlanStep,
)
from framework.graph.code_sandbox import CodeSandbox
from framework.runtime.core import Runtime
from framework.llm.provider import LLMProvider, Tool
from framework.runtime.core import Runtime
def parse_llm_json_response(text: str) -> tuple[Any | None, str]:
@@ -50,7 +51,7 @@ def parse_llm_json_response(text: str) -> tuple[Any | None, str]:
# Try to extract JSON from markdown code blocks
# Pattern: ```json ... ``` or ``` ... ```
code_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```'
code_block_pattern = r"```(?:json)?\s*([\s\S]*?)\s*```"
matches = re.findall(code_block_pattern, cleaned)
if matches:
@@ -70,7 +71,7 @@ def parse_llm_json_response(text: str) -> tuple[Any | None, str]:
pass
# Try to find JSON-like content (starts with { or [)
json_start_pattern = r'(\{[\s\S]*\}|\[[\s\S]*\])'
json_start_pattern = r"(\{[\s\S]*\}|\[[\s\S]*\])"
json_matches = re.findall(json_start_pattern, cleaned)
for match in json_matches:
@@ -87,6 +88,7 @@ def parse_llm_json_response(text: str) -> tuple[Any | None, str]:
@dataclass
class StepExecutionResult:
"""Result of executing a plan step."""
success: bool
outputs: dict[str, Any] = field(default_factory=dict)
error: str | None = None
@@ -160,11 +162,13 @@ class WorkerNode:
# Record decision
decision_id = self.runtime.decide(
intent=f"Execute plan step: {step.description}",
options=[{
options=[
{
"id": step.action.action_type.value,
"description": f"Execute {step.action.action_type.value} action",
"action_type": step.action.action_type.value,
}],
}
],
chosen=step.action.action_type.value,
reasoning=f"Step requires {step.action.action_type.value}",
context={"step_id": step.id, "inputs": step.inputs},
@@ -414,6 +418,7 @@ class WorkerNode:
try:
# Execute tool via formal executor
from framework.llm.provider import ToolUse
tool_use = ToolUse(
id=f"step_{tool_name}",
name=tool_name,
+1 -1
View File
@@ -1,7 +1,7 @@
"""LLM provider abstraction."""
from framework.llm.provider import LLMProvider, LLMResponse
from framework.llm.anthropic import AnthropicProvider
from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import LLMProvider, LLMResponse
__all__ = ["LLMProvider", "LLMResponse", "AnthropicProvider", "LiteLLMProvider"]
+2 -4
View File
@@ -3,8 +3,8 @@
import os
from typing import Any
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import LLMProvider, LLMResponse, Tool
def _get_api_key_from_credential_manager() -> str | None:
@@ -50,9 +50,7 @@ class AnthropicProvider(LLMProvider):
# Delegate to LiteLLMProvider internally.
self.api_key = api_key or _get_api_key_from_credential_manager()
if not self.api_key:
raise ValueError(
"Anthropic API key required. Set ANTHROPIC_API_KEY env var or pass api_key."
)
raise ValueError("Anthropic API key required. Set ANTHROPIC_API_KEY env var or pass api_key.")
self.model = model
+8 -4
View File
@@ -183,7 +183,8 @@ class LiteLLMProvider(LLMProvider):
# Process tool calls.
# Add assistant message with tool calls.
current_messages.append({
current_messages.append(
{
"role": "assistant",
"content": message.content,
"tool_calls": [
@@ -197,7 +198,8 @@ class LiteLLMProvider(LLMProvider):
}
for tc in message.tool_calls
],
})
}
)
# Execute tools and add results.
for tool_call in message.tool_calls:
@@ -216,11 +218,13 @@ class LiteLLMProvider(LLMProvider):
result = tool_executor(tool_use)
# Add tool result message
current_messages.append({
current_messages.append(
{
"role": "tool",
"tool_call_id": result.tool_use_id,
"content": result.content,
})
}
)
# Max iterations reached
return LLMResponse(
+4
View File
@@ -8,6 +8,7 @@ from typing import Any
@dataclass
class LLMResponse:
"""Response from an LLM call."""
content: str
model: str
input_tokens: int = 0
@@ -19,6 +20,7 @@ class LLMResponse:
@dataclass
class Tool:
"""A tool the LLM can use."""
name: str
description: str
parameters: dict[str, Any] = field(default_factory=dict)
@@ -27,6 +29,7 @@ class Tool:
@dataclass
class ToolUse:
"""A tool call requested by the LLM."""
id: str
name: str
input: dict[str, Any]
@@ -35,6 +38,7 @@ class ToolUse:
@dataclass
class ToolResult:
"""Result of executing a tool."""
tool_use_id: str
content: str
is_error: bool = False
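
For reference, the four dataclasses above are plain containers with no behavior; constructing them directly looks like this (the names are made up, the fields are the ones shown in the diff):

from framework.llm.provider import Tool, ToolResult, ToolUse

search_tool = Tool(
    name="web_search",                                   # hypothetical tool
    description="Search the web for a query",
    parameters={"type": "object", "properties": {"query": {"type": "string"}}},
)
call = ToolUse(id="step_web_search", name="web_search", input={"query": "ruff line length"})
result = ToolResult(tool_use_id=call.id, content="3 results found", is_error=False)
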
File diff suppressed because it is too large
+3 -3
View File
@@ -1,15 +1,15 @@
"""Agent Runner - load and run exported agents."""
from framework.runner.runner import AgentRunner, AgentInfo, ValidationResult
from framework.runner.tool_registry import ToolRegistry, tool
from framework.runner.orchestrator import AgentOrchestrator
from framework.runner.protocol import (
AgentMessage,
MessageType,
CapabilityLevel,
CapabilityResponse,
MessageType,
OrchestratorResult,
)
from framework.runner.runner import AgentInfo, AgentRunner, ValidationResult
from framework.runner.tool_registry import ToolRegistry, tool
__all__ = [
# Single agent
+82 -37
View File
@@ -22,12 +22,14 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
help="Path to agent folder (containing agent.json)",
)
run_parser.add_argument(
"--input", "-i",
"--input",
"-i",
type=str,
help="Input context as JSON string",
)
run_parser.add_argument(
"--input-file", "-f",
"--input-file",
"-f",
type=str,
help="Input context from JSON file",
)
@@ -37,17 +39,20 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
help="Run in mock mode (no real LLM calls)",
)
run_parser.add_argument(
"--output", "-o",
"--output",
"-o",
type=str,
help="Write results to file instead of stdout",
)
run_parser.add_argument(
"--quiet", "-q",
"--quiet",
"-q",
action="store_true",
help="Only output the final result JSON",
)
run_parser.add_argument(
"--verbose", "-v",
"--verbose",
"-v",
action="store_true",
help="Show detailed execution logs (steps, LLM calls, etc.)",
)
@@ -113,7 +118,8 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
help="Directory containing agent folders (default: exports)",
)
dispatch_parser.add_argument(
"--input", "-i",
"--input",
"-i",
type=str,
required=True,
help="Input context as JSON string",
@@ -124,13 +130,15 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
help="Description of what you want to accomplish",
)
dispatch_parser.add_argument(
"--agents", "-a",
"--agents",
"-a",
type=str,
nargs="+",
help="Specific agent names to use (default: all in directory)",
)
dispatch_parser.add_argument(
"--quiet", "-q",
"--quiet",
"-q",
action="store_true",
help="Only output the final result JSON",
)
@@ -170,15 +178,16 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
def cmd_run(args: argparse.Namespace) -> int:
"""Run an exported agent."""
import logging
from framework.runner import AgentRunner
# Set logging level (quiet by default for cleaner output)
if args.quiet:
logging.basicConfig(level=logging.ERROR, format='%(message)s')
elif getattr(args, 'verbose', False):
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.basicConfig(level=logging.ERROR, format="%(message)s")
elif getattr(args, "verbose", False):
logging.basicConfig(level=logging.INFO, format="%(message)s")
else:
logging.basicConfig(level=logging.WARNING, format='%(message)s')
logging.basicConfig(level=logging.WARNING, format="%(message)s")
# Load input context
context = {}
@@ -211,6 +220,7 @@ def cmd_run(args: argparse.Namespace) -> int:
entry_input_keys = runner.graph.nodes[0].input_keys if runner.graph.nodes else []
if "user_id" in entry_input_keys and context.get("user_id") is None:
import os
context["user_id"] = os.environ.get("USER", "default_user")
if not args.quiet:
@@ -279,7 +289,13 @@ def cmd_run(args: argparse.Namespace) -> int:
# If no meaningful key found, show all non-internal keys
if not shown:
for key, value in result.output.items():
if not key.startswith("_") and key not in ["user_id", "request", "memory_loaded", "user_profile", "recent_context"]:
if not key.startswith("_") and key not in [
"user_id",
"request",
"memory_loaded",
"user_profile",
"recent_context",
]:
if isinstance(value, (dict, list)):
print(f"\n{key}:")
value_str = json.dumps(value, indent=2, default=str)
@@ -311,7 +327,9 @@ def cmd_info(args: argparse.Namespace) -> int:
info = runner.info()
if args.json:
print(json.dumps({
print(
json.dumps(
{
"name": info.name,
"description": info.description,
"goal_name": info.goal_name,
@@ -323,7 +341,10 @@ def cmd_info(args: argparse.Namespace) -> int:
"constraints": info.constraints,
"required_tools": info.required_tools,
"has_tools_module": info.has_tools_module,
}, indent=2))
},
indent=2,
)
)
else:
print(f"Agent: {info.name}")
print(f"Description: {info.description}")
@@ -333,8 +354,8 @@ def cmd_info(args: argparse.Namespace) -> int:
print()
print(f"Nodes ({info.node_count}):")
for node in info.nodes:
inputs = f" [in: {', '.join(node['input_keys'])}]" if node.get('input_keys') else ""
outputs = f" [out: {', '.join(node['output_keys'])}]" if node.get('output_keys') else ""
inputs = f" [in: {', '.join(node['input_keys'])}]" if node.get("input_keys") else ""
outputs = f" [out: {', '.join(node['output_keys'])}]" if node.get("output_keys") else ""
print(f" - {node['id']}: {node['name']}{inputs}{outputs}")
print()
print(f"Success Criteria ({len(info.success_criteria)}):")
@@ -405,19 +426,25 @@ def cmd_list(args: argparse.Namespace) -> int:
try:
runner = AgentRunner.load(path)
info = runner.info()
agents.append({
agents.append(
{
"path": str(path),
"name": info.name,
"description": info.description[:60] + "..." if len(info.description) > 60 else info.description,
"description": info.description[:60] + "..."
if len(info.description) > 60
else info.description,
"nodes": info.node_count,
"tools": len(info.required_tools),
})
}
)
runner.cleanup()
except Exception as e:
agents.append({
agents.append(
{
"path": str(path),
"error": str(e),
})
}
)
if not agents:
print(f"No agents found in {directory}")
@@ -540,7 +567,7 @@ def cmd_dispatch(args: argparse.Namespace) -> int:
def _interactive_approval(request):
"""Interactive approval callback for HITL mode."""
from framework.graph import ApprovalResult, ApprovalDecision
from framework.graph import ApprovalDecision, ApprovalResult
print()
print("=" * 60)
@@ -561,6 +588,7 @@ def _interactive_approval(request):
print(f"\n[{key}]:")
if isinstance(value, (dict, list)):
import json
value_str = json.dumps(value, indent=2, default=str)
# Show more content for approval - up to 2000 chars
if len(value_str) > 2000:
@@ -605,11 +633,14 @@ def _interactive_approval(request):
print("Invalid choice. Please enter a, r, s, or x.")
def _format_natural_language_to_json(user_input: str, input_keys: list[str], agent_description: str, session_context: dict = None) -> dict:
def _format_natural_language_to_json(
user_input: str, input_keys: list[str], agent_description: str, session_context: dict = None
) -> dict:
"""Use Haiku to convert natural language input to JSON based on agent's input schema."""
import anthropic
import os
import anthropic
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
# Build prompt for Haiku
@@ -619,17 +650,26 @@ def _format_natural_language_to_json(user_input: str, input_keys: list[str], age
main_field = input_keys[0] if input_keys else "objective"
existing_value = session_context.get(main_field, "")
session_info = f"\n\nExisting {main_field}: \"{existing_value}\"\n\nThe user is providing ADDITIONAL information. Append this new information to the existing {main_field} to create an enriched, more detailed version."
session_info = (
f'\n\nExisting {main_field}: "{existing_value}"\n\n'
"The user is providing ADDITIONAL information. Append this new information "
f"to the existing {main_field} to create an enriched, more detailed version."
)
prompt = f"""You are formatting user input for an agent that requires specific input fields.
Agent: {agent_description}
Required input fields: {', '.join(input_keys)}{session_info}
Required input fields: {", ".join(input_keys)}{session_info}
User input: {user_input}
{"If this is a follow-up message, APPEND the new information to the existing field value to create a more complete, detailed version. Do not create new fields." if session_context else ""}
{
"If this is a follow-up message, APPEND the new information to the existing field "
"value to create a more complete, detailed version. Do not create new fields."
if session_context
else ""
}
Output ONLY valid JSON, no explanation:"""
@@ -637,7 +677,7 @@ Output ONLY valid JSON, no explanation:"""
message = client.messages.create(
model="claude-3-5-haiku-20241022", # Fast and cheap
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
messages=[{"role": "user", "content": prompt}],
)
json_str = message.content[0].text.strip()
@@ -661,12 +701,13 @@ Output ONLY valid JSON, no explanation:"""
def cmd_shell(args: argparse.Namespace) -> int:
"""Start an interactive agent session."""
import logging
from framework.runner import AgentRunner
# Configure logging to show runtime visibility
logging.basicConfig(
level=logging.INFO,
format='%(message)s', # Simple format for clean output
format="%(message)s", # Simple format for clean output
)
agents_dir = Path(args.agents_dir)
@@ -690,7 +731,7 @@ def cmd_shell(args: argparse.Namespace) -> int:
return 1
# Set up approval callback by default (unless --no-approve is set)
if not getattr(args, 'no_approve', False):
if not getattr(args, "no_approve", False):
runner.set_approval_callback(_interactive_approval)
print("\n🔔 Human-in-the-loop mode enabled")
print(" Steps marked for approval will pause for your review")
@@ -748,8 +789,8 @@ def cmd_shell(args: argparse.Namespace) -> int:
if user_input == "/nodes":
print("\nAgent nodes:")
for node in info.nodes:
inputs = f" [in: {', '.join(node['input_keys'])}]" if node.get('input_keys') else ""
outputs = f" [out: {', '.join(node['output_keys'])}]" if node.get('output_keys') else ""
inputs = f" [in: {', '.join(node['input_keys'])}]" if node.get("input_keys") else ""
outputs = f" [out: {', '.join(node['output_keys'])}]" if node.get("output_keys") else ""
print(f" {node['id']}: {node['name']}{inputs}{outputs}")
print(f" {node['description']}")
print()
@@ -784,7 +825,7 @@ def cmd_shell(args: argparse.Namespace) -> int:
user_input,
entry_input_keys,
info.description,
session_context=session_memory
session_context=session_memory,
)
print(f"✓ Formatted to: {json.dumps(context)}")
except Exception as e:
@@ -807,6 +848,7 @@ def cmd_shell(args: argparse.Namespace) -> int:
# Auto-inject user_id if missing (for personal assistant agents)
if "user_id" in entry_input_keys and run_context.get("user_id") is None:
import os
run_context["user_id"] = os.environ.get("USER", "default_user")
# Add conversation history to context if agent expects it
@@ -872,12 +914,14 @@ def cmd_shell(args: argparse.Namespace) -> int:
session_memory[key] = value
# Track conversation history
conversation_history.append({
conversation_history.append(
{
"input": context,
"output": result.output if result.output else {},
"status": "success" if result.success else "failed",
"paused_at": result.paused_at
})
"paused_at": result.paused_at,
}
)
print()
@@ -904,6 +948,7 @@ def _select_agent(agents_dir: Path) -> str | None:
for i, agent_path in enumerate(agents, 1):
try:
from framework.runner import AgentRunner
runner = AgentRunner.load(agent_path)
info = runner.info()
desc = info.description[:50] + "..." if len(info.description) > 50 else info.description
+14 -8
View File
@@ -145,6 +145,7 @@ class MCPClient:
try:
import threading
from mcp import StdioServerParameters
# Create server parameters
@@ -177,7 +178,10 @@ class MCPClient:
# Create persistent stdio client context
self._stdio_context = stdio_client(server_params)
self._read_stream, self._write_stream = await self._stdio_context.__aenter__()
(
self._read_stream,
self._write_stream,
) = await self._stdio_context.__aenter__()
# Create persistent session
self._session = ClientSession(self._read_stream, self._write_stream)
@@ -212,7 +216,7 @@ class MCPClient:
logger.info(f"Connected to MCP server '{self.config.name}' via STDIO (persistent)")
except Exception as e:
raise RuntimeError(f"Failed to connect to MCP server: {e}")
raise RuntimeError(f"Failed to connect to MCP server: {e}") from e
def _connect_http(self) -> None:
"""Connect to MCP server via HTTP transport."""
@@ -268,11 +272,13 @@ class MCPClient:
# Convert tools to dict format
tools_list = []
for tool in response.tools:
tools_list.append({
tools_list.append(
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema,
})
}
)
return tools_list
@@ -300,7 +306,7 @@ class MCPClient:
return data.get("result", {}).get("tools", [])
except Exception as e:
raise RuntimeError(f"Failed to list tools via HTTP: {e}")
raise RuntimeError(f"Failed to list tools via HTTP: {e}") from e
def list_tools(self) -> list[MCPTool]:
"""
@@ -350,9 +356,9 @@ class MCPClient:
if len(result.content) > 0:
content_item = result.content[0]
# Check if it's a text content item
if hasattr(content_item, 'text'):
if hasattr(content_item, "text"):
return content_item.text
elif hasattr(content_item, 'data'):
elif hasattr(content_item, "data"):
return content_item.data
return result.content
@@ -384,7 +390,7 @@ class MCPClient:
return data.get("result", {}).get("content", [])
except Exception as e:
raise RuntimeError(f"Failed to call tool via HTTP: {e}")
raise RuntimeError(f"Failed to call tool via HTTP: {e}") from e
def disconnect(self) -> None:
"""Disconnect from the MCP server."""
+8 -11
View File
@@ -72,6 +72,7 @@ class AgentOrchestrator:
# Auto-create LLM - LiteLLM auto-detects provider and API key from model name
if self._llm is None:
from framework.llm.litellm import LiteLLMProvider
self._llm = LiteLLMProvider(model=self._model)
def register(
@@ -205,7 +206,7 @@ class AgentOrchestrator:
responses = await asyncio.gather(*tasks, return_exceptions=True)
for agent_name, response in zip(routing.selected_agents, responses):
for agent_name, response in zip(routing.selected_agents, responses, strict=False):
if isinstance(response, Exception):
results[agent_name] = {"error": str(response)}
else:
@@ -326,7 +327,7 @@ class AgentOrchestrator:
results = await asyncio.gather(*tasks, return_exceptions=True)
for name, result in zip(agent_names, results):
for name, result in zip(agent_names, results, strict=False):
if isinstance(result, Exception):
responses[name] = AgentMessage(
type=MessageType.RESPONSE,
@@ -355,7 +356,7 @@ class AgentOrchestrator:
results = await asyncio.gather(*tasks, return_exceptions=True)
capabilities = {}
for name, result in zip(agent_names, results):
for name, result in zip(agent_names, results, strict=False):
if isinstance(result, Exception):
capabilities[name] = CapabilityResponse(
agent_name=name,
@@ -399,11 +400,7 @@ class AgentOrchestrator:
return await self._llm_route(request, intent, capable)
# If no capable agents, check uncertain ones
uncertain = [
(name, cap)
for name, cap in capabilities.items()
if cap.level == CapabilityLevel.UNCERTAIN
]
uncertain = [(name, cap) for name, cap in capabilities.items() if cap.level == CapabilityLevel.UNCERTAIN]
if uncertain:
uncertain.sort(key=lambda x: -x[1].confidence)
return RoutingDecision(
@@ -429,8 +426,7 @@ class AgentOrchestrator:
"""Use LLM to decide routing when multiple agents are capable."""
agents_info = "\n".join(
f"- {name}: {cap.reasoning} (confidence: {cap.confidence:.2f})"
for name, cap in capable
f"- {name}: {cap.reasoning} (confidence: {cap.confidence:.2f})" for name, cap in capable
)
prompt = f"""Multiple agents can handle this request. Decide the best routing.
@@ -463,7 +459,8 @@ Respond with JSON only:
)
import re
json_match = re.search(r'\{[^{}]*\}', response.content, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
selected = data.get("selected", [])
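
The strict=False argument now spelled out on the zip calls above is what ruff's bugbear rule B905 asks for; behavior is unchanged (pairing stops at the shorter iterable). For example:

agent_names = ["researcher", "writer"]        # hypothetical values
results = ["ok"]                              # one task produced no result
for name, result in zip(agent_names, results, strict=False):
    print(name, result)                       # pairs only ("researcher", "ok")
# zip(agent_names, results, strict=True) would raise ValueError on the length mismatch
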
+1 -1
View File
@@ -1,10 +1,10 @@
"""Message protocol for multi-agent communication."""
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
import uuid
class MessageType(Enum):
+29 -22
View File
@@ -2,20 +2,21 @@
import json
import os
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING
from framework.graph import Goal
from framework.graph.edge import GraphSpec, EdgeSpec, EdgeCondition
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.node import NodeSpec
from framework.graph.executor import GraphExecutor, ExecutionResult
from framework.llm.provider import LLMProvider, Tool
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
if TYPE_CHECKING:
from framework.runner.protocol import CapabilityResponse, AgentMessage
from framework.runner.protocol import AgentMessage, CapabilityResponse
@dataclass
@@ -109,27 +110,31 @@ def load_agent_export(data: str | dict) -> tuple[GraphSpec, Goal]:
)
# Build Goal
from framework.graph.goal import SuccessCriterion, Constraint
from framework.graph.goal import Constraint, SuccessCriterion
success_criteria = []
for sc_data in goal_data.get("success_criteria", []):
success_criteria.append(SuccessCriterion(
success_criteria.append(
SuccessCriterion(
id=sc_data["id"],
description=sc_data["description"],
metric=sc_data.get("metric", ""),
target=sc_data.get("target", ""),
weight=sc_data.get("weight", 1.0),
))
)
)
constraints = []
for c_data in goal_data.get("constraints", []):
constraints.append(Constraint(
constraints.append(
Constraint(
id=c_data["id"],
description=c_data["description"],
constraint_type=c_data.get("constraint_type", "hard"),
category=c_data.get("category", "safety"),
check=c_data.get("check", ""),
))
)
)
goal = Goal(
id=goal_data.get("id", ""),
@@ -466,12 +471,16 @@ class AgentRunner:
entry_node=self.graph.entry_node,
terminal_nodes=self.graph.terminal_nodes,
success_criteria=[
{"id": sc.id, "description": sc.description, "metric": sc.metric, "target": sc.target}
{
"id": sc.id,
"description": sc.description,
"metric": sc.metric,
"target": sc.target,
}
for sc in self.goal.success_criteria
],
constraints=[
{"id": c.id, "description": c.description, "type": c.constraint_type}
for c in self.goal.constraints
{"id": c.id, "description": c.description, "type": c.constraint_type} for c in self.goal.constraints
],
required_tools=sorted(required_tools),
has_tools_module=(self.agent_path / "tools.py").exists(),
@@ -514,7 +523,7 @@ class AgentRunner:
# Check tool credentials (Tier 2)
missing_creds = cred_manager.get_missing_for_tools(info.required_tools)
for cred_name, spec in missing_creds:
for _cred_name, spec in missing_creds:
missing_credentials.append(spec.env_var)
affected_tools = [t for t in info.required_tools if t in spec.tools]
tools_str = ", ".join(affected_tools)
@@ -524,9 +533,9 @@ class AgentRunner:
warnings.append(warning_msg)
# Check node type credentials (e.g., ANTHROPIC_API_KEY for LLM nodes)
node_types = list(set(node.node_type for node in self.graph.nodes))
node_types = list({node.node_type for node in self.graph.nodes})
missing_node_creds = cred_manager.get_missing_for_node_types(node_types)
for cred_name, spec in missing_node_creds:
for _cred_name, spec in missing_node_creds:
if spec.env_var not in missing_credentials: # Avoid duplicates
missing_credentials.append(spec.env_var)
affected_types = [t for t in node_types if t in spec.node_types]
@@ -537,10 +546,7 @@ class AgentRunner:
warnings.append(warning_msg)
except ImportError:
# aden_tools not installed - fall back to direct check
has_llm_nodes = any(
node.node_type in ("llm_generate", "llm_tool_use")
for node in self.graph.nodes
)
has_llm_nodes = any(node.node_type in ("llm_generate", "llm_tool_use") for node in self.graph.nodes)
if has_llm_nodes and not os.environ.get("ANTHROPIC_API_KEY"):
warnings.append("Agent has LLM nodes but ANTHROPIC_API_KEY not set")
@@ -565,7 +571,7 @@ class AgentRunner:
Returns:
CapabilityResponse with level, confidence, and reasoning
"""
from framework.runner.protocol import CapabilityResponse, CapabilityLevel
from framework.runner.protocol import CapabilityLevel, CapabilityResponse
# Use provided LLM or set up our own
eval_llm = llm
@@ -622,7 +628,8 @@ Respond with JSON only:
# Parse response
import re
json_match = re.search(r'\{[^{}]*\}', response.content, re.DOTALL)
json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
level_map = {
@@ -646,7 +653,7 @@ Respond with JSON only:
def _keyword_capability_check(self, request: dict) -> "CapabilityResponse":
"""Simple keyword-based capability check (fallback when no LLM)."""
from framework.runner.protocol import CapabilityResponse, CapabilityLevel
from framework.runner.protocol import CapabilityLevel, CapabilityResponse
info = self.info()
request_str = json.dumps(request).lower()
+4 -3
View File
@@ -4,11 +4,12 @@ import importlib.util
import inspect
import json
import logging
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
from typing import Any
from framework.llm.provider import Tool, ToolUse, ToolResult
from framework.llm.provider import Tool, ToolResult, ToolUse
logger = logging.getLogger(__name__)
@@ -142,7 +143,7 @@ class ToolRegistry:
# Check for TOOLS dict
if hasattr(module, "TOOLS"):
tools_dict = getattr(module, "TOOLS")
tools_dict = module.TOOLS
executor_func = getattr(module, "tool_executor", None)
for name, tool in tools_dict.items():
+12 -8
View File
@@ -6,13 +6,13 @@ that Builder can analyze. The agent calls simple methods, and the runtime
handles all the structured logging.
"""
from datetime import datetime
from typing import Any
from pathlib import Path
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
from framework.schemas.decision import Decision, Option, Outcome, DecisionType
from framework.schemas.decision import Decision, DecisionType, Option, Outcome
from framework.schemas.run import Run, RunStatus
from framework.storage.backend import FileStorage
@@ -174,7 +174,8 @@ class Runtime:
# Build Option objects
option_objects = []
for opt in options:
option_objects.append(Option(
option_objects.append(
Option(
id=opt["id"],
description=opt.get("description", ""),
action_type=opt.get("action_type", "unknown"),
@@ -182,7 +183,8 @@ class Runtime:
pros=opt.get("pros", []),
cons=opt.get("cons", []),
confidence=opt.get("confidence", 0.5),
))
)
)
# Create decision
decision_id = f"dec_{len(self._current_run.decisions)}"
@@ -370,11 +372,13 @@ class Runtime:
"""
return self.decide(
intent=intent,
options=[{
options=[
{
"id": "action",
"description": action,
"action_type": "execute",
}],
}
],
chosen="action",
reasoning=reasoning,
node_id=node_id,
+2 -2
View File
@@ -1,7 +1,7 @@
"""Schema definitions for runtime data."""
from framework.schemas.decision import Decision, Option, Outcome, DecisionEvaluation
from framework.schemas.run import Run, RunSummary, Problem
from framework.schemas.decision import Decision, DecisionEvaluation, Option, Outcome
from framework.schemas.run import Problem, Run, RunSummary
__all__ = [
"Decision",
+6 -1
View File
@@ -10,14 +10,15 @@ This is MORE important than actions because:
"""
from datetime import datetime
from typing import Any
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, computed_field
class DecisionType(str, Enum):
"""Types of decisions an agent can make."""
TOOL_SELECTION = "tool_selection" # Which tool to use
PARAMETER_CHOICE = "parameter_choice" # What parameters to pass
PATH_CHOICE = "path_choice" # Which branch to take
@@ -35,6 +36,7 @@ class Option(BaseModel):
Capturing options is crucial - it shows what the agent considered
and enables us to evaluate whether the right choice was made.
"""
id: str
description: str # Human-readable: "Call search API"
action_type: str # "tool_call", "generate", "delegate"
@@ -57,6 +59,7 @@ class Outcome(BaseModel):
This is filled in AFTER the action completes, allowing us to
correlate decisions with their results.
"""
success: bool
result: Any = None # The actual output
error: str | None = None # Error message if failed
@@ -81,6 +84,7 @@ class DecisionEvaluation(BaseModel):
This is computed AFTER the run completes, allowing us to
judge decisions in light of their eventual outcomes.
"""
# Did it move toward the goal?
goal_aligned: bool = True
alignment_score: float = Field(default=1.0, ge=0.0, le=1.0)
@@ -109,6 +113,7 @@ class Decision(BaseModel):
Every significant choice the agent makes is captured here.
This is the core data structure for understanding and improving agents.
"""
id: str
timestamp: datetime = Field(default_factory=datetime.now)
node_id: str
+6 -1
View File
@@ -6,8 +6,8 @@ summaries and metrics that Builder needs to understand what happened.
"""
from datetime import datetime
from typing import Any
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, computed_field
@@ -16,6 +16,7 @@ from framework.schemas.decision import Decision, Outcome
class RunStatus(str, Enum):
"""Status of a run."""
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@@ -29,6 +30,7 @@ class Problem(BaseModel):
Problems are surfaced explicitly so Builder can focus on what needs fixing.
"""
id: str
severity: str = Field(description="critical, warning, or minor")
description: str
@@ -42,6 +44,7 @@ class Problem(BaseModel):
class RunMetrics(BaseModel):
"""Quantitative metrics about a run."""
total_decisions: int = 0
successful_decisions: int = 0
failed_decisions: int = 0
@@ -68,6 +71,7 @@ class Run(BaseModel):
Contains all decisions, problems, and metrics from a single run.
"""
id: str
goal_id: str
started_at: datetime = Field(default_factory=datetime.now)
@@ -191,6 +195,7 @@ class RunSummary(BaseModel):
This is what I (Builder) want to see first when analyzing runs.
"""
run_id: str
goal_id: str
status: RunStatus
+1 -1
View File
@@ -8,7 +8,7 @@ Uses Pydantic's built-in serialization.
import json
from pathlib import Path
from framework.schemas.run import Run, RunSummary, RunStatus
from framework.schemas.run import Run, RunStatus, RunSummary
class FileStorage:
+28 -28
View File
@@ -63,27 +63,7 @@ python -m framework test-debug <goal_id> <test_id>
"""
# Schemas
from framework.testing.test_case import (
ApprovalStatus,
TestType,
Test,
)
from framework.testing.test_result import (
ErrorCategory,
TestResult,
TestSuiteResult,
)
# Storage
from framework.testing.test_storage import TestStorage
# Generation
from framework.testing.constraint_gen import ConstraintTestGenerator
from framework.testing.success_gen import SuccessCriteriaTestGenerator
from framework.testing.prompts import (
CONSTRAINT_TEST_PROMPT,
SUCCESS_CRITERIA_TEST_PROMPT,
)
from framework.testing.approval_cli import batch_approval, interactive_approval
# Approval
from framework.testing.approval_types import (
@@ -93,20 +73,40 @@ from framework.testing.approval_types import (
BatchApprovalRequest,
BatchApprovalResult,
)
from framework.testing.approval_cli import interactive_approval, batch_approval
# Error categorization
from framework.testing.categorizer import ErrorCategorizer
# LLM Judge for semantic evaluation
from framework.testing.llm_judge import LLMJudge
# Debug
from framework.testing.debug_tool import DebugTool, DebugInfo
# CLI
from framework.testing.cli import register_testing_commands
# Generation
from framework.testing.constraint_gen import ConstraintTestGenerator
# Debug
from framework.testing.debug_tool import DebugInfo, DebugTool
# LLM Judge for semantic evaluation
from framework.testing.llm_judge import LLMJudge
from framework.testing.prompts import (
CONSTRAINT_TEST_PROMPT,
SUCCESS_CRITERIA_TEST_PROMPT,
)
from framework.testing.success_gen import SuccessCriteriaTestGenerator
from framework.testing.test_case import (
ApprovalStatus,
Test,
TestType,
)
from framework.testing.test_result import (
ErrorCategory,
TestResult,
TestSuiteResult,
)
# Storage
from framework.testing.test_storage import TestStorage
__all__ = [
# Schemas
"ApprovalStatus",
+13 -22
View File
@@ -6,19 +6,19 @@ This CLI provides the interactive approval workflow.
"""
import json
import tempfile
import subprocess
import os
from typing import Callable
import subprocess
import tempfile
from collections.abc import Callable
from framework.testing.test_case import Test
from framework.testing.test_storage import TestStorage
from framework.testing.approval_types import (
ApprovalAction,
ApprovalRequest,
ApprovalResult,
BatchApprovalResult,
)
from framework.testing.test_case import Test
from framework.testing.test_storage import TestStorage
def interactive_approval(
@@ -96,18 +96,14 @@ def batch_approval(
# Validate request
valid, error = req.validate_action()
if not valid:
results.append(ApprovalResult.error_result(
req.test_id, req.action, error or "Invalid request"
))
results.append(ApprovalResult.error_result(req.test_id, req.action, error or "Invalid request"))
counts["errors"] += 1
continue
# Load test
test = storage.load_test(goal_id, req.test_id)
if not test:
results.append(ApprovalResult.error_result(
req.test_id, req.action, f"Test {req.test_id} not found"
))
results.append(ApprovalResult.error_result(req.test_id, req.action, f"Test {req.test_id} not found"))
counts["errors"] += 1
continue
@@ -129,14 +125,12 @@ def batch_approval(
if req.action != ApprovalAction.SKIP:
storage.update_test(test)
results.append(ApprovalResult.success_result(
req.test_id, req.action, f"Test {req.action.value}d successfully"
))
results.append(
ApprovalResult.success_result(req.test_id, req.action, f"Test {req.action.value}d successfully")
)
except Exception as e:
results.append(ApprovalResult.error_result(
req.test_id, req.action, str(e)
))
results.append(ApprovalResult.error_result(req.test_id, req.action, str(e)))
counts["errors"] += 1
return BatchApprovalResult(
@@ -260,11 +254,7 @@ def _edit_test_code(code: str) -> str:
break
# Create temp file with code
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".py",
delete=False
) as f:
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write(code)
temp_path = f.name
@@ -292,4 +282,5 @@ def _edit_test_code(code: str) -> str:
def _command_exists(cmd: str) -> bool:
"""Check if a command exists in PATH."""
from shutil import which
return which(cmd) is not None
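
The editor flow above combines two stdlib pieces: shutil.which to confirm the editor exists on PATH, and a temporary .py file handed to the editor. A rough sketch under those assumptions:

import subprocess
import tempfile
from shutil import which

def edit_in_editor(code: str, editor: str = "nano") -> str:
    if which(editor) is None:
        return code                                      # no editor available, keep original
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write(code)
        temp_path = f.name
    subprocess.run([editor, temp_path], check=False)     # blocks until the editor exits
    with open(temp_path) as f:
        return f.read()
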
+10 -15
View File
@@ -5,8 +5,8 @@ These types are used for both interactive CLI approval and
programmatic/MCP-based approval.
"""
from enum import Enum
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
@@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
class ApprovalAction(str, Enum):
"""Actions a user can take on a generated test."""
APPROVE = "approve" # Accept as-is
MODIFY = "modify" # Accept with modifications
REJECT = "reject" # Decline
@@ -26,16 +27,11 @@ class ApprovalRequest(BaseModel):
Used by both CLI and MCP interfaces.
"""
test_id: str
action: ApprovalAction
modified_code: str | None = Field(
default=None,
description="New code if action is MODIFY"
)
reason: str | None = Field(
default=None,
description="Rejection reason if action is REJECT"
)
modified_code: str | None = Field(default=None, description="New code if action is MODIFY")
reason: str | None = Field(default=None, description="Rejection reason if action is REJECT")
approved_by: str = "user"
def validate_action(self) -> tuple[bool, str | None]:
@@ -56,6 +52,7 @@ class ApprovalResult(BaseModel):
"""
Result of processing an approval request.
"""
test_id: str
action: ApprovalAction
success: bool
@@ -64,9 +61,7 @@ class ApprovalResult(BaseModel):
timestamp: datetime = Field(default_factory=datetime.now)
@classmethod
def success_result(
cls, test_id: str, action: ApprovalAction, message: str | None = None
) -> "ApprovalResult":
def success_result(cls, test_id: str, action: ApprovalAction, message: str | None = None) -> "ApprovalResult":
"""Create a successful result."""
return cls(
test_id=test_id,
@@ -76,9 +71,7 @@ class ApprovalResult(BaseModel):
)
@classmethod
def error_result(
cls, test_id: str, action: ApprovalAction, error: str
) -> "ApprovalResult":
def error_result(cls, test_id: str, action: ApprovalAction, error: str) -> "ApprovalResult":
"""Create an error result."""
return cls(
test_id=test_id,
@@ -94,6 +87,7 @@ class BatchApprovalRequest(BaseModel):
Useful for MCP interface where user reviews all tests and submits decisions.
"""
goal_id: str
approvals: list[ApprovalRequest]
@@ -109,6 +103,7 @@ class BatchApprovalResult(BaseModel):
"""
Result of processing a batch approval request.
"""
goal_id: str
total: int
approved: int
+15 -31
View File
@@ -80,15 +80,9 @@ class ErrorCategorizer:
def __init__(self):
"""Initialize categorizer with compiled patterns."""
self._logic_patterns = [
re.compile(p, re.IGNORECASE) for p in self.LOGIC_ERROR_PATTERNS
]
self._impl_patterns = [
re.compile(p, re.IGNORECASE) for p in self.IMPLEMENTATION_ERROR_PATTERNS
]
self._edge_patterns = [
re.compile(p, re.IGNORECASE) for p in self.EDGE_CASE_PATTERNS
]
self._logic_patterns = [re.compile(p, re.IGNORECASE) for p in self.LOGIC_ERROR_PATTERNS]
self._impl_patterns = [re.compile(p, re.IGNORECASE) for p in self.IMPLEMENTATION_ERROR_PATTERNS]
self._edge_patterns = [re.compile(p, re.IGNORECASE) for p in self.EDGE_CASE_PATTERNS]
def categorize(self, result: TestResult) -> ErrorCategory | None:
"""
@@ -125,9 +119,7 @@ class ErrorCategorizer:
# Default to implementation error (most common)
return ErrorCategory.IMPLEMENTATION_ERROR
def categorize_with_confidence(
self, result: TestResult
) -> tuple[ErrorCategory | None, float]:
def categorize_with_confidence(self, result: TestResult) -> tuple[ErrorCategory | None, float]:
"""
Categorize with a confidence score.
@@ -143,15 +135,9 @@ class ErrorCategorizer:
error_text = self._get_error_text(result)
# Count pattern matches for each category
logic_matches = sum(
1 for p in self._logic_patterns if p.search(error_text)
)
impl_matches = sum(
1 for p in self._impl_patterns if p.search(error_text)
)
edge_matches = sum(
1 for p in self._edge_patterns if p.search(error_text)
)
logic_matches = sum(1 for p in self._logic_patterns if p.search(error_text))
impl_matches = sum(1 for p in self._impl_patterns if p.search(error_text))
edge_matches = sum(1 for p in self._edge_patterns if p.search(error_text))
total_matches = logic_matches + impl_matches + edge_matches
@@ -204,8 +190,7 @@ class ErrorCategorizer:
"The goal specification may not accurately describe the desired behavior."
),
ErrorCategory.IMPLEMENTATION_ERROR: (
"Fix the code in agent nodes/edges. "
"There's a bug in the implementation that needs to be corrected."
"Fix the code in agent nodes/edges. There's a bug in the implementation that needs to be corrected."
),
ErrorCategory.EDGE_CASE: (
"Add a new test for this edge case scenario. "
@@ -238,23 +223,22 @@ class ErrorCategorizer:
"action": "Fix nodes/edges implementation",
"restart_required": False,
"description": (
"There's a code bug. Fix the agent implementation, "
"then re-run Eval (skip Goal stage)."
"There's a code bug. Fix the agent implementation, then re-run Eval (skip Goal stage)."
),
},
ErrorCategory.EDGE_CASE: {
"stage": "Eval",
"action": "Add new test only",
"restart_required": False,
"description": (
"This is a new scenario. Add a test for it and continue "
"in the Eval stage."
),
"description": ("This is a new scenario. Add a test for it and continue in the Eval stage."),
},
}
return guidance.get(category, {
return guidance.get(
category,
{
"stage": "Unknown",
"action": "Review manually",
"restart_required": False,
"description": "Unable to determine category. Manual review required.",
})
},
)
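
The collapsed comprehensions above keep the categorizer's approach intact: compile each pattern list once with re.IGNORECASE, count matches per category, then pick the category with the strongest signal (defaulting to implementation error). In miniature, with made-up patterns:

import re

LOGIC = [r"assert.*expected", r"wrong (value|result)"]
IMPL = [r"TypeError", r"AttributeError", r"KeyError"]

logic_patterns = [re.compile(p, re.IGNORECASE) for p in LOGIC]
impl_patterns = [re.compile(p, re.IGNORECASE) for p in IMPL]

error_text = "AttributeError: 'NoneType' object has no attribute 'run'"
logic_matches = sum(1 for p in logic_patterns if p.search(error_text))
impl_matches = sum(1 for p in impl_patterns if p.search(error_text))
category = "implementation_error" if impl_matches >= logic_matches else "logic_error"
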
+4 -3
View File
@@ -14,11 +14,10 @@ import subprocess
from pathlib import Path
from framework.graph.goal import Goal
from framework.testing.test_storage import TestStorage
from framework.testing.approval_cli import interactive_approval
from framework.testing.constraint_gen import ConstraintTestGenerator
from framework.testing.success_gen import SuccessCriteriaTestGenerator
from framework.testing.approval_cli import interactive_approval
from framework.testing.test_storage import TestStorage
DEFAULT_STORAGE_PATH = Path("exports")
@@ -173,6 +172,7 @@ def cmd_test_generate(args: argparse.Namespace) -> int:
# Get LLM provider
try:
from framework.llm import AnthropicProvider
llm = AnthropicProvider()
except Exception as e:
print(f"Error: Failed to initialize LLM provider: {e}")
@@ -376,6 +376,7 @@ def cmd_test_list(args: argparse.Namespace) -> int:
# Filter by status
if args.status != "all":
from framework.testing.test_case import ApprovalStatus
try:
filter_status = ApprovalStatus(args.status)
tests = [t for t in tests if t.approval_status == filter_status]
+13 -22
View File
@@ -8,10 +8,10 @@ Tests are returned with PENDING approval status.
import uuid
from typing import TYPE_CHECKING
from framework.graph.goal import Goal, Constraint
from framework.testing.test_case import Test, TestType, ApprovalStatus
from framework.graph.goal import Constraint, Goal
from framework.llm.provider import Tool, ToolResult, ToolUse
from framework.testing.prompts import CONSTRAINT_TEST_PROMPT
from framework.llm.provider import Tool, ToolUse, ToolResult
from framework.testing.test_case import ApprovalStatus, Test, TestType
if TYPE_CHECKING:
from framework.llm.provider import LLMProvider
@@ -103,16 +103,15 @@ class ConstraintTestGenerator:
def tool_executor(tool_use: ToolUse) -> ToolResult:
if tool_use.name == "submit_test":
collected_tests.append(tool_use.input)
return ToolResult(
tool_use_id=tool_use.id, content="Test recorded successfully"
)
return ToolResult(
tool_use_id=tool_use.id, content="Unknown tool", is_error=True
)
return ToolResult(tool_use_id=tool_use.id, content="Test recorded successfully")
return ToolResult(tool_use_id=tool_use.id, content="Unknown tool", is_error=True)
self.llm.complete_with_tools(
messages=[{"role": "user", "content": prompt}],
system="You are a test generation expert. For each constraint, call the submit_test tool with the test details.",
system=(
"You are a test generation expert. "
"For each constraint, call the submit_test tool with the test details."
),
tools=[SUBMIT_TEST_TOOL],
tool_executor=tool_executor,
max_iterations=5,
@@ -124,9 +123,7 @@ class ConstraintTestGenerator:
# Enforce max 5 tests total
return tests[:5]
def generate_for_constraint(
self, goal: Goal, constraint: Constraint, agent_module: str = "my_agent"
) -> list[Test]:
def generate_for_constraint(self, goal: Goal, constraint: Constraint, agent_module: str = "my_agent") -> list[Test]:
"""
Generate tests for a single constraint.
@@ -152,12 +149,8 @@ class ConstraintTestGenerator:
def tool_executor(tool_use: ToolUse) -> ToolResult:
if tool_use.name == "submit_test":
collected_tests.append(tool_use.input)
return ToolResult(
tool_use_id=tool_use.id, content="Test recorded successfully"
)
return ToolResult(
tool_use_id=tool_use.id, content="Unknown tool", is_error=True
)
return ToolResult(tool_use_id=tool_use.id, content="Test recorded successfully")
return ToolResult(tool_use_id=tool_use.id, content="Unknown tool", is_error=True)
self.llm.complete_with_tools(
messages=[{"role": "user", "content": prompt}],
@@ -186,9 +179,7 @@ class ConstraintTestGenerator:
- Description: {constraint.description}
- Check: {constraint.check}"""
def _create_tests_from_collected(
self, collected: list[dict], goal_id: str
) -> list[Test]:
def _create_tests_from_collected(self, collected: list[dict], goal_id: str) -> list[Test]:
"""Create Test objects from tool call data."""
tests = []
for td in collected:
+7 -9
View File
@@ -13,16 +13,17 @@ from typing import Any
from pydantic import BaseModel, Field
from framework.testing.test_case import Test
from framework.testing.test_result import TestResult, ErrorCategory
from framework.testing.test_storage import TestStorage
from framework.testing.categorizer import ErrorCategorizer
from framework.testing.test_case import Test
from framework.testing.test_result import ErrorCategory, TestResult
from framework.testing.test_storage import TestStorage
class DebugInfo(BaseModel):
"""
Comprehensive debug information for a failed test.
"""
test_id: str
test_name: str
@@ -239,12 +240,10 @@ class DebugTool:
return {
"execution_path": run.metrics.nodes_executed if hasattr(run, "metrics") else [],
"decisions": [
d.model_dump() if hasattr(d, "model_dump") else str(d)
for d in getattr(run, "decisions", [])
d.model_dump() if hasattr(d, "model_dump") else str(d) for d in getattr(run, "decisions", [])
],
"problems": [
p.model_dump() if hasattr(p, "model_dump") else str(p)
for p in getattr(run, "problems", [])
p.model_dump() if hasattr(p, "model_dump") else str(p) for p in getattr(run, "problems", [])
],
"status": run.status.value if hasattr(run, "status") else "unknown",
}
@@ -279,8 +278,7 @@ class DebugTool:
if failures_by_category["uncategorized"]:
suggestions.append(
f"Found {len(failures_by_category['uncategorized'])} uncategorized failures. "
"Manual review required."
f"Found {len(failures_by_category['uncategorized'])} uncategorized failures. Manual review required."
)
return suggestions
+2 -2
View File
@@ -41,8 +41,8 @@ class LLMJudge:
import anthropic
self._client = anthropic.Anthropic()
except ImportError:
raise RuntimeError("anthropic package required for LLM judge")
except ImportError as e:
raise RuntimeError("anthropic package required for LLM judge") from e
return self._client
def evaluate(
+11 -18
View File
@@ -9,9 +9,9 @@ import uuid
from typing import TYPE_CHECKING
from framework.graph.goal import Goal, SuccessCriterion
from framework.testing.test_case import Test, TestType, ApprovalStatus
from framework.llm.provider import Tool, ToolResult, ToolUse
from framework.testing.prompts import SUCCESS_CRITERIA_TEST_PROMPT
from framework.llm.provider import Tool, ToolUse, ToolResult
from framework.testing.test_case import ApprovalStatus, Test, TestType
if TYPE_CHECKING:
from framework.llm.provider import LLMProvider
@@ -115,16 +115,15 @@ class SuccessCriteriaTestGenerator:
def tool_executor(tool_use: ToolUse) -> ToolResult:
if tool_use.name == "submit_test":
collected_tests.append(tool_use.input)
return ToolResult(
tool_use_id=tool_use.id, content="Test recorded successfully"
)
return ToolResult(
tool_use_id=tool_use.id, content="Unknown tool", is_error=True
)
return ToolResult(tool_use_id=tool_use.id, content="Test recorded successfully")
return ToolResult(tool_use_id=tool_use.id, content="Unknown tool", is_error=True)
self.llm.complete_with_tools(
messages=[{"role": "user", "content": prompt}],
system="You are a test generation expert. For each success criterion, call the submit_test tool with the test details.",
system=(
"You are a test generation expert. "
"For each success criterion, call the submit_test tool with the test details."
),
tools=[SUBMIT_TEST_TOOL],
tool_executor=tool_executor,
max_iterations=12,
@@ -172,12 +171,8 @@ class SuccessCriteriaTestGenerator:
def tool_executor(tool_use: ToolUse) -> ToolResult:
if tool_use.name == "submit_test":
collected_tests.append(tool_use.input)
return ToolResult(
tool_use_id=tool_use.id, content="Test recorded successfully"
)
return ToolResult(
tool_use_id=tool_use.id, content="Unknown tool", is_error=True
)
return ToolResult(tool_use_id=tool_use.id, content="Test recorded successfully")
return ToolResult(tool_use_id=tool_use.id, content="Unknown tool", is_error=True)
self.llm.complete_with_tools(
messages=[{"role": "user", "content": prompt}],
@@ -206,9 +201,7 @@ class SuccessCriteriaTestGenerator:
- Weight: {criterion.weight}
- Currently met: {criterion.met}"""
def _create_tests_from_collected(
self, collected: list[dict], goal_id: str
) -> list[Test]:
def _create_tests_from_collected(self, collected: list[dict], goal_id: str) -> list[Test]:
"""Create Test objects from tool call data."""
tests = []
for td in collected:
+14 -42
View File
@@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
class ApprovalStatus(str, Enum):
"""Status of user approval for a generated test."""
PENDING = "pending" # Awaiting user review
APPROVED = "approved" # User accepted as-is
MODIFIED = "modified" # User edited before accepting
@@ -22,6 +23,7 @@ class ApprovalStatus(str, Enum):
class TestType(str, Enum):
"""Type of test based on what it validates."""
__test__ = False # Not a pytest test class
CONSTRAINT = "constraint" # Validates constraint boundaries
SUCCESS_CRITERIA = "outcome" # Validates success criteria achievement
@@ -38,64 +40,34 @@ class Test(BaseModel):
All tests require approval before being added to the test suite.
"""
__test__ = False # Not a pytest test class
id: str
goal_id: str
parent_criteria_id: str = Field(
description="Links to success_criteria.id or constraint.id"
)
parent_criteria_id: str = Field(description="Links to success_criteria.id or constraint.id")
test_type: TestType
# Test definition
test_name: str = Field(
description="Descriptive function name, e.g., test_constraint_api_limits_respected"
)
test_code: str = Field(
description="Python test function code (pytest compatible)"
)
description: str = Field(
description="Human-readable description of what the test validates"
)
input: dict[str, Any] = Field(
default_factory=dict,
description="Test input data"
)
expected_output: dict[str, Any] = Field(
default_factory=dict,
description="Expected output or assertions"
)
test_name: str = Field(description="Descriptive function name, e.g., test_constraint_api_limits_respected")
test_code: str = Field(description="Python test function code (pytest compatible)")
description: str = Field(description="Human-readable description of what the test validates")
input: dict[str, Any] = Field(default_factory=dict, description="Test input data")
expected_output: dict[str, Any] = Field(default_factory=dict, description="Expected output or assertions")
# LLM generation metadata
generated_by: str = Field(
default="llm",
description="Who created the test: 'llm' or 'human'"
)
llm_confidence: float = Field(
default=0.0,
ge=0.0,
le=1.0,
description="LLM's confidence in the test quality (0-1)"
)
generated_by: str = Field(default="llm", description="Who created the test: 'llm' or 'human'")
llm_confidence: float = Field(default=0.0, ge=0.0, le=1.0, description="LLM's confidence in the test quality (0-1)")
# Approval tracking (CRITICAL - tests are never used without approval)
approval_status: ApprovalStatus = ApprovalStatus.PENDING
approved_by: str | None = None
approved_at: datetime | None = None
rejection_reason: str | None = Field(
default=None,
description="Reason for rejection if status is REJECTED"
)
original_code: str | None = Field(
default=None,
description="Original LLM-generated code if user modified it"
)
rejection_reason: str | None = Field(default=None, description="Reason for rejection if status is REJECTED")
original_code: str | None = Field(default=None, description="Original LLM-generated code if user modified it")
# Execution tracking
last_run: datetime | None = None
last_result: str | None = Field(
default=None,
description="Result of last run: 'passed', 'failed', 'error'"
)
last_result: str | None = Field(default=None, description="Result of last run: 'passed', 'failed', 'error'")
run_count: int = 0
pass_count: int = 0
fail_count: int = 0
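
All of the Field(...) collapses above are cosmetic under the new 120-character limit; defaults, bounds, and descriptions are untouched. The single-line form, on a made-up model:

from typing import Any
from pydantic import BaseModel, Field

class ExampleTest(BaseModel):
    test_name: str = Field(description="Descriptive function name")
    input: dict[str, Any] = Field(default_factory=dict, description="Test input data")
    llm_confidence: float = Field(default=0.0, ge=0.0, le=1.0, description="Confidence in test quality (0-1)")
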
+11 -31
View File
@@ -21,6 +21,7 @@ class ErrorCategory(str, Enum):
- IMPLEMENTATION_ERROR: Code bug → fix nodes/edges in Agent stage
- EDGE_CASE: New scenario discovered → add new test only
"""
LOGIC_ERROR = "logic_error"
IMPLEMENTATION_ERROR = "implementation_error"
EDGE_CASE = "edge_case"
@@ -36,13 +37,11 @@ class TestResult(BaseModel):
- Error details for debugging
- Runtime logs and execution path
"""
__test__ = False # Not a pytest test class
test_id: str
passed: bool
duration_ms: int = Field(
ge=0,
description="Test execution time in milliseconds"
)
duration_ms: int = Field(ge=0, description="Test execution time in milliseconds")
# Output comparison
actual_output: Any = None
@@ -54,24 +53,12 @@ class TestResult(BaseModel):
stack_trace: str | None = None
# Runtime data for debugging
runtime_logs: list[dict[str, Any]] = Field(
default_factory=list,
description="Log entries from test execution"
)
node_outputs: dict[str, Any] = Field(
default_factory=dict,
description="Output from each node executed during test"
)
execution_path: list[str] = Field(
default_factory=list,
description="Sequence of nodes executed"
)
runtime_logs: list[dict[str, Any]] = Field(default_factory=list, description="Log entries from test execution")
node_outputs: dict[str, Any] = Field(default_factory=dict, description="Output from each node executed during test")
execution_path: list[str] = Field(default_factory=list, description="Sequence of nodes executed")
# Associated run ID (links to Runtime data)
run_id: str | None = Field(
default=None,
description="Runtime run ID for detailed analysis"
)
run_id: str | None = Field(default=None, description="Runtime run ID for detailed analysis")
timestamp: datetime = Field(default_factory=datetime.now)
@@ -94,6 +81,7 @@ class TestSuiteResult(BaseModel):
Provides summary statistics and individual results.
"""
__test__ = False # Not a pytest test class
goal_id: str
total: int
@@ -104,10 +92,7 @@ class TestSuiteResult(BaseModel):
results: list[TestResult] = Field(default_factory=list)
duration_ms: int = Field(
default=0,
description="Total execution time in milliseconds"
)
duration_ms: int = Field(default=0, description="Total execution time in milliseconds")
timestamp: datetime = Field(default_factory=datetime.now)
@@ -145,11 +130,6 @@ class TestSuiteResult(BaseModel):
"""Get all failed test results for debugging."""
return [r for r in self.results if not r.passed]
def get_results_by_category(
self, category: ErrorCategory
) -> list[TestResult]:
def get_results_by_category(self, category: ErrorCategory) -> list[TestResult]:
"""Get failed results by error category."""
return [
r for r in self.results
if not r.passed and r.error_category == category
]
return [r for r in self.results if not r.passed and r.error_category == category]
+4 -6
View File
@@ -6,10 +6,10 @@ storing tests as JSON files with indexes for efficient querying.
"""
import json
from pathlib import Path
from datetime import datetime
from pathlib import Path
from framework.testing.test_case import Test, ApprovalStatus, TestType
from framework.testing.test_case import ApprovalStatus, Test, TestType
from framework.testing.test_result import TestResult
@@ -34,6 +34,7 @@ class TestStorage:
suites/
{goal_id}_suite.json # Test suite metadata
"""
__test__ = False # Not a pytest test class
def __init__(self, base_path: str | Path):
@@ -197,10 +198,7 @@ class TestStorage:
return []
# Get all result files except latest.json
result_files = sorted(
[f for f in results_dir.glob("*.json") if f.name != "latest.json"],
reverse=True
)[:limit]
result_files = sorted([f for f in results_dir.glob("*.json") if f.name != "latest.json"], reverse=True)[:limit]
results = []
for f in result_files:
+1 -1
View File
@@ -32,7 +32,7 @@ packages = ["framework"]
[tool.ruff]
target-version = "py311"
line-length = 100
line-length = 120
lint.select = [
"B", # bugbear errors
+12 -14
View File
@@ -14,11 +14,12 @@ from pathlib import Path
class Colors:
"""ANSI color codes for terminal output."""
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
RED = '\033[0;31m'
BLUE = '\033[0;34m'
NC = '\033[0m' # No Color
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
RED = "\033[0;31m"
BLUE = "\033[0;34m"
NC = "\033[0m" # No Color
def print_step(message: str, color: str = Colors.YELLOW):
@@ -58,10 +59,7 @@ def main():
# Step 1: Install framework package
print_step("Step 1: Installing framework package...")
if not run_command(
[sys.executable, "-m", "pip", "install", "-e", "."],
"Failed to install framework package"
):
if not run_command([sys.executable, "-m", "pip", "install", "-e", "."], "Failed to install framework package"):
sys.exit(1)
print_success("Framework package installed")
print()
@@ -70,7 +68,7 @@ def main():
print_step("Step 2: Installing MCP dependencies...")
if not run_command(
[sys.executable, "-m", "pip", "install", "mcp", "fastmcp"],
"Failed to install MCP dependencies"
"Failed to install MCP dependencies",
):
sys.exit(1)
print_success("MCP dependencies installed")
@@ -95,12 +93,12 @@ def main():
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": str(script_dir)
"cwd": str(script_dir),
}
}
}
with open(mcp_config_path, 'w') as f:
with open(mcp_config_path, "w") as f:
json.dump(config, f, indent=2)
print_success("Created .mcp.json")
@@ -114,7 +112,7 @@ def main():
[sys.executable, "-c", "from framework.mcp import agent_builder_server"],
check=True,
capture_output=True,
text=True
text=True,
)
print_success("MCP server module verified")
except subprocess.CalledProcessError as e:
@@ -143,7 +141,7 @@ def main():
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": str(script_dir)
"cwd": str(script_dir),
}
}
}
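
The config blocks above are written with a plain json.dump. Reconstructing the write (the outer mcpServers key is assumed here, it is not visible in the hunk):

import json
from pathlib import Path

script_dir = Path(".")                                   # wherever the framework checkout lives
config = {
    "mcpServers": {                                      # assumed top-level key
        "agent-builder": {
            "command": "python",
            "args": ["-m", "framework.mcp.agent_builder_server"],
            "cwd": str(script_dir),
        }
    }
}
with open(".mcp.json", "w") as f:
    json.dump(config, f, indent=2)
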
+1 -1
View File
@@ -2,7 +2,7 @@
from pathlib import Path
from framework import Runtime, BuilderQuery
from framework import BuilderQuery, Runtime
from framework.schemas.run import RunStatus
+40 -35
View File
@@ -10,27 +10,28 @@ Tests cover:
"""
import asyncio
import pytest
from framework.graph.plan import (
Plan,
PlanStep,
ActionSpec,
ActionType,
StepStatus,
Judgment,
JudgmentAction,
EvaluationRule,
PlanExecutionResult,
ExecutionStatus,
)
from framework.graph.code_sandbox import (
CodeSandbox,
safe_exec,
safe_eval,
safe_exec,
)
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.goal import Goal, SuccessCriterion
from framework.graph.judge import HybridJudge, create_default_judge
from framework.graph.plan import (
ActionSpec,
ActionType,
EvaluationRule,
ExecutionStatus,
Judgment,
JudgmentAction,
Plan,
PlanExecutionResult,
PlanStep,
StepStatus,
)
class TestPlanDataStructures:
@@ -216,12 +217,14 @@ class TestHybridJudge:
def test_rule_based_accept(self):
"""Test rule-based accept judgment."""
judge = HybridJudge()
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="success_check",
description="Accept on success flag",
condition="result.get('success') == True",
action=JudgmentAction.ACCEPT,
))
)
)
step = PlanStep(
id="test_step",
@@ -238,9 +241,7 @@ class TestHybridJudge:
)
# Use sync version for testing
judgment = asyncio.run(
judge.evaluate(step, {"success": True}, goal)
)
judgment = asyncio.run(judge.evaluate(step, {"success": True}, goal))
assert judgment.action == JudgmentAction.ACCEPT
assert judgment.rule_matched == "success_check"
@@ -248,13 +249,15 @@ class TestHybridJudge:
def test_rule_based_retry(self):
"""Test rule-based retry judgment."""
judge = HybridJudge()
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="timeout_retry",
description="Retry on timeout",
condition="result.get('error_type') == 'timeout'",
action=JudgmentAction.RETRY,
feedback_template="Timeout occurred, please retry",
))
)
)
step = PlanStep(
id="test_step",
@@ -270,9 +273,7 @@ class TestHybridJudge:
],
)
judgment = asyncio.run(
judge.evaluate(step, {"error_type": "timeout"}, goal)
)
judgment = asyncio.run(judge.evaluate(step, {"error_type": "timeout"}, goal))
assert judgment.action == JudgmentAction.RETRY
@@ -281,22 +282,26 @@ class TestHybridJudge:
judge = HybridJudge()
# Lower priority - would match
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="low_priority",
description="Low priority accept",
condition="True",
action=JudgmentAction.ACCEPT,
priority=1,
))
)
)
# Higher priority - should match first
judge.add_rule(EvaluationRule(
judge.add_rule(
EvaluationRule(
id="high_priority",
description="High priority escalate",
condition="True",
action=JudgmentAction.ESCALATE,
priority=100,
))
)
)
step = PlanStep(
id="test_step",
@@ -312,9 +317,7 @@ class TestHybridJudge:
],
)
judgment = asyncio.run(
judge.evaluate(step, {}, goal)
)
judgment = asyncio.run(judge.evaluate(step, {}, goal))
assert judgment.rule_matched == "high_priority"
assert judgment.action == JudgmentAction.ESCALATE
@@ -397,8 +400,8 @@ class TestFlexibleExecutorIntegration:
def test_executor_creation(self, tmp_path):
"""Test creating a FlexibleGraphExecutor."""
from framework.runtime.core import Runtime
from framework.graph.flexible_executor import FlexibleGraphExecutor
from framework.runtime.core import Runtime
runtime = Runtime(storage_path=tmp_path / "runtime")
executor = FlexibleGraphExecutor(runtime=runtime)
@@ -409,17 +412,19 @@ class TestFlexibleExecutorIntegration:
def test_executor_with_custom_judge(self, tmp_path):
"""Test executor with custom judge."""
from framework.runtime.core import Runtime
from framework.graph.flexible_executor import FlexibleGraphExecutor
from framework.runtime.core import Runtime
runtime = Runtime(storage_path=tmp_path / "runtime")
custom_judge = HybridJudge()
custom_judge.add_rule(EvaluationRule(
custom_judge.add_rule(
EvaluationRule(
id="custom_rule",
description="Custom rule",
condition="True",
action=JudgmentAction.ACCEPT,
))
)
)
executor = FlexibleGraphExecutor(runtime=runtime, judge=custom_judge)
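The rule-priority behaviour these tests exercise can be sketched briefly; this is a minimal sketch assuming the framework package is importable, and since the PlanStep and Goal constructor arguments are abbreviated in the hunks, the evaluation call is only described in a comment:

# When several rules match a step result, the highest-priority rule decides
# the judgment.
from framework.graph.judge import HybridJudge
from framework.graph.plan import EvaluationRule, JudgmentAction

judge = HybridJudge()
judge.add_rule(
    EvaluationRule(
        id="fallback_accept",
        description="Accept anything",
        condition="True",
        action=JudgmentAction.ACCEPT,
        priority=1,
    )
)
judge.add_rule(
    EvaluationRule(
        id="timeout_retry",
        description="Retry on timeout",
        condition="result.get('error_type') == 'timeout'",
        action=JudgmentAction.RETRY,
        feedback_template="Timeout occurred, please retry",
        priority=100,
    )
)
# judge.evaluate(step, {"error_type": "timeout"}, goal) is an async call; with
# that result it should match "timeout_retry" first and return JudgmentAction.RETRY.
# PlanStep and Goal construction is abbreviated in the hunks, so it is omitted here.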
+20 -37
View File
@@ -10,11 +10,11 @@ For live tests (requires API keys):
"""
import os
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
from framework.llm.litellm import LiteLLMProvider
from framework.llm.anthropic import AnthropicProvider
from framework.llm.provider import LLMProvider, Tool, ToolUse, ToolResult
from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import LLMProvider, Tool, ToolResult, ToolUse
class TestLiteLLMProviderInit:
@@ -41,11 +41,7 @@ class TestLiteLLMProviderInit:
def test_init_with_api_base(self):
"""Test initialization with custom API base."""
provider = LiteLLMProvider(
model="gpt-4o-mini",
api_key="my-key",
api_base="https://my-proxy.com/v1"
)
provider = LiteLLMProvider(model="gpt-4o-mini", api_key="my-key", api_base="https://my-proxy.com/v1")
assert provider.api_base == "https://my-proxy.com/v1"
def test_init_ollama_no_key_needed(self):
@@ -73,9 +69,7 @@ class TestLiteLLMProviderComplete:
mock_completion.return_value = mock_response
provider = LiteLLMProvider(model="gpt-4o-mini", api_key="test-key")
result = provider.complete(
messages=[{"role": "user", "content": "Hello"}]
)
result = provider.complete(messages=[{"role": "user", "content": "Hello"}])
assert result.content == "Hello! I'm an AI assistant."
assert result.model == "gpt-4o-mini"
@@ -102,10 +96,7 @@ class TestLiteLLMProviderComplete:
mock_completion.return_value = mock_response
provider = LiteLLMProvider(model="gpt-4o-mini", api_key="test-key")
provider.complete(
messages=[{"role": "user", "content": "Hello"}],
system="You are a helpful assistant."
)
provider.complete(messages=[{"role": "user", "content": "Hello"}], system="You are a helpful assistant.")
call_kwargs = mock_completion.call_args[1]
messages = call_kwargs["messages"]
@@ -131,18 +122,13 @@ class TestLiteLLMProviderComplete:
name="get_weather",
description="Get the weather for a location",
parameters={
"properties": {
"location": {"type": "string", "description": "City name"}
"properties": {"location": {"type": "string", "description": "City name"}},
"required": ["location"],
},
"required": ["location"]
}
)
]
provider.complete(
messages=[{"role": "user", "content": "What's the weather?"}],
tools=tools
)
provider.complete(messages=[{"role": "user", "content": "What's the weather?"}], tools=tools)
call_kwargs = mock_completion.call_args[1]
assert "tools" in call_kwargs
@@ -187,22 +173,21 @@ class TestLiteLLMProviderToolUse:
Tool(
name="get_weather",
description="Get the weather",
parameters={"properties": {"location": {"type": "string"}}, "required": ["location"]}
parameters={
"properties": {"location": {"type": "string"}},
"required": ["location"],
},
)
]
def tool_executor(tool_use: ToolUse) -> ToolResult:
return ToolResult(
tool_use_id=tool_use.id,
content="Sunny, 22C",
is_error=False
)
return ToolResult(tool_use_id=tool_use.id, content="Sunny, 22C", is_error=False)
result = provider.complete_with_tools(
messages=[{"role": "user", "content": "What's the weather in London?"}],
system="You are a weather assistant.",
tools=tools,
tool_executor=tool_executor
tool_executor=tool_executor,
)
assert result.content == "The weather in London is sunny."
@@ -222,11 +207,9 @@ class TestToolConversion:
name="search",
description="Search the web",
parameters={
"properties": {
"query": {"type": "string", "description": "Search query"}
"properties": {"query": {"type": "string", "description": "Search query"}},
"required": ["query"],
},
"required": ["query"]
}
)
result = provider._tool_to_openai_format(tool)
@@ -280,7 +263,7 @@ class TestAnthropicProviderBackwardCompatibility:
result = provider.complete(
messages=[{"role": "user", "content": "Hello"}],
system="You are helpful.",
max_tokens=100
max_tokens=100,
)
assert result.content == "Hello from Claude!"
@@ -313,7 +296,7 @@ class TestAnthropicProviderBackwardCompatibility:
Tool(
name="get_time",
description="Get current time",
parameters={"properties": {}, "required": []}
parameters={"properties": {}, "required": []},
)
]
@@ -324,7 +307,7 @@ class TestAnthropicProviderBackwardCompatibility:
messages=[{"role": "user", "content": "What time is it?"}],
system="You are a time assistant.",
tools=tools,
tool_executor=tool_executor
tool_executor=tool_executor,
)
assert result.content == "The time is 3:00 PM."
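The tool-calling path mocked in these tests looks roughly as follows when run live. This is a minimal sketch: the OPENAI_API_KEY environment variable is an assumption, and the executor's weather value is a placeholder.

import os

from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import Tool, ToolResult, ToolUse

# API key source is an assumption; the tests pass api_key directly.
provider = LiteLLMProvider(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])

tools = [
    Tool(
        name="get_weather",
        description="Get the weather for a location",
        parameters={
            "properties": {"location": {"type": "string", "description": "City name"}},
            "required": ["location"],
        },
    )
]

def tool_executor(tool_use: ToolUse) -> ToolResult:
    # Placeholder result; a real executor would call a weather service here.
    return ToolResult(tool_use_id=tool_use.id, content="Sunny, 22C", is_error=False)

result = provider.complete_with_tools(
    messages=[{"role": "user", "content": "What's the weather in London?"}],
    system="You are a weather assistant.",
    tools=tools,
    tool_executor=tool_executor,
)
print(result.content)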
+12 -8
View File
@@ -7,12 +7,9 @@ import pytest
def _mcp_available() -> bool:
"""Check if MCP dependencies are installed."""
try:
import mcp
from mcp.server import FastMCP
return True
except ImportError:
return False
import importlib.util
return importlib.util.find_spec("mcp") is not None
MCP_AVAILABLE = _mcp_available()
@@ -28,6 +25,7 @@ class TestMCPDependencies:
pytest.skip(MCP_SKIP_REASON)
import mcp
assert mcp is not None
def test_fastmcp_available(self):
@@ -36,6 +34,7 @@ class TestMCPDependencies:
pytest.skip(MCP_SKIP_REASON)
from mcp.server import FastMCP
assert FastMCP is not None
@@ -48,6 +47,7 @@ class TestAgentBuilderServerModule:
pytest.skip(MCP_SKIP_REASON)
import framework.mcp.agent_builder_server as module
assert module is not None
def test_mcp_object_exported(self):
@@ -55,9 +55,10 @@ class TestAgentBuilderServerModule:
if not MCP_AVAILABLE:
pytest.skip(MCP_SKIP_REASON)
from framework.mcp.agent_builder_server import mcp
from mcp.server import FastMCP
from framework.mcp.agent_builder_server import mcp
assert mcp is not None
assert isinstance(mcp, FastMCP)
@@ -67,6 +68,7 @@ class TestAgentBuilderServerModule:
pytest.skip(MCP_SKIP_REASON)
from framework.mcp.agent_builder_server import mcp
assert mcp.name == "agent-builder"
@@ -79,6 +81,7 @@ class TestMCPPackageExports:
pytest.skip(MCP_SKIP_REASON)
import framework.mcp
assert framework.mcp is not None
def test_agent_builder_server_exported(self):
@@ -86,8 +89,9 @@ class TestMCPPackageExports:
if not MCP_AVAILABLE:
pytest.skip(MCP_SKIP_REASON)
from framework.mcp import agent_builder_server
from mcp.server import FastMCP
from framework.mcp import agent_builder_server
assert agent_builder_server is not None
assert isinstance(agent_builder_server, FastMCP)
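The optional-dependency skip pattern introduced above (find_spec instead of a try/except import) can be sketched in isolation; the test name and skip message below are placeholders:

import importlib.util

import pytest

MCP_AVAILABLE = importlib.util.find_spec("mcp") is not None
MCP_SKIP_REASON = "mcp and fastmcp are not installed"

def test_fastmcp_importable():
    # Skip cleanly when the optional dependency is absent instead of erroring.
    if not MCP_AVAILABLE:
        pytest.skip(MCP_SKIP_REASON)

    from mcp.server import FastMCP

    assert FastMCP is not None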
+9 -9
View File
@@ -7,8 +7,8 @@ Run with:
from unittest.mock import Mock, patch
from framework.llm.provider import LLMProvider
from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import LLMProvider
from framework.runner.orchestrator import AgentOrchestrator
@@ -17,7 +17,7 @@ class TestOrchestratorLLMInitialization:
def test_auto_creates_litellm_provider_when_no_llm_passed(self):
"""Test that LiteLLMProvider is auto-created when no llm is passed."""
with patch.object(LiteLLMProvider, '__init__', return_value=None) as mock_init:
with patch.object(LiteLLMProvider, "__init__", return_value=None) as mock_init:
orchestrator = AgentOrchestrator()
mock_init.assert_called_once_with(model="claude-haiku-4-5-20251001")
@@ -25,14 +25,14 @@ class TestOrchestratorLLMInitialization:
def test_uses_custom_model_parameter(self):
"""Test that custom model parameter is passed to LiteLLMProvider."""
with patch.object(LiteLLMProvider, '__init__', return_value=None) as mock_init:
with patch.object(LiteLLMProvider, "__init__", return_value=None) as mock_init:
AgentOrchestrator(model="gpt-4o")
mock_init.assert_called_once_with(model="gpt-4o")
def test_supports_openai_model_names(self):
"""Test that OpenAI model names are supported."""
with patch.object(LiteLLMProvider, '__init__', return_value=None) as mock_init:
with patch.object(LiteLLMProvider, "__init__", return_value=None) as mock_init:
orchestrator = AgentOrchestrator(model="gpt-4o-mini")
mock_init.assert_called_once_with(model="gpt-4o-mini")
@@ -40,7 +40,7 @@ class TestOrchestratorLLMInitialization:
def test_supports_anthropic_model_names(self):
"""Test that Anthropic model names are supported."""
with patch.object(LiteLLMProvider, '__init__', return_value=None) as mock_init:
with patch.object(LiteLLMProvider, "__init__", return_value=None) as mock_init:
orchestrator = AgentOrchestrator(model="claude-3-haiku-20240307")
mock_init.assert_called_once_with(model="claude-3-haiku-20240307")
@@ -50,7 +50,7 @@ class TestOrchestratorLLMInitialization:
"""Test that auto-creation is skipped when llm is explicitly passed."""
mock_llm = Mock(spec=LLMProvider)
with patch.object(LiteLLMProvider, '__init__', return_value=None) as mock_init:
with patch.object(LiteLLMProvider, "__init__", return_value=None) as mock_init:
orchestrator = AgentOrchestrator(llm=mock_llm)
mock_init.assert_not_called()
@@ -58,7 +58,7 @@ class TestOrchestratorLLMInitialization:
def test_model_attribute_stored_correctly(self):
"""Test that _model attribute is stored correctly."""
with patch.object(LiteLLMProvider, '__init__', return_value=None):
with patch.object(LiteLLMProvider, "__init__", return_value=None):
orchestrator = AgentOrchestrator(model="gemini/gemini-1.5-flash")
assert orchestrator._model == "gemini/gemini-1.5-flash"
@@ -78,5 +78,5 @@ class TestOrchestratorLLMProviderType:
orchestrator = AgentOrchestrator()
assert isinstance(orchestrator._llm, LLMProvider)
assert hasattr(orchestrator._llm, 'complete')
assert hasattr(orchestrator._llm, 'complete_with_tools')
assert hasattr(orchestrator._llm, "complete")
assert hasattr(orchestrator._llm, "complete_with_tools")
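These tests pin down two construction paths for the orchestrator; a minimal sketch, assuming the relevant provider API key is available, looks like this (the key below is a placeholder):

from framework.llm.litellm import LiteLLMProvider
from framework.runner.orchestrator import AgentOrchestrator

# Default path: a LiteLLMProvider is auto-created from the model name
# (falling back to "claude-haiku-4-5-20251001" when no model is given).
orchestrator = AgentOrchestrator(model="gpt-4o-mini")

# Explicit path: pass a pre-built provider; no provider is auto-created.
provider = LiteLLMProvider(model="gemini/gemini-1.5-flash", api_key="...")  # placeholder key
orchestrator = AgentOrchestrator(llm=provider)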
+12 -8
View File
@@ -1,16 +1,18 @@
"""Tests for plan.py - Plan enums and Pydantic models."""
import json
import pytest
from framework.graph.plan import (
ActionType,
StepStatus,
ApprovalDecision,
JudgmentAction,
ExecutionStatus,
ActionSpec,
PlanStep,
ActionType,
ApprovalDecision,
ExecutionStatus,
JudgmentAction,
Plan,
PlanStep,
StepStatus,
)
@@ -210,7 +212,8 @@ class TestPlanFromJson:
def test_plan_from_json_string(self):
"""Parse Plan from JSON string."""
json_str = json.dumps({
json_str = json.dumps(
{
"id": "plan_1",
"goal_id": "goal_1",
"description": "Test plan",
@@ -224,7 +227,8 @@ class TestPlanFromJson:
},
}
],
})
}
)
plan = Plan.from_json(json_str)
+10 -2
View File
@@ -1,12 +1,16 @@
"""
Test the run module.
"""
from datetime import datetime
from framework.schemas.run import RunMetrics, Run, RunStatus, RunSummary
from framework.schemas.decision import Decision, Outcome, Option
from framework.schemas.decision import Decision, Option, Outcome
from framework.schemas.run import Run, RunMetrics, RunStatus, RunSummary
class TestRuntimeMetrics:
"""Test the RunMetrics class."""
def test_success_rate(self):
metrics = RunMetrics(
total_decisions=10,
@@ -23,8 +27,10 @@ class TestRuntimeMetrics:
)
assert metrics.success_rate == 0.0
class TestRun:
"""Test the Run class."""
def test_duration_ms(self):
run = Run(
id="test_run",
@@ -124,8 +130,10 @@ class TestRun:
assert run.status == RunStatus.COMPLETED
assert run.narrative == "Test narrative"
class TestRunSummary:
"""Test the RunSummary class."""
def test_from_run_basic(self):
run = Run(
id="test_run",
+2 -1
View File
@@ -1,8 +1,9 @@
"""Tests for the Runtime class - the agent's interface to record decisions."""
import pytest
from pathlib import Path
import pytest
from framework import Runtime
from framework.schemas.decision import DecisionType
+40 -21
View File
@@ -9,25 +9,25 @@ Tests cover:
import pytest
from framework.testing.test_case import (
Test,
TestType,
ApprovalStatus,
)
from framework.testing.test_result import (
TestResult,
TestSuiteResult,
ErrorCategory,
)
from framework.testing.test_storage import TestStorage
from framework.testing.categorizer import ErrorCategorizer
from framework.testing.debug_tool import DebugTool
from framework.testing.test_case import (
ApprovalStatus,
Test,
TestType,
)
from framework.testing.test_result import (
ErrorCategory,
TestResult,
TestSuiteResult,
)
from framework.testing.test_storage import TestStorage
# ============================================================================
# Test Schema Tests
# ============================================================================
class TestTestCaseSchema:
"""Tests for Test schema."""
@@ -189,8 +189,12 @@ class TestTestSuiteResult:
results = [
TestResult(test_id="t1", passed=True, duration_ms=100),
TestResult(test_id="t2", passed=True, duration_ms=50),
TestResult(test_id="t3", passed=False, duration_ms=75,
error_category=ErrorCategory.IMPLEMENTATION_ERROR),
TestResult(
test_id="t3",
passed=False,
duration_ms=75,
error_category=ErrorCategory.IMPLEMENTATION_ERROR,
),
]
suite = TestSuiteResult(
@@ -203,18 +207,30 @@ class TestTestSuiteResult:
)
assert not suite.all_passed
assert suite.pass_rate == pytest.approx(2/3)
assert suite.pass_rate == pytest.approx(2 / 3)
assert len(suite.get_failed_results()) == 1
def test_get_results_by_category(self):
"""Test filtering results by error category."""
results = [
TestResult(test_id="t1", passed=False, duration_ms=100,
error_category=ErrorCategory.LOGIC_ERROR),
TestResult(test_id="t2", passed=False, duration_ms=50,
error_category=ErrorCategory.IMPLEMENTATION_ERROR),
TestResult(test_id="t3", passed=False, duration_ms=75,
error_category=ErrorCategory.IMPLEMENTATION_ERROR),
TestResult(
test_id="t1",
passed=False,
duration_ms=100,
error_category=ErrorCategory.LOGIC_ERROR,
),
TestResult(
test_id="t2",
passed=False,
duration_ms=50,
error_category=ErrorCategory.IMPLEMENTATION_ERROR,
),
TestResult(
test_id="t3",
passed=False,
duration_ms=75,
error_category=ErrorCategory.IMPLEMENTATION_ERROR,
),
]
suite = TestSuiteResult(
@@ -233,6 +249,7 @@ class TestTestSuiteResult:
# Storage Tests
# ============================================================================
class TestTestStorage:
"""Tests for TestStorage."""
@@ -389,6 +406,7 @@ class TestTestStorage:
# Error Categorizer Tests
# ============================================================================
class TestErrorCategorizer:
"""Tests for ErrorCategorizer."""
@@ -463,6 +481,7 @@ class TestErrorCategorizer:
# Debug Tool Tests
# ============================================================================
class TestDebugTool:
"""Tests for DebugTool."""
+15 -20
View File
@@ -12,11 +12,11 @@ from pathlib import Path
class Colors:
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
RED = '\033[0;31m'
BLUE = '\033[0;34m'
NC = '\033[0m'
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
RED = "\033[0;31m"
BLUE = "\033[0;34m"
NC = "\033[0m"
def check(description: str) -> bool:
@@ -55,7 +55,7 @@ def main():
[sys.executable, "-c", "import framework; print(framework.__file__)"],
capture_output=True,
text=True,
check=True
check=True,
)
framework_path = result.stdout.strip()
success(f"installed at {framework_path}")
@@ -69,11 +69,7 @@ def main():
missing_deps = []
for dep in ["mcp", "fastmcp"]:
try:
subprocess.run(
[sys.executable, "-c", f"import {dep}"],
capture_output=True,
check=True
)
subprocess.run([sys.executable, "-c", f"import {dep}"], capture_output=True, check=True)
except subprocess.CalledProcessError:
missing_deps.append(dep)
@@ -91,7 +87,7 @@ def main():
[sys.executable, "-c", "from framework.mcp import agent_builder_server"],
capture_output=True,
text=True,
check=True
check=True,
)
success("loads successfully")
except subprocess.CalledProcessError as e:
@@ -137,11 +133,7 @@ def main():
failed_modules = []
for module in modules_to_check:
try:
subprocess.run(
[sys.executable, "-c", f"import {module}"],
capture_output=True,
check=True
)
subprocess.run([sys.executable, "-c", f"import {module}"], capture_output=True, check=True)
except subprocess.CalledProcessError:
failed_modules.append(module)
@@ -156,12 +148,15 @@ def main():
try:
# Try to import and instantiate the MCP server
result = subprocess.run(
[sys.executable, "-c",
"from framework.mcp.agent_builder_server import mcp; print('OK')"],
[
sys.executable,
"-c",
"from framework.mcp.agent_builder_server import mcp; print('OK')",
],
capture_output=True,
text=True,
check=True,
timeout=5
timeout=5,
)
if "OK" in result.stdout:
success("server can start")
+1 -1
View File
@@ -31,7 +31,7 @@ class TestGetSecurePath:
"""Session directory is created if it doesn't exist."""
from aden_tools.tools.file_system_toolkits.security import get_secure_path
result = get_secure_path("file.txt", **ids)
get_secure_path("file.txt", **ids)
session_dir = self.workspaces_dir / "test-workspace" / "test-agent" / "test-session"
assert session_dir.exists()