feat: update skills and agent builder tools, bump pinned ruff version

Timothy
2026-02-04 09:52:28 -08:00
parent 119280da1a
commit a23f01973a
12 changed files with 1008 additions and 936 deletions
+34 -31
@@ -46,6 +46,7 @@ Use this meta-skill when:
"Need to understand agent concepts" → building-agents-core
"Build a new agent" → building-agents-construction
"Optimize my agent design" → building-agents-patterns
"Need client-facing nodes or feedback loops" → building-agents-patterns
"Set up API keys for my agent" → setup-credentials
"Test my agent" → testing-agent
"Not sure what I need" → Read phases below, then decide
@@ -63,12 +64,12 @@ Use this meta-skill when:
- First time building an agent
- Need to understand node types, edges, goals
- Want to validate tool availability
- Learning about event loop architecture and client-facing nodes
### What This Phase Provides
- Architecture overview (Python packages, not JSON)
- Core concepts (Goal, Node, Edge, Event Loop, Judges)
- Tool discovery and validation procedures
- Workflow overview
@@ -153,19 +154,20 @@ exports/agent_name/
### When to Use
- Want to add client-facing blocking or feedback edges
- Need judge patterns for output validation
- Want fan-out/fan-in (parallel execution)
- Need error handling patterns
- Want to optimize performance
- Need examples of complex routing
- Want best practices guidance
### What This Phase Provides
- Practical examples and patterns
- Error handling strategies
- Client-facing interaction patterns
- Feedback edge routing with nullable output keys
- Judge patterns (implicit, SchemaJudge)
- Fan-out/fan-in parallel execution
- Context management and spillover patterns
- Anti-patterns to avoid
- Performance optimization techniques
**Skip this phase** if your agent design is straightforward.
@@ -291,15 +293,15 @@ User: "Build an agent"
→ Done: Working agent
```
### Pattern 4: Agent with Review Loops and HITL Checkpoints
```
User: "Build an agent with multi-turn conversations"
→ Use /building-agents-core (learn pause/resume)
→ Use /building-agents-construction (build structure)
→ Use /building-agents-patterns (implement pause/resume pattern)
→ Use /testing-agent (validate conversation flows)
→ Done: Complex conversational agent
User: "Build an agent with human review and feedback loops"
→ Use /building-agents-core (learn event loop, client-facing nodes)
→ Use /building-agents-construction (build structure with feedback edges)
→ Use /building-agents-patterns (implement client-facing + feedback patterns)
→ Use /testing-agent (validate review flows and edge routing)
→ Done: Agent with HITL checkpoints and review loops
```
## Skill Dependencies
@@ -308,25 +310,26 @@ User: "Build an agent with multi-turn conversations"
agent-workflow (meta-skill)
├── building-agents-core (foundational)
│ ├── Architecture concepts (event loop, judges)
│ ├── Node types (event_loop, function)
│ ├── Edge routing and priority
│ ├── Tool discovery procedures
│ └── Workflow overview
├── building-agents-construction (procedural)
│ ├── Creates package structure
│ ├── Defines goal
│ ├── Adds nodes (event_loop, function)
│ ├── Connects edges with priority routing
│ ├── Finalizes agent class
│ └── Requires: building-agents-core
├── building-agents-patterns (reference)
│ ├── Client-facing interaction patterns
│ ├── Feedback edges and review loops
│ ├── Judge patterns (implicit, SchemaJudge)
│ ├── Fan-out/fan-in parallel execution
│ └── Context management and anti-patterns
└── testing-agent
├── Reads agent goal
@@ -439,9 +442,9 @@ The workflow is **flexible** - skip phases as needed, iterate freely, and adapt
**Choose building-agents-core when:**
- First time building agents
- Need to understand event loop architecture
- Validating tool availability
- Learning about node types, edges, and judges
**Choose building-agents-construction when:**
- Actually building an agent
@@ -451,13 +454,13 @@ The workflow is **flexible** - skip phases as needed, iterate freely, and adapt
**Choose building-agents-patterns when:**
- Agent structure complete
- Need advanced patterns
- Need client-facing nodes or feedback edges
- Implementing review loops or fan-out/fan-in
- Want judge patterns or context management
- Want best practices
**Choose testing-agent when:**
- Agent structure complete
- Ready to validate functionality
- Need comprehensive test coverage
- Debugging agent behavior
- Testing feedback loops, output keys, or fan-out
@@ -124,11 +124,14 @@ AskUserQuestion(questions=[{
- node_id (kebab-case)
- name
- description
- node_type: `"llm_generate"` (no tools) or `"llm_tool_use"` (uses tools)
- node_type: `"event_loop"` (recommended for all LLM work) or `"function"` (deterministic, no LLM)
- input_keys (what data this node receives)
- output_keys (what data this node produces)
- tools (ONLY tools that exist - empty list if no tools needed)
- system_prompt (should mention `set_output` for producing structured outputs)
- client_facing: True if this node interacts with the user
- nullable_output_keys (for mutually exclusive outputs)
- max_node_visits (>1 if this node is a feedback loop target; see the example below)
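For example, a sketch of a NodeSpec using these fields (node id, keys, and prompt are illustrative, not from a real agent):

```python
# Hypothetical client-facing review node that also participates in a feedback loop
review_node = NodeSpec(
    id="review-results",
    name="Review Results",
    description="Present results to the user for approval or rework",
    node_type="event_loop",
    client_facing=True,                                # interacts with the user
    input_keys=["draft"],
    output_keys=["approved", "redo_notes"],
    nullable_output_keys=["approved", "redo_notes"],   # exactly one is set per visit
    max_node_visits=3,                                 # re-visited after each rework loop
    tools=[],
    system_prompt=(
        "Present the draft to the user. If they approve, call "
        "set_output('approved', 'true'); otherwise call "
        "set_output('redo_notes', <requested changes>)."
    ),
)
```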
**PRESENT the workflow to the user:**
@@ -136,7 +139,7 @@ AskUserQuestion(questions=[{
>
> 1. **[node-id]** - [description]
>
> - Type: event_loop [client-facing] / function
> - Input: [keys]
> - Output: [keys]
> - Tools: [tools or "none"]
@@ -211,8 +214,8 @@ mcp__agent-builder__get_session_status()
- source (node that outputs)
- target (node that receives)
- condition: `"on_success"`, `"always"`, `"on_failure"`, or `"conditional"`
- condition_expr (Python expression using `output.get(...)`, only if conditional)
- priority (positive = forward edge evaluated first, negative = feedback edge; see the example below)
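For example (node ids hypothetical; the pattern mirrors the feedback-edge examples in building-agents-patterns):

```python
# Forward edge: taken when the review produced an approval
EdgeSpec(
    id="review-to-send",
    source="review",
    target="send",
    condition=EdgeCondition.CONDITIONAL,
    condition_expr="output.get('approved') is not None",
    priority=1,
)
# Feedback edge: loops back to the drafting node when rework was requested
EdgeSpec(
    id="review-to-draft",
    source="review",
    target="draft",
    condition=EdgeCondition.CONDITIONAL,
    condition_expr="output.get('redo_notes') is not None",
    priority=-1,
)
```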
**FOR EACH edge, call:**
@@ -318,44 +321,113 @@ mcp__agent-builder__get_session_status()
## REFERENCE: Node Types
| Type | tools param | Use when |
|------|-------------|----------|
| `event_loop` | `'["tool1"]'` or `'[]'` | **Recommended.** LLM-powered work with or without tools |
| `function` | N/A | Deterministic Python operations, no LLM |
| `llm_generate` (legacy) | `'[]'` | Deprecated — use `event_loop` instead |
| `llm_tool_use` (legacy) | `'["tool1"]'` | Deprecated — use `event_loop` instead |
---
## REFERENCE: NodeSpec New Fields
| Field | Default | Description |
|-------|---------|-------------|
| `client_facing` | `False` | Streams output to user, blocks for input between turns |
| `nullable_output_keys` | `[]` | Output keys that may remain unset (mutually exclusive outputs) |
| `max_node_visits` | `1` | Max executions per run. Set >1 for feedback loop targets. 0=unlimited |
---
## REFERENCE: Edge Conditions & Priority
| Condition | When edge is followed |
|-----------|--------------------------------------|
| `on_success` | Source node completed successfully |
| `on_failure` | Source node failed |
| `always` | Always, regardless of success/failure |
| `conditional` | When condition_expr evaluates to True |
**Priority:** Positive = forward edge (evaluated first). Negative = feedback edge (loops back to earlier node). Multiple ON_SUCCESS edges from same source = parallel execution (fan-out).
---
## REFERENCE: System Prompt Best Practice
For event_loop nodes, instruct the LLM to use `set_output` for structured outputs:
```
Use set_output(key, value) to store your results. For example:
- set_output("search_results", <your results as a JSON string>)
Do NOT return raw JSON. Use the set_output tool to produce outputs.
```
---
## CRITICAL: EventLoopNode Registration
**`AgentRuntime` does NOT support `event_loop` nodes.** The `AgentRuntime` / `create_agent_runtime()` path creates `GraphExecutor` instances internally without passing a `node_registry`, causing all `event_loop` nodes to fail at runtime with:
```
EventLoopNode 'node-id' not found in registry. Register it with executor.register_node() before execution.
```
**The correct pattern**: Use `GraphExecutor` directly with a `node_registry` dict containing `EventLoopNode` instances:
```python
from framework.graph.executor import GraphExecutor, ExecutionResult
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime # REQUIRED - executor calls runtime.start_run()
# 1. Build node_registry with EventLoopNode instances
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
if node_spec.node_type == "event_loop":
node_registry[node_spec.id] = EventLoopNode(
event_bus=event_bus,
judge=None, # implicit judge: accepts when output_keys are filled
config=LoopConfig(
max_iterations=50,
max_tool_calls_per_turn=15,
stall_detection_threshold=3,
max_history_tokens=32000,
),
tool_executor=tool_executor,
)
# 2. Create Runtime for run tracking (GraphExecutor calls runtime.start_run())
storage_path = Path.home() / ".hive" / "my_agent"
storage_path.mkdir(parents=True, exist_ok=True)
runtime = Runtime(storage_path)
# 3. Create GraphExecutor WITH node_registry and runtime
executor = GraphExecutor(
runtime=runtime, # NOT None - executor needs this for run tracking
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry, # EventLoopNode instances
)
# 4. Execute
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
```
**DO NOT use `AgentRuntime` or `create_agent_runtime()` for agents with `event_loop` nodes.**
**DO NOT pass `runtime=None` to `GraphExecutor`** — it will crash with `'NoneType' object has no attribute 'start_run'`.
---
## COMMON MISTAKES TO AVOID
1. **Using `AgentRuntime` with event_loop nodes** - `AgentRuntime` does not register EventLoopNodes. Use `GraphExecutor` directly with `node_registry`
2. **Passing `runtime=None` to GraphExecutor** - The executor calls `runtime.start_run()` internally. Always provide a `Runtime(storage_path)` instance
3. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first
4. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list
5. **Skipping validation** - Always validate nodes and graph before proceeding
6. **Not waiting for approval** - Always ask user before major steps
7. **Displaying this file** - Execute the steps, don't show documentation
@@ -2,9 +2,10 @@
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
@@ -166,7 +167,8 @@ class OnlineResearchAgent:
"""
Online Research Agent - Deep-dive research with narrative reports.
Uses GraphExecutor directly with EventLoopNode instances registered
in the node_registry for multi-turn tool execution.
"""
def __init__(self, config=None):
@@ -178,60 +180,14 @@ class OnlineResearchAgent:
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._tool_registry: ToolRegistry | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="online-research-agent-graph",
goal_id=self.goal.id,
version="1.0.0",
@@ -245,54 +201,72 @@ class OnlineResearchAgent:
max_tokens=self.config.max_tokens,
)
def _build_node_registry(self, tool_executor=None) -> dict:
"""Create EventLoopNode instances for all event_loop nodes."""
registry = {}
for node_spec in self.nodes:
if node_spec.node_type == "event_loop":
registry[node_spec.id] = EventLoopNode(
event_bus=self._event_bus,
judge=None, # implicit judge: accept when output_keys are filled
config=LoopConfig(
max_iterations=50,
max_tool_calls_per_turn=15,
stall_detection_threshold=3,
max_history_tokens=32000,
),
tool_executor=tool_executor,
)
return registry
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "online_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
node_registry = self._build_node_registry(tool_executor=tool_executor)
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry,
)
return self._executor
async def start(self, mock_mode=False) -> None:
"""Start the agent runtime."""
if self._runtime is None:
self._create_runtime(mock_mode=mock_mode)
await self._runtime.start()
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
self._setup(mock_mode=mock_mode)
async def stop(self) -> None:
"""Stop the agent runtime."""
if self._runtime is not None:
await self._runtime.stop()
async def trigger(
self,
entry_point: str,
input_data: dict,
correlation_id: str | None = None,
session_state: dict | None = None,
) -> str:
"""
Trigger execution at a specific entry point (non-blocking).
Args:
entry_point: Entry point ID (e.g., "start", "pause-node_resume")
input_data: Input data for the execution
correlation_id: Optional ID to correlate related executions
session_state: Optional session state to resume from (with paused_at, memory)
Returns:
Execution ID for tracking
"""
if self._runtime is None or not self._runtime.is_running:
raise RuntimeError("Agent runtime not started. Call start() first.")
return await self._runtime.trigger(
entry_point, input_data, correlation_id, session_state=session_state
)
"""Clean up resources."""
self._executor = None
self._event_bus = None
async def trigger_and_wait(
self,
@@ -301,64 +275,32 @@ class OnlineResearchAgent:
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""
Trigger execution and wait for completion.
"""Execute the graph and wait for completion."""
if self._executor is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
session_state=session_state,
)
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""
Run the agent (convenience method for simple single execution).
For more control, use start() + trigger_and_wait() + stop().
"""
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
@@ -401,7 +343,6 @@ class OnlineResearchAgent:
if pause not in node_ids:
errors.append(f"Pause node '{pause}' not found")
# Validate entry points
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
@@ -7,26 +7,9 @@ parse_query_node = NodeSpec(
id="parse-query",
name="Parse Query",
description="Analyze the research topic and generate 3-5 diverse search queries to cover different aspects",
node_type="llm_generate",
node_type="event_loop",
input_keys=["topic"],
output_keys=["search_queries", "research_focus", "key_aspects"],
system_prompt="""\
You are a research query strategist. Given a research topic, analyze it and generate search queries.
@@ -35,23 +18,12 @@ Your task:
2. Identify 3-5 key aspects to investigate
3. Generate 3-5 diverse search queries that will find comprehensive information
Use set_output to store each result:
- set_output("research_focus", "Brief statement of what we're researching")
- set_output("key_aspects", ["aspect1", "aspect2", "aspect3"])
- set_output("search_queries", ["query 1", "query 2", "query 3", "query 4", "query 5"])
""",
tools=[],
max_retries=3,
)
# Node 2: Search Sources
@@ -59,21 +31,9 @@ search_sources_node = NodeSpec(
id="search-sources",
name="Search Sources",
description="Execute web searches using the generated queries to find 15+ source URLs",
node_type="llm_tool_use",
node_type="event_loop",
input_keys=["search_queries", "research_focus"],
output_keys=["source_urls", "search_results_summary"],
system_prompt="""\
You are a research assistant executing web searches. Use the web_search tool to find sources.
@@ -82,14 +42,11 @@ Your task:
2. Collect URLs from search results
3. Aim for 15+ diverse sources
After searching, use set_output to store results:
- set_output("source_urls", ["url1", "url2", ...])
- set_output("search_results_summary", "Brief summary of what was found")
""",
tools=["web_search"],
max_retries=3,
)
# Node 3: Fetch Content
@@ -97,21 +54,9 @@ fetch_content_node = NodeSpec(
id="fetch-content",
name="Fetch Content",
description="Fetch and extract content from the discovered source URLs",
node_type="llm_tool_use",
node_type="event_loop",
input_keys=["source_urls", "research_focus"],
output_keys=["fetched_sources", "fetch_errors"],
system_prompt="""\
You are a content fetcher. Use web_scrape tool to retrieve content from URLs.
@@ -120,17 +65,11 @@ Your task:
2. Extract the main content relevant to the research focus
3. Track any URLs that failed to fetch
After fetching, use set_output to store results:
- set_output("fetched_sources", [{"url": "...", "title": "...", "content": "..."}])
- set_output("fetch_errors", ["url that failed", ...])
""",
tools=["web_scrape"],
max_retries=3,
)
# Node 4: Evaluate Sources
@@ -138,21 +77,9 @@ evaluate_sources_node = NodeSpec(
id="evaluate-sources",
name="Evaluate Sources",
description="Score sources for relevance and quality, filter to top 10",
node_type="llm_generate",
node_type="event_loop",
input_keys=["fetched_sources", "research_focus", "key_aspects"],
output_keys=["ranked_sources", "source_analysis"],
system_prompt="""\
You are a source evaluator. Assess each source for quality and relevance.
@@ -168,17 +95,11 @@ Your task:
3. Select top 10 sources
4. Note what each source uniquely contributes
Use set_output to store results:
- set_output("ranked_sources", [{"url": "...", "title": "...", "score": 8.5}])
- set_output("source_analysis", "Overview of source quality and coverage")
""",
tools=[],
max_retries=3,
)
# Node 5: Synthesize Findings
@@ -186,26 +107,9 @@ synthesize_findings_node = NodeSpec(
id="synthesize-findings",
name="Synthesize Findings",
description="Extract key facts from sources and identify common themes",
node_type="llm_generate",
node_type="event_loop",
input_keys=["ranked_sources", "research_focus", "key_aspects"],
output_keys=["key_findings", "themes", "source_citations"],
system_prompt="""\
You are a research synthesizer. Analyze multiple sources to extract insights.
@@ -215,24 +119,12 @@ Your task:
3. Note contradictions or debates
4. Build a citation map (fact -> source URL)
Use set_output to store each result:
- set_output("key_findings", [{"finding": "...", "sources": ["url1"], "confidence": "high"}])
- set_output("themes", [{"theme": "...", "description": "...", "supporting_sources": [...]}])
- set_output("source_citations", {"fact or claim": ["url1", "url2"]})
""",
tools=[],
max_retries=3,
)
# Node 6: Write Report
@@ -240,7 +132,7 @@ write_report_node = NodeSpec(
id="write-report",
name="Write Report",
description="Generate a narrative report with proper citations",
node_type="llm_generate",
node_type="event_loop",
input_keys=[
"key_findings",
"themes",
@@ -249,18 +141,6 @@ write_report_node = NodeSpec(
"ranked_sources",
],
output_keys=["report_content", "references"],
system_prompt="""\
You are a research report writer. Create a well-structured narrative report.
@@ -280,17 +160,11 @@ IMPORTANT:
- Be objective and balanced
- Highlight areas of consensus and debate
Use set_output to store results:
- set_output("report_content", "Full markdown report text with citations...")
- set_output("references", [{"number": 1, "url": "...", "title": "..."}])
""",
tools=[],
max_retries=3,
)
# Node 7: Quality Check
@@ -298,26 +172,9 @@ quality_check_node = NodeSpec(
id="quality-check",
name="Quality Check",
description="Verify all claims have citations and report is coherent",
node_type="llm_generate",
node_type="event_loop",
input_keys=["report_content", "references", "source_citations"],
output_keys=["quality_score", "issues", "final_report"],
system_prompt="""\
You are a quality assurance reviewer. Check the research report for issues.
@@ -330,18 +187,12 @@ Check for:
If issues found, fix them in the final report.
Use set_output to store results:
- set_output("quality_score", 0.95)
- set_output("issues", [{"type": "uncited_claim", "location": "...", "fixed": true}])
- set_output("final_report", "Corrected full report with all issues fixed...")
""",
tools=[],
max_retries=3,
)
# Node 8: Save Report
@@ -349,21 +200,9 @@ save_report_node = NodeSpec(
id="save-report",
name="Save Report",
description="Write the final report to a local markdown file",
node_type="llm_tool_use",
node_type="event_loop",
input_keys=["final_report", "references", "research_focus"],
output_keys=["file_path", "save_status"],
system_prompt="""\
You are a file manager. Save the research report to disk.
@@ -374,14 +213,11 @@ Your task:
Filename format: research_YYYY-MM-DD_topic-slug.md
Use set_output to store results:
- set_output("file_path", "research_reports/research_2026-01-23_topic-name.md")
- set_output("save_status", "success")
""",
tools=["write_to_file"],
max_retries=3,
)
__all__ = [
+157 -111
@@ -1,10 +1,10 @@
---
name: building-agents-core
description: Core concepts for goal-driven agents - architecture, node types (event_loop, function), tool discovery, and workflow overview. Use when starting agent development or need to understand agent fundamentals.
license: Apache-2.0
metadata:
author: hive
version: "1.0"
version: "2.0"
type: foundational
part_of: building-agents
---
@@ -29,10 +29,10 @@ exports/my_agent/
**Key Principle: Agent is visible and editable during build**
- Files created immediately as components are approved
- User can watch files grow in their editor
- No session state - just direct file writes
- No "export" step - agent is ready when build completes
## Core Concepts
@@ -73,62 +73,166 @@ Unit of work (written to nodes/__init__.py)
**Node Types:**
- `event_loop`**Recommended for all LLM-powered work.** Multi-turn streaming loop with tool execution and judge-based evaluation. Works with or without tools.
- `function` — Deterministic Python operations. No LLM involved.
> **Legacy Note:** `llm_generate` and `llm_tool_use` still function but are deprecated. Use `event_loop` instead, which handles both cases in a single multi-turn streaming loop.
```python
search_node = NodeSpec(
id="search-web",
name="Search Web",
description="Search for information online",
node_type="llm_tool_use",
description="Search for information and extract results",
node_type="event_loop",
input_keys=["query"],
output_keys=["search_results"],
system_prompt="Search the web for: {query}",
system_prompt="Search the web for: {query}. Use the web_search tool to find results, then call set_output to store them.",
tools=["web_search"],
max_retries=3,
)
```
**NodeSpec Fields for Event Loop Nodes:**
| Field | Default | Description |
|-------|---------|-------------|
| `client_facing` | `False` | If True, streams output to user and blocks for input between turns |
| `nullable_output_keys` | `[]` | Output keys that may remain unset (for mutually exclusive outputs) |
| `max_node_visits` | `1` | Max times this node executes per run. Set >1 for feedback loop targets |
### Edge
Connection between nodes (written to agent.py)
**Edge Conditions:**
- `on_success` Proceed if node succeeds (most common)
- `on_failure` Handle errors
- `always` Always proceed
- `conditional` Based on expression evaluating node output
**Edge Priority:**
Priority controls evaluation order when multiple edges leave the same node. Higher priority edges are evaluated first. Use negative priority for feedback edges (edges that loop back to earlier nodes).
```python
# Forward edge (evaluated first)
EdgeSpec(
id="search-to-analyze",
source="search-web",
target="analyze-results",
condition=EdgeCondition.ON_SUCCESS,
id="review-to-campaign",
source="review",
target="campaign-builder",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('approved_contacts') is not None",
priority=1,
)
# Feedback edge (evaluated after forward edges)
EdgeSpec(
id="review-feedback",
source="review",
target="extractor",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('redo_extraction') is not None",
priority=-1,
)
```
### Client-Facing Nodes
For multi-turn conversations with the user, set `client_facing=True` on a node. The node will:
- Stream its LLM output directly to the end user
- Block for user input between conversational turns
- Resume when new input is injected via `inject_event()`
```python
intake_node = NodeSpec(
id="intake",
name="Intake",
description="Gather requirements from the user",
node_type="event_loop",
client_facing=True,
input_keys=[],
output_keys=["repo_url", "project_url"],
system_prompt="You are the intake agent. Ask the user for the repo URL and project URL.",
)
```
> **Legacy Note:** The old `pause_nodes` / `entry_points` pattern still works but `client_facing=True` is preferred for new agents.
## Event Loop Architecture Concepts
### How EventLoopNode Works
An event loop node runs a multi-turn loop (a toy sketch follows these steps):
1. LLM receives system prompt + conversation history
2. LLM responds (text and/or tool calls)
3. Tool calls are executed, results added to conversation
4. Judge evaluates: ACCEPT (exit loop), RETRY (loop again), or ESCALATE
5. Repeat until judge ACCEPTs or max_iterations reached
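A toy sketch of that loop (illustrative only; the real EventLoopNode also handles streaming, stall detection, and compaction, and the `llm`, `execute_tool`, and `judge` objects here are stand-ins, not framework APIs):

```python
async def run_loop(llm, execute_tool, judge, max_iterations=50):
    history = []  # conversation so far: prompts, replies, tool results
    for _ in range(max_iterations):
        reply = await llm.complete(history)       # steps 1-2: prompt, respond
        history.append(reply)
        for call in reply.tool_calls:             # step 3: execute requested tools
            history.append(await execute_tool(call))
        verdict = await judge.evaluate(history)   # step 4: ACCEPT / RETRY / ESCALATE
        if verdict.action == "ACCEPT":
            return verdict
        history.append(verdict.feedback)          # step 5: loop again with feedback
    raise RuntimeError("max_iterations reached without ACCEPT")
```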
### CRITICAL: EventLoopNode Runtime Requirements
EventLoopNodes are **not auto-created** by the graph executor. They must be explicitly instantiated and registered in a `node_registry` dict before execution.
**Required components:**
1. **`EventLoopNode` instances** — One per event_loop NodeSpec, registered in `node_registry`
2. **`Runtime` instance** — `GraphExecutor` calls `runtime.start_run()` internally. Passing `None` crashes the executor
3. **`GraphExecutor` (not `AgentRuntime`)** — `AgentRuntime`/`create_agent_runtime()` does NOT pass `node_registry` to the internal `GraphExecutor`, so all event_loop nodes fail with "not found in registry"
```python
from framework.graph.executor import GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
# Build node_registry
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
if node_spec.node_type == "event_loop":
node_registry[node_spec.id] = EventLoopNode(
event_bus=event_bus,
config=LoopConfig(max_iterations=50, max_tool_calls_per_turn=15),
tool_executor=tool_executor,
)
# Create executor with Runtime and node_registry
runtime = Runtime(storage_path)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry,
)
```
### set_output
Nodes produce structured outputs by calling `set_output(key, value)` — a synthetic tool injected by the framework. When the LLM calls `set_output`, the value is stored in the output accumulator and made available to downstream nodes via shared memory.
### JudgeProtocol
The judge controls when a node's loop exits:
- **Implicit judge** (default, no judge configured): ACCEPTs when the LLM finishes with no tool calls and all required output keys are set
- **SchemaJudge**: Validates outputs against a Pydantic model
- **Custom judges**: Implement `evaluate(context) -> JudgeVerdict` (a sketch follows this list)
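A minimal custom judge sketch (assumes `JudgeVerdict` is importable from the framework and that `context` carries the `output_accumulator` shown in building-agents-patterns; the scoring rule itself is hypothetical):

```python
class ScoreRangeJudge:
    """Accept only when quality_score has been set and lies in [0.0, 1.0]."""

    async def evaluate(self, context: dict) -> JudgeVerdict:
        score = context.get("output_accumulator", {}).get("quality_score")
        if score is None:
            return JudgeVerdict(action="RETRY",
                                feedback="Call set_output('quality_score', ...) first.")
        if not 0.0 <= score <= 1.0:
            return JudgeVerdict(action="RETRY",
                                feedback=f"quality_score {score} is outside [0.0, 1.0].")
        return JudgeVerdict(action="ACCEPT")
```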
### LoopConfig
Controls loop behavior:
- `max_iterations` (default 50) — prevents infinite loops
- `max_tool_calls_per_turn` (default 10) — limits tool calls per LLM response
- `stall_detection_threshold` (default 3) — detects repeated identical responses
- `max_history_tokens` (default 32000) — triggers conversation compaction
### Fan-Out / Fan-In
Multiple ON_SUCCESS edges from the same source create parallel execution. All branches run concurrently via `asyncio.gather()`. Parallel event_loop nodes must have disjoint `output_keys`.
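For example (node ids hypothetical):

```python
# Two ON_SUCCESS edges from "fetch" fan out to parallel branches
EdgeSpec(id="fetch-to-summarize", source="fetch", target="summarize",
         condition=EdgeCondition.ON_SUCCESS)
EdgeSpec(id="fetch-to-score", source="fetch", target="score",
         condition=EdgeCondition.ON_SUCCESS)
```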
### max_node_visits
Controls how many times a node can execute in one graph run. Default is 1. Set higher for nodes that are targets of feedback edges (review-reject loops). Set 0 for unlimited (guarded by max_steps).
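For instance, a sketch of a feedback-loop target (names and values illustrative):

```python
extractor_node = NodeSpec(
    id="extractor",
    name="Extractor",
    description="Extract contacts; re-runs when the reviewer requests changes",
    node_type="event_loop",
    input_keys=["scan_results"],
    output_keys=["contacts"],
    max_node_visits=3,  # allows up to 3 visits via the review feedback edge
    tools=[],
    system_prompt="Extract contacts and store them with set_output('contacts', ...).",
)
```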
## Tool Discovery & Validation
**CRITICAL:** Before adding a node with tools, you MUST verify the tools exist.
@@ -157,29 +261,6 @@ mcp__agent-builder__list_mcp_tools()
mcp__agent-builder__list_mcp_tools(server_name="tools")
```
### Step 3: Validate Before Adding Nodes
Before writing a node with `tools=[...]`:
@@ -193,27 +274,10 @@ Before writing a node with `tools=[...]`:
### Tool Validation Anti-Patterns
- **Never assume a tool exists** - always call `list_mcp_tools()` first
- **Never write a node with unverified tools** - validate before writing
- **Never silently drop tools** - if a tool doesn't exist, inform the user
- **Never guess tool names** - use exact names from discovery response
## Workflow Overview: Incremental File Construction
@@ -221,42 +285,19 @@ else:
1. CREATE PACKAGE → mkdir + write skeletons
2. DEFINE GOAL → Write to agent.py + config.py
3. FOR EACH NODE:
- Propose design (event_loop for LLM work, function for deterministic)
- User approves
- Write to nodes/__init__.py IMMEDIATELY
- (Optional) Validate with test_node
4. CONNECT EDGES → Update agent.py
- Use priority for feedback edges (negative priority)
- (Optional) Validate with validate_graph
5. FINALIZE → Write agent class to agent.py
6. DONE - Agent ready at exports/my_agent/
```
**Files written immediately. MCP tools optional for validation/testing bookkeeping.**
## When to Use This Skill
Use building-agents-core when:
@@ -285,12 +326,17 @@ mcp__agent-builder__test_node(
**validate_graph** - Check graph structure
```python
mcp__agent-builder__validate_graph()
# Returns: unreachable nodes, missing connections, event_loop validation, etc.
```
**configure_loop** - Set event loop parameters
```python
mcp__agent-builder__configure_loop(
max_iterations=50,
max_tool_calls_per_turn=10,
stall_detection_threshold=3,
max_history_tokens=32000
)
```
**Key Point:** Files are written FIRST. MCP tools are for validation only.
@@ -298,6 +344,6 @@ mcp__agent-builder__create_session(session_name="my-build")
## Related Skills
- **building-agents-construction** - Step-by-step building process
- **building-agents-patterns** - Best practices: judges, feedback edges, fan-out, context management
- **agent-workflow** - Complete workflow orchestrator
- **testing-agent** - Test and validate completed agents
+215 -399
@@ -1,10 +1,10 @@
---
name: building-agents-patterns
description: Best practices, patterns, and examples for building goal-driven agents. Includes client-facing interaction, feedback edges, judge patterns, fan-out/fan-in, context management, and anti-patterns.
license: Apache-2.0
metadata:
author: hive
version: "1.0"
version: "2.0"
type: reference
part_of: building-agents
---
@@ -24,10 +24,10 @@ How to build a node using both direct file writes and optional MCP validation:
node_code = '''
search_node = NodeSpec(
id="search-web",
node_type="llm_tool_use",
node_type="event_loop",
input_keys=["query"],
output_keys=["search_results"],
system_prompt="Search the web for: {query}",
system_prompt="Search the web for: {query}. Use web_search, then call set_output to store results.",
tools=["web_search"],
)
'''
@@ -38,17 +38,12 @@ Edit(
new_string=node_code
)
print("✅ Added search_node to nodes/__init__.py")
print("📁 Open exports/research_agent/nodes/__init__.py to see it!")
# 2. OPTIONALLY VALIDATE WITH MCP (Secondary - bookkeeping)
validation = mcp__agent-builder__test_node(
node_id="search-web",
test_input='{"query": "python tutorials"}',
mock_llm_response='{"search_results": [...mock results...]}'
)
print(f"✓ Validation: {validation['success']}")
```
**User experience:**
@@ -57,401 +52,239 @@ print(f"✓ Validation: {validation['success']}")
- Gets validation feedback (from step 2)
- Can edit the file directly if needed
This combines visibility (files) with validation (MCP tools).
## Multi-Turn Interaction Patterns
For agents needing multi-turn conversations with users, use `client_facing=True` on event_loop nodes.
### Client-Facing Nodes
A client-facing node streams LLM output to the user and blocks for user input between conversational turns. This replaces the old pause/resume pattern.
```python
# Client-facing node blocks for user input
intake_node = NodeSpec(
id="intake",
name="Intake",
description="Gather requirements from the user",
node_type="event_loop",
client_facing=True,
input_keys=[],
output_keys=["repo_url", "project_url"],
system_prompt="You are the intake agent. Ask the user for their repo URL and project URL. When you have both, call set_output for each.",
)
# Internal node runs without user interaction
scanner_node = NodeSpec(
id="scanner",
name="Scanner",
description="Scan the repository",
node_type="event_loop",
input_keys=["repo_url"],
output_keys=["scan_results"],
system_prompt="Scan the repository at {repo_url}...",
tools=["scan_github_repo"],
)
```
**How it works:**
- Client-facing nodes stream LLM text to the user and block for input after each response
- User input is injected via `node.inject_event(text)`
- When the LLM calls `set_output` to produce structured outputs, the judge evaluates and ACCEPTs
- Internal nodes (non-client-facing) run their entire loop without blocking
### When to Use client_facing
| Scenario | client_facing | Why |
|----------|:---:|-----|
| Gathering user requirements | Yes | Need user input |
| Human review/approval checkpoint | Yes | Need human decision |
| Data processing (scanning, scoring) | No | Runs autonomously |
| Report generation | No | No user input needed |
| Final confirmation before action | Yes | Need explicit approval |
> **Legacy Note:** The `pause_nodes` / `entry_points` pattern still works for backward compatibility but `client_facing=True` is preferred for new agents.
## Edge-Based Routing and Feedback Loops
### Conditional Edge Routing
Multiple conditional edges from the same source replace the old `router` node type. Each edge checks a condition on the node's output.
```python
# Node with mutually exclusive outputs
review_node = NodeSpec(
id="review",
name="Review",
node_type="event_loop",
client_facing=True,
output_keys=["approved_contacts", "redo_extraction"],
nullable_output_keys=["approved_contacts", "redo_extraction"],
max_node_visits=3,
system_prompt="Present the contact list to the operator. If they approve, call set_output('approved_contacts', ...). If they want changes, call set_output('redo_extraction', 'true').",
)
# Forward edge (positive priority, evaluated first)
EdgeSpec(
id="review-to-campaign",
source="review",
target="campaign-builder",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('approved_contacts') is not None",
priority=1,
)
# Feedback edge (negative priority, evaluated after forward edges)
EdgeSpec(
id="review-feedback",
source="review",
target="extractor",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('redo_extraction') is not None",
priority=-1,
)
```
**Key concepts:**
- `nullable_output_keys`: Lists output keys that may remain unset. The node sets exactly one of the mutually exclusive keys per execution.
- `max_node_visits`: Must be >1 on the feedback target (extractor) so it can re-execute. Default is 1.
- `priority`: Positive = forward edge (evaluated first). Negative = feedback edge. The executor tries forward edges first; if none match, falls back to feedback edges.
### Routing Decision Table
| Pattern | Old Approach | New Approach |
|---------|-------------|--------------|
| Conditional branching | `router` node | Conditional edges with `condition_expr` |
| Binary approve/reject | `pause_nodes` + resume | `client_facing=True` + `nullable_output_keys` |
| Loop-back on rejection | Manual entry_points | Feedback edge with `priority=-1` |
| Multi-way routing | Router with routes dict | Multiple conditional edges with priorities |
## Judge Patterns
Judges control when an event_loop node's loop exits. Choose based on validation needs.
### Implicit Judge (Default)
When no judge is configured, the implicit judge ACCEPTs when:
- The LLM finishes its response with no tool calls
- All required output keys have been set via `set_output`
Best for simple nodes where "all outputs set" is sufficient validation.
### SchemaJudge
Validates outputs against a Pydantic model. Use when you need structural validation.
```python
from pydantic import BaseModel, ValidationError  # ValidationError used below; JudgeVerdict is provided by the framework
class ScannerOutput(BaseModel):
github_users: list[dict] # Must be a list of user objects
class SchemaJudge:
def __init__(self, output_model: type[BaseModel]):
self._model = output_model
async def evaluate(self, context: dict) -> JudgeVerdict:
missing = context.get("missing_keys", [])
if missing:
return JudgeVerdict(
action="RETRY",
feedback=f"Missing output keys: {missing}. Use set_output to provide them.",
)
try:
self._model.model_validate(context["output_accumulator"])
return JudgeVerdict(action="ACCEPT")
except ValidationError as e:
return JudgeVerdict(action="RETRY", feedback=str(e))
```
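Wiring a judge into a node is a constructor argument (a sketch; this mirrors the `EventLoopNode` construction shown in building-agents-core):

```python
# Replace the implicit judge for the scanner node with schema validation
node_registry["scanner"] = EventLoopNode(
    event_bus=event_bus,
    judge=SchemaJudge(ScannerOutput),
    config=LoopConfig(max_iterations=50),
    tool_executor=tool_executor,
)
```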
### When to Use Which Judge
| Judge | Use When | Example |
|-------|----------|---------|
| Implicit (None) | Output keys are sufficient validation | Simple data extraction |
| SchemaJudge | Need structural validation of outputs | API response parsing |
| Custom | Domain-specific validation logic | Score must be 0.0-1.0 |
## Fan-Out / Fan-In (Parallel Execution)
Multiple ON_SUCCESS edges from the same source trigger parallel execution. All branches run concurrently via `asyncio.gather()`.
```python
# Scanner fans out to Profiler and Scorer in parallel
EdgeSpec(id="scanner-to-profiler", source="scanner", target="profiler",
condition=EdgeCondition.ON_SUCCESS)
EdgeSpec(id="scanner-to-scorer", source="scanner", target="scorer",
condition=EdgeCondition.ON_SUCCESS)
# Both fan in to Extractor
EdgeSpec(id="profiler-to-extractor", source="profiler", target="extractor",
condition=EdgeCondition.ON_SUCCESS)
EdgeSpec(id="scorer-to-extractor", source="scorer", target="extractor",
condition=EdgeCondition.ON_SUCCESS)
```
**Requirements:**
- Parallel event_loop nodes must have **disjoint output_keys** (no key written by both)
- Only one parallel branch may contain a `client_facing` node
- Fan-in node receives outputs from all completed branches in shared memory
## Context Management Patterns
### Tiered Compaction
EventLoopNode automatically manages context window usage with tiered compaction:
1. **Pruning** — Old tool results replaced with compact placeholders (zero-cost, no LLM call)
2. **Normal compaction** — LLM summarizes older messages
3. **Aggressive compaction** — Keeps only recent messages + summary
4. **Emergency** — Hard reset with tool history preservation
### Spillover Pattern
For large tool results, use `save_data()` to write to disk and pass the filename through `set_output`. This keeps the LLM context window small.
```
LLM calls save_data(filename, large_data) → file written to spillover/
LLM calls set_output("results_file", filename) → filename stored in output
Downstream node calls load_data(filename) → reads from spillover/
```
The `load_data()` tool supports `offset` and `limit` parameters for paginated reading of large files.
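A sketch of a node instructed to spill (assumes `save_data`/`load_data` are available to event_loop nodes as described above; ids and prompt text are illustrative):

```python
fetch_bulk_node = NodeSpec(
    id="fetch-bulk",
    name="Fetch Bulk Data",
    description="Fetch large pages and spill them to disk instead of context",
    node_type="event_loop",
    input_keys=["source_urls"],
    output_keys=["results_file"],
    tools=["web_scrape"],
    system_prompt="""\
Fetch each URL with web_scrape. Do NOT paste full page contents into the
conversation. Call save_data("fetched.json", <data>) to spill results, then
call set_output("results_file", "fetched.json") so downstream nodes can
read it back with load_data.
""",
)
```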
## Anti-Patterns
### What NOT to Do
- **Don't rely on `export_graph`** — Write files immediately, not at end
- **Don't hide code in session** — Write to files as components are approved
- **Don't wait to write files** — Agent visible from first step
- **Don't batch everything** — Write incrementally, one component at a time
### MCP Tools - Correct Usage
**MCP tools OK for:**
- `test_node` Validate node configuration with mock inputs
- `validate_graph` Check graph structure
- `configure_loop` — Set event loop parameters
- `create_session` — Track session state for bookkeeping
**Just don't:** Use MCP as the primary construction method or rely on export_graph
## Best Practices
### 1. Show Progress After Each Write
```python
# After writing a node
print("✅ Added analyze_request_node to nodes/__init__.py")
print("📊 Progress: 1/6 nodes added")
print("📁 Open exports/my_agent/nodes/__init__.py to see it!")
```
### 2. Let User Open Files During Build
```python
# Encourage file inspection
print("✅ Goal written to agent.py")
print("")
print("💡 Tip: Open exports/my_agent/agent.py in your editor to see the goal!")
```
### 3. Write Incrementally - One Component at a Time
```python
# Good flow
write_package_structure()
show_user("Package created")
write_goal()
show_user("Goal written")
for node in nodes:
get_approval(node)
write_node(node)
show_user(f"Node {node.id} written")
```
### 4. Test As You Build
```python
# After adding several nodes
print("💡 You can test current state with:")
print(" PYTHONPATH=core:exports python -m my_agent validate")
print(" PYTHONPATH=core:exports python -m my_agent info")
```
### 5. Keep User Informed
```python
# Clear status updates
print("🔨 Creating package structure...")
print("✅ Package created: exports/my_agent/")
print("")
print("📝 Next: Define agent goal")
```
## Continuous Monitoring Agents
For agents that run continuously without terminal nodes:
```python
# No terminal nodes - loops forever
terminal_nodes = []
# Workflow loops back to start
edges = [
EdgeSpec(id="monitor-to-check", source="monitor", target="check-condition"),
EdgeSpec(id="check-to-wait", source="check-condition", target="wait"),
EdgeSpec(id="wait-to-monitor", source="wait", target="monitor"), # Loop
]
# Entry node only
entry_node = "monitor"
entry_points = {"start": "monitor"}
pause_nodes = []
```
**Example: File Monitor**
```python
nodes = [
NodeSpec(id="list-files", ...),
NodeSpec(id="check-new-files", node_type="router", ...),
NodeSpec(id="process-files", ...),
NodeSpec(id="wait-interval", node_type="function", ...),
]
edges = [
EdgeSpec(id="list-to-check", source="list-files", target="check-new-files"),
EdgeSpec(
id="check-to-process",
source="check-new-files",
target="process-files",
condition=EdgeCondition.CONDITIONAL,
condition_expr="new_files_count > 0",
),
EdgeSpec(
id="check-to-wait",
source="check-new-files",
target="wait-interval",
condition=EdgeCondition.CONDITIONAL,
condition_expr="new_files_count == 0",
),
EdgeSpec(id="process-to-wait", source="process-files", target="wait-interval"),
EdgeSpec(id="wait-to-list", source="wait-interval", target="list-files"), # Loop back
]
terminal_nodes = [] # No terminal - runs forever
```
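The `wait-interval` function node can be as simple as a sleep. A minimal sketch, assuming function nodes receive the shared-memory dict and return their outputs (that signature, and the `poll_seconds` key, are assumptions here):
```python
import time

# Sketch of the wait-interval function node: sleep for a configurable
# interval, then let the loop-back edge return control to list-files.
def wait_interval(memory: dict) -> dict:
    interval = memory.get("poll_seconds", 30)  # key name is illustrative
    time.sleep(interval)
    return {"waited_seconds": interval}
```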
## Complex Routing Patterns
### Multi-Condition Router
```python
router_node = NodeSpec(
id="decision-router",
node_type="router",
input_keys=["analysis_result"],
output_keys=["decision"],
system_prompt="""
Based on the analysis result, decide the next action:
- If confidence > 0.9: route to "execute"
- If 0.5 <= confidence <= 0.9: route to "review"
- If confidence < 0.5: route to "clarify"
Return: {"decision": "execute|review|clarify"}
""",
)
# Edges for each route
edges = [
EdgeSpec(
id="router-to-execute",
source="decision-router",
target="execute-action",
condition=EdgeCondition.CONDITIONAL,
condition_expr="decision == 'execute'",
priority=1,
),
EdgeSpec(
id="router-to-review",
source="decision-router",
target="human-review",
condition=EdgeCondition.CONDITIONAL,
condition_expr="decision == 'review'",
priority=2,
),
EdgeSpec(
id="router-to-clarify",
source="decision-router",
target="request-clarification",
condition=EdgeCondition.CONDITIONAL,
condition_expr="decision == 'clarify'",
priority=3,
),
]
```
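Edge selection follows the same rule the dry-run tooling uses: outgoing edges are sorted by priority (highest first) and the first matching condition wins. A simplified sketch, where the helper is hypothetical and `eval` stands in for whatever expression evaluator the executor actually uses:
```python
# Hypothetical helper, simplified from the dry-run logic in test_graph:
# sort outgoing edges by priority, take the first whose condition holds.
def pick_next_edge(edges, source_id, memory):
    outgoing = sorted(
        (e for e in edges if e.source == source_id),
        key=lambda e: -e.priority,
    )
    for edge in outgoing:
        # eval() is for illustration only; condition_expr strings like
        # "decision == 'review'" are evaluated against shared memory.
        if not edge.condition_expr or eval(edge.condition_expr, {}, memory):
            return edge
    return None
```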
## Error Handling Patterns
### Graceful Failure with Fallback
```python
# Primary node with error handling
nodes = [
NodeSpec(id="api-call", max_retries=3, ...),
NodeSpec(id="fallback-cache", ...),
NodeSpec(id="report-error", ...),
]
edges = [
# Success path
EdgeSpec(
id="api-success",
source="api-call",
target="process-results",
condition=EdgeCondition.ON_SUCCESS,
),
EdgeSpec(id="api-success", source="api-call", target="process-results",
condition=EdgeCondition.ON_SUCCESS),
# Fallback on failure
EdgeSpec(
id="api-to-fallback",
source="api-call",
target="fallback-cache",
condition=EdgeCondition.ON_FAILURE,
priority=1,
),
EdgeSpec(id="api-to-fallback", source="api-call", target="fallback-cache",
condition=EdgeCondition.ON_FAILURE, priority=1),
# Report if fallback also fails
EdgeSpec(
id="fallback-to-error",
source="fallback-cache",
target="report-error",
condition=EdgeCondition.ON_FAILURE,
priority=1,
),
]
```
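What `fallback-cache` does is up to you; one minimal sketch is a function node that serves a previously cached result and raises on a cache miss so the ON_FAILURE edge can route to `report-error`. The cache layout and `query_id` key are assumptions:
```python
import json
from pathlib import Path

# Illustrative fallback-cache function node. Raising on a cache miss
# triggers the ON_FAILURE edge to report-error; the cache directory
# and "query_id" key are hypothetical.
def fallback_cache(memory: dict) -> dict:
    cache_file = Path("cache") / f"{memory['query_id']}.json"
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached result at {cache_file}")
    return {"results": json.loads(cache_file.read_text()), "from_cache": True}
```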
## Performance Optimization
### Parallel Node Execution
```python
# Use multiple edges from same source for parallel execution
edges = [
EdgeSpec(
id="start-to-search1",
source="start",
target="search-source-1",
condition=EdgeCondition.ALWAYS,
),
EdgeSpec(
id="start-to-search2",
source="start",
target="search-source-2",
condition=EdgeCondition.ALWAYS,
),
EdgeSpec(
id="start-to-search3",
source="start",
target="search-source-3",
condition=EdgeCondition.ALWAYS,
),
# Converge results
EdgeSpec(
id="search1-to-merge",
source="search-source-1",
target="merge-results",
),
EdgeSpec(
id="search2-to-merge",
source="search-source-2",
target="merge-results",
),
EdgeSpec(
id="search3-to-merge",
source="search-source-3",
target="merge-results",
),
EdgeSpec(id="fallback-to-error", source="fallback-cache", target="report-error",
condition=EdgeCondition.ON_FAILURE, priority=1),
]
```
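To satisfy the fan-out constraints listed earlier, the parallel branches above must write disjoint output keys that the fan-in node reads back from shared memory. A sketch of matching node specs, with fields abbreviated with `...` as in the examples above:
```python
nodes = [
    # Each parallel branch writes its own key; no key is shared.
    NodeSpec(id="search-source-1", node_type="event_loop",
             output_keys=["results_1"], ...),
    NodeSpec(id="search-source-2", node_type="event_loop",
             output_keys=["results_2"], ...),
    NodeSpec(id="search-source-3", node_type="event_loop",
             output_keys=["results_3"], ...),
    # Fan-in: reads all branch outputs from shared memory.
    NodeSpec(id="merge-results", node_type="event_loop",
             input_keys=["results_1", "results_2", "results_3"],
             output_keys=["merged_results"], ...),
]
```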
@@ -459,38 +292,21 @@ edges = [
When the agent is complete, transition to the testing phase:
```python
print("""
✅ Agent complete: exports/my_agent/
Next steps:
1. Switch to testing-agent skill
2. Generate and approve tests
3. Run evaluation
4. Debug any failures
Command: "Test the agent at exports/my_agent/"
""")
```
### Pre-Testing Checklist
Before handing off to testing-agent:
- [ ] Agent structure validates: `python -m agent_name validate`
- [ ] All nodes defined in nodes/__init__.py
- [ ] All edges connect valid nodes with correct priorities
- [ ] Entry node specified
- [ ] Feedback edge targets have `max_node_visits > 1`
- [ ] Client-facing nodes have meaningful system prompts
- [ ] Agent can be imported: `from exports.agent_name import default_agent`
- [ ] README.md with usage instructions
- [ ] CLI commands work (info, validate)
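A small script can run the mechanical parts of this checklist. Paths assume the `exports/` layout and the `my_agent` name used throughout; adjust for your package:
```python
import os
import subprocess
import sys

# Run the CLI checks from the checklist above.
env = {**os.environ, "PYTHONPATH": "core:exports"}
for cmd in ("validate", "info"):
    subprocess.run([sys.executable, "-m", "my_agent", cmd], env=env, check=True)

# Import check: the agent package must expose default_agent.
from exports.my_agent import default_agent  # noqa: E402
print("Agent imports OK:", type(default_agent).__name__)
```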
## Related Skills
- **building-agents-core** - Fundamental concepts (node types, edges, event loop architecture)
- **building-agents-construction** - Step-by-step building process
- **testing-agent** - Test and validate agents
- **agent-workflow** - Complete workflow orchestrator
---
+53 -1
View File
@@ -930,9 +930,10 @@ assert approval == "APPROVED", f"Expected APPROVED, got {approval}"
- `steps_executed: int` - Number of nodes executed
- `total_tokens: int` - Cumulative token usage
- `total_latency_ms: int` - Total execution time
- `path: list[str]` - Node IDs traversed
- `path: list[str]` - Node IDs traversed (may contain repeated IDs from feedback loops)
- `paused_at: str | None` - Node ID if HITL pause occurred
- `session_state: dict` - State for resuming
- `node_visit_counts: dict[str, int]` - How many times each node executed (useful for feedback loop testing)
### Happy Path Test
```python
@@ -975,6 +976,57 @@ async def test_performance_latency(mock_mode):
assert duration < 5.0, f"Took {{duration}}s, expected <5s"
```
### Testing Event Loop Nodes
Event loop nodes run multi-turn loops internally. Tests should verify:
**Output Keys Test** — All required keys are set via `set_output`:
```python
@pytest.mark.asyncio
async def test_all_output_keys_set(mock_mode):
"""Test that event_loop nodes set all required output keys."""
result = await default_agent.run({{"query": "test"}}, mock_mode=mock_mode)
assert result.success, f"Agent failed: {{result.error}}"
output = result.output or {{}}
for key in ["expected_key_1", "expected_key_2"]:
assert key in output, f"Output key '{{key}}' not set by event_loop node"
```
**Feedback Loop Test** — Verify feedback loops terminate:
```python
@pytest.mark.asyncio
async def test_feedback_loop_respects_max_visits(mock_mode):
"""Test that feedback loops terminate at max_node_visits."""
result = await default_agent.run({{"input": "trigger_rejection"}}, mock_mode=mock_mode)
assert result.success or result.error is not None
visits = getattr(result, "node_visit_counts", {{}}) or {{}}
for node_id, count in visits.items():
assert count <= 5, f"Node {{node_id}} visited {{count}} times"
```
**Fan-Out Test** — Verify parallel branches both complete:
```python
@pytest.mark.asyncio
async def test_parallel_branches_complete(mock_mode):
"""Test that fan-out branches all complete and produce outputs."""
result = await default_agent.run({{"query": "test"}}, mock_mode=mock_mode)
assert result.success
output = result.output or {{}}
# Check outputs from both parallel branches
assert "branch_a_output" in output, "Branch A output missing"
assert "branch_b_output" in output, "Branch B output missing"
```
**Client-Facing Node Test** — In mock mode, client-facing nodes may not block:
```python
@pytest.mark.asyncio
async def test_client_facing_node(mock_mode):
"""Test that client-facing nodes produce output."""
result = await default_agent.run({{"query": "test"}}, mock_mode=mock_mode)
# In mock mode, client-facing blocking is typically bypassed
assert result.success or result.paused_at is not None
```
## Integration with building-agents
### Handoff Points
+1 -1
View File
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.6
rev: v0.15.0
hooks:
- id: ruff
name: ruff lint (core)
+12 -1
View File
@@ -640,7 +640,18 @@ class EventLoopNode(NodeProtocol):
)
# Async write-through for set_output
if not result.is_error:
await accumulator.set(tc.tool_input["key"], tc.tool_input["value"])
value = tc.tool_input["value"]
# Parse JSON strings into native types so downstream
# consumers get lists/dicts instead of serialised JSON,
# and the hallucination validator skips non-string values.
if isinstance(value, str):
try:
parsed = json.loads(value)
if isinstance(parsed, (list, dict)):
value = parsed
except (json.JSONDecodeError, TypeError):
pass
await accumulator.set(tc.tool_input["key"], value)
else:
# Execute real tool
result = await self._execute_tool(tc)
+6 -5
View File
@@ -415,13 +415,14 @@ class GraphExecutor:
max_retries = getattr(node_spec, "max_retries", 3)
# Event loop nodes handle retry internally via judge —
# executor retry is catastrophic (retry multiplication)
if node_spec.node_type == "event_loop" and max_retries > 0:
self.logger.warning(
# but allow 1 executor retry for post-execution failures
# (e.g. output validation) that the judge can't catch.
if node_spec.node_type == "event_loop" and max_retries > 1:
self.logger.info(
f"EventLoopNode '{node_spec.id}' has max_retries={max_retries}. "
"Overriding to 0 — event loop nodes handle retry internally via judge."
"Capping to 1 — event loop nodes handle retry internally via judge."
)
max_retries = 0
max_retries = 1
if node_retry_counts[current_node_id] < max_retries:
# Retry - don't increment steps for retries
+1 -1
View File
@@ -207,7 +207,7 @@ class OutputValidator:
def validate_no_hallucination(
self,
output: dict[str, Any],
max_length: int = 10000,
max_length: int = 50000,
) -> ValidationResult:
"""
Check for signs of LLM hallucination in output values.
+305 -11
View File
@@ -44,6 +44,7 @@ class BuildSession:
self.nodes: list[NodeSpec] = []
self.edges: list[EdgeSpec] = []
self.mcp_servers: list[dict] = [] # MCP server configurations
self.loop_config: dict = {} # LoopConfig parameters for EventLoopNodes
self.created_at = datetime.now().isoformat()
self.last_modified = datetime.now().isoformat()
@@ -56,6 +57,7 @@ class BuildSession:
"nodes": [n.model_dump() for n in self.nodes],
"edges": [e.model_dump() for e in self.edges],
"mcp_servers": self.mcp_servers,
"loop_config": self.loop_config,
"created_at": self.created_at,
"last_modified": self.last_modified,
}
@@ -102,6 +104,9 @@ class BuildSession:
# Restore MCP servers
session.mcp_servers = data.get("mcp_servers", [])
# Restore loop config
session.loop_config = data.get("loop_config", {})
return session
@@ -551,14 +556,28 @@ def add_node(
node_id: Annotated[str, "Unique identifier for the node"],
name: Annotated[str, "Human-readable name"],
description: Annotated[str, "What this node does"],
node_type: Annotated[str, "Type: llm_generate, llm_tool_use, router, or function"],
node_type: Annotated[
str,
"Type: event_loop (recommended), function, router. "
"Deprecated: llm_generate, llm_tool_use (use event_loop instead)",
],
input_keys: Annotated[str, "JSON array of keys this node reads from shared memory"],
output_keys: Annotated[str, "JSON array of keys this node writes to shared memory"],
system_prompt: Annotated[str, "Instructions for LLM nodes"] = "",
tools: Annotated[str, "JSON array of tool names for llm_tool_use nodes"] = "[]",
tools: Annotated[str, "JSON array of tool names for event_loop or llm_tool_use nodes"] = "[]",
routes: Annotated[
str, "JSON object mapping conditions to target node IDs for router nodes"
] = "{}",
client_facing: Annotated[
bool, "If True, node streams output to user and blocks for input between turns"
] = False,
nullable_output_keys: Annotated[
str, "JSON array of output keys that may remain unset (for mutually exclusive outputs)"
] = "[]",
max_node_visits: Annotated[
int,
"Max times this node executes per graph run. Set >1 for feedback loop targets. 0=unlimited",
] = 1,
) -> str:
"""Add a node to the agent graph. Nodes process inputs and produce outputs."""
session = get_session()
@@ -569,6 +588,7 @@ def add_node(
output_keys_list = json.loads(output_keys)
tools_list = json.loads(tools)
routes_dict = json.loads(routes)
nullable_output_keys_list = json.loads(nullable_output_keys)
except json.JSONDecodeError as e:
return json.dumps(
{
@@ -597,6 +617,9 @@ def add_node(
system_prompt=system_prompt or None,
tools=tools_list,
routes=routes_dict,
client_facing=client_facing,
nullable_output_keys=nullable_output_keys_list,
max_node_visits=max_node_visits,
)
session.nodes.append(node)
@@ -616,6 +639,26 @@ def add_node(
if node_type in ("llm_generate", "llm_tool_use") and not system_prompt:
warnings.append(f"LLM node '{node_id}' should have a system_prompt")
# EventLoopNode validation
if node_type == "event_loop" and not system_prompt:
warnings.append(f"Event loop node '{node_id}' should have a system_prompt")
# Deprecated type warnings
if node_type in ("llm_generate", "llm_tool_use"):
warnings.append(
f"Node type '{node_type}' is deprecated. Use 'event_loop' instead. "
"EventLoopNode supports tool use, streaming, and judge-based evaluation."
)
# nullable_output_keys must be a subset of output_keys
if nullable_output_keys_list:
invalid_nullable = [k for k in nullable_output_keys_list if k not in output_keys_list]
if invalid_nullable:
errors.append(
f"nullable_output_keys {invalid_nullable} must be a subset of "
f"output_keys {output_keys_list}"
)
_save_session(session) # Auto-save
return json.dumps(
@@ -692,6 +735,7 @@ def add_edge(
# Validate
errors = []
warnings = []
if not any(n.id == source for n in session.nodes):
errors.append(f"Source node '{source}' not found")
@@ -700,12 +744,24 @@ def add_edge(
if edge_condition == EdgeCondition.CONDITIONAL and not condition_expr:
errors.append(f"Conditional edge '{edge_id}' needs condition_expr")
# Feedback edge validation
if priority < 0:
target_node = next((n for n in session.nodes if n.id == target), None)
if target_node and target_node.max_node_visits <= 1:
warnings.append(
f"Edge '{edge_id}' has negative priority (feedback edge) "
f"targeting '{target}', but node '{target}' has "
f"max_node_visits={target_node.max_node_visits}. "
"Consider increasing max_node_visits on the target node."
)
_save_session(session) # Auto-save
return json.dumps(
{
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"edge": edge.model_dump(),
"total_edges": len(session.edges),
"approval_required": True,
@@ -739,12 +795,23 @@ def update_node(
node_id: Annotated[str, "ID of the node to update"],
name: Annotated[str, "Updated human-readable name"] = "",
description: Annotated[str, "Updated description"] = "",
node_type: Annotated[str, "Updated type: llm_generate, llm_tool_use, router, or function"] = "",
node_type: Annotated[
str,
"Updated type: event_loop (recommended), function, router. "
"Deprecated: llm_generate, llm_tool_use",
] = "",
input_keys: Annotated[str, "Updated JSON array of input keys"] = "",
output_keys: Annotated[str, "Updated JSON array of output keys"] = "",
system_prompt: Annotated[str, "Updated instructions for LLM nodes"] = "",
tools: Annotated[str, "Updated JSON array of tool names"] = "",
routes: Annotated[str, "Updated JSON object mapping conditions to target node IDs"] = "",
client_facing: Annotated[
str, "Updated client-facing flag ('true'/'false', empty=no change)"
] = "",
nullable_output_keys: Annotated[
str, "Updated JSON array of nullable output keys (empty=no change)"
] = "",
max_node_visits: Annotated[int, "Updated max node visits per graph run. 0=no change"] = 0,
) -> str:
"""Update an existing node in the agent graph. Only provided fields will be updated."""
session = get_session()
@@ -765,6 +832,9 @@ def update_node(
output_keys_list = json.loads(output_keys) if output_keys else None
tools_list = json.loads(tools) if tools else None
routes_dict = json.loads(routes) if routes else None
nullable_output_keys_list = (
json.loads(nullable_output_keys) if nullable_output_keys else None
)
except json.JSONDecodeError as e:
return json.dumps(
{
@@ -797,6 +867,12 @@ def update_node(
node.tools = tools_list
if routes_dict is not None:
node.routes = routes_dict
if client_facing:
node.client_facing = client_facing.lower() == "true"
if nullable_output_keys_list is not None:
node.nullable_output_keys = nullable_output_keys_list
if max_node_visits > 0:
node.max_node_visits = max_node_visits
# Validate
errors = []
@@ -809,6 +885,26 @@ def update_node(
if node.node_type in ("llm_generate", "llm_tool_use") and not node.system_prompt:
warnings.append(f"LLM node '{node_id}' should have a system_prompt")
# EventLoopNode validation
if node.node_type == "event_loop" and not node.system_prompt:
warnings.append(f"Event loop node '{node_id}' should have a system_prompt")
# Deprecated type warnings
if node.node_type in ("llm_generate", "llm_tool_use"):
warnings.append(
f"Node type '{node.node_type}' is deprecated. Use 'event_loop' instead. "
"EventLoopNode supports tool use, streaming, and judge-based evaluation."
)
# nullable_output_keys must be a subset of output_keys
if node.nullable_output_keys:
invalid_nullable = [k for k in node.nullable_output_keys if k not in node.output_keys]
if invalid_nullable:
errors.append(
f"nullable_output_keys {invalid_nullable} must be a subset of "
f"output_keys {node.output_keys}"
)
_save_session(session) # Auto-save
return json.dumps(
@@ -1147,6 +1243,87 @@ def validate_graph() -> str:
errors.extend(context_errors)
warnings.extend(context_warnings)
# === EventLoopNode-specific validation ===
from collections import defaultdict
# Detect fan-out: multiple ON_SUCCESS edges from same source
outgoing_success: dict[str, list[str]] = defaultdict(list)
for edge in session.edges:
cond = edge.condition.value if hasattr(edge.condition, "value") else edge.condition
if cond == "on_success":
outgoing_success[edge.source].append(edge.target)
for source_id, targets in outgoing_success.items():
if len(targets) > 1:
# Client-facing fan-out: cannot target multiple client_facing nodes
cf_targets = [
t for t in targets if any(n.id == t and n.client_facing for n in session.nodes)
]
if len(cf_targets) > 1:
errors.append(
f"Fan-out from '{source_id}' targets multiple client_facing "
f"nodes: {cf_targets}. Only one branch may be client-facing."
)
# Output key overlap on parallel event_loop nodes
el_targets = [
t
for t in targets
if any(n.id == t and n.node_type == "event_loop" for n in session.nodes)
]
if len(el_targets) > 1:
seen_keys: dict[str, str] = {}
for nid in el_targets:
node_obj = next((n for n in session.nodes if n.id == nid), None)
if node_obj:
for key in node_obj.output_keys:
if key in seen_keys:
errors.append(
f"Fan-out from '{source_id}': event_loop "
f"nodes '{seen_keys[key]}' and '{nid}' both "
f"write to output_key '{key}'. Parallel "
"nodes must have disjoint output_keys."
)
else:
seen_keys[key] = nid
# Feedback loop validation: targets should allow re-visits
for edge in session.edges:
if edge.priority < 0:
target_node = next((n for n in session.nodes if n.id == edge.target), None)
if target_node and target_node.max_node_visits <= 1:
warnings.append(
f"Feedback edge '{edge.id}' targets '{edge.target}' "
f"which has max_node_visits={target_node.max_node_visits}. "
"Consider setting max_node_visits > 1."
)
# nullable_output_keys must be subset of output_keys
for node in session.nodes:
if node.nullable_output_keys:
invalid = [k for k in node.nullable_output_keys if k not in node.output_keys]
if invalid:
errors.append(
f"Node '{node.id}': nullable_output_keys {invalid} "
f"must be a subset of output_keys {node.output_keys}"
)
# Deprecated node type warnings
deprecated_nodes = [
{"node_id": n.id, "type": n.node_type, "replacement": "event_loop"}
for n in session.nodes
if n.node_type in ("llm_generate", "llm_tool_use")
]
for dn in deprecated_nodes:
warnings.append(
f"Node '{dn['node_id']}' uses deprecated type '{dn['type']}'. Use 'event_loop' instead."
)
# Collect summary info
event_loop_nodes = [n.id for n in session.nodes if n.node_type == "event_loop"]
client_facing_nodes = [n.id for n in session.nodes if n.client_facing]
feedback_edges = [e.id for e in session.edges if e.priority < 0]
return json.dumps(
{
"valid": len(errors) == 0,
@@ -1163,6 +1340,10 @@ def validate_graph() -> str:
"context_flow": {node_id: list(keys) for node_id, keys in available_context.items()}
if available_context
else None,
"event_loop_nodes": event_loop_nodes,
"client_facing_nodes": client_facing_nodes,
"feedback_edges": feedback_edges,
"deprecated_node_types": deprecated_nodes,
}
)
@@ -1213,6 +1394,12 @@ def _generate_readme(session: BuildSession, export_data: dict, all_tools: set) -
if node.routes:
routes_str = ", ".join([f"{k}{v}" for k, v in node.routes.items()])
node_info.append(f" - Routes: {routes_str}")
if node.client_facing:
node_info.append(" - Client-facing: Yes (blocks for user input)")
if node.nullable_output_keys:
node_info.append(f" - Nullable outputs: `{', '.join(node.nullable_output_keys)}`")
if node.max_node_visits > 1:
node_info.append(f" - Max visits: {node.max_node_visits}")
nodes_section.append("\n".join(node_info))
# Build success criteria section
@@ -1266,7 +1453,12 @@ def _generate_readme(session: BuildSession, export_data: dict, all_tools: set) -
for edge in edges:
cond = edge.condition.value if hasattr(edge.condition, "value") else edge.condition
readme += f"- `{edge.source}` → `{edge.target}` (condition: {cond})\n"
priority_note = f", priority={edge.priority}" if edge.priority != 0 else ""
feedback_note = " **[FEEDBACK]**" if edge.priority < 0 else ""
readme += (
f"- `{edge.source}` → `{edge.target}` "
f"(condition: {cond}{priority_note}){feedback_note}\n"
)
readme += f"""
@@ -1481,6 +1673,10 @@ def export_graph() -> str:
"created_at": datetime.now().isoformat(),
}
# Include loop config if configured
if session.loop_config:
graph_spec["loop_config"] = session.loop_config
# Collect all tools referenced by nodes
all_tools = set()
for node in session.nodes:
@@ -1596,6 +1792,50 @@ def get_session_status() -> str:
"nodes": [n.id for n in session.nodes],
"edges": [(e.source, e.target) for e in session.edges],
"mcp_servers": [s["name"] for s in session.mcp_servers],
"event_loop_nodes": [n.id for n in session.nodes if n.node_type == "event_loop"],
"client_facing_nodes": [n.id for n in session.nodes if n.client_facing],
"deprecated_nodes": [
n.id for n in session.nodes if n.node_type in ("llm_generate", "llm_tool_use")
],
"feedback_edges": [e.id for e in session.edges if e.priority < 0],
}
)
@mcp.tool()
def configure_loop(
max_iterations: Annotated[int, "Maximum loop iterations per node execution (default 50)"] = 50,
max_tool_calls_per_turn: Annotated[int, "Maximum tool calls per LLM turn (default 10)"] = 10,
stall_detection_threshold: Annotated[
int, "Consecutive identical responses before stall detection triggers (default 3)"
] = 3,
max_history_tokens: Annotated[
int, "Maximum conversation history tokens before compaction (default 32000)"
] = 32000,
) -> str:
"""Configure event loop parameters for EventLoopNode execution.
These settings control how EventLoopNodes behave at runtime:
- max_iterations: prevents infinite loops
- max_tool_calls_per_turn: limits tool calls per LLM response
- stall_detection_threshold: detects when LLM repeats itself
- max_history_tokens: triggers conversation compaction
"""
session = get_session()
session.loop_config = {
"max_iterations": max_iterations,
"max_tool_calls_per_turn": max_tool_calls_per_turn,
"stall_detection_threshold": stall_detection_threshold,
"max_history_tokens": max_history_tokens,
}
_save_session(session)
return json.dumps(
{
"success": True,
"loop_config": session.loop_config,
}
)
@@ -1891,10 +2131,41 @@ def test_node(
result["routing_options"] = node_spec.routes
result["simulation"] = "Router would evaluate routes based on input and select target node"
elif node_spec.node_type in ("llm_generate", "llm_tool_use"):
# Show what prompt would be sent
elif node_spec.node_type == "event_loop":
# EventLoopNode simulation
result["system_prompt"] = node_spec.system_prompt
result["available_tools"] = node_spec.tools
result["client_facing"] = node_spec.client_facing
result["nullable_output_keys"] = node_spec.nullable_output_keys
result["max_node_visits"] = node_spec.max_node_visits
if mock_llm_response:
result["mock_response"] = mock_llm_response
result["simulation"] = (
"EventLoopNode would run a multi-turn streaming loop. "
"Each iteration: LLM call -> tool execution -> judge evaluation. "
"Loop continues until judge ACCEPTs or max_iterations reached."
)
else:
cf_note = (
"Node is client-facing: will block for user input between turns. "
if node_spec.client_facing
else ""
)
result["simulation"] = (
"EventLoopNode would stream LLM responses, execute tool calls, "
"and use judge evaluation to decide when to stop. "
+ cf_note
+ f"Max visits per graph run: {node_spec.max_node_visits}."
)
elif node_spec.node_type in ("llm_generate", "llm_tool_use"):
# Legacy LLM node types
result["system_prompt"] = node_spec.system_prompt
result["available_tools"] = node_spec.tools
result["deprecation_warning"] = (
f"Node type '{node_spec.node_type}' is deprecated. Use 'event_loop' instead."
)
if mock_llm_response:
result["mock_response"] = mock_llm_response
@@ -1909,6 +2180,7 @@ def test_node(
result["expected_memory_state"] = {
"inputs_available": {k: input_data.get(k, "<not provided>") for k in node_spec.input_keys},
"outputs_to_write": node_spec.output_keys,
"nullable_outputs": node_spec.nullable_output_keys or [],
}
return json.dumps(
@@ -1997,13 +2269,19 @@ def test_graph(
"writes": current_node.output_keys,
}
if current_node.node_type in ("llm_generate", "llm_tool_use"):
if current_node.node_type in ("llm_generate", "llm_tool_use", "event_loop"):
step_info["prompt_preview"] = (
current_node.system_prompt[:200] + "..."
if current_node.system_prompt and len(current_node.system_prompt) > 200
else current_node.system_prompt
)
step_info["tools_available"] = current_node.tools
if current_node.node_type == "event_loop":
step_info["event_loop_config"] = {
"client_facing": current_node.client_facing,
"max_node_visits": current_node.max_node_visits,
"nullable_output_keys": current_node.nullable_output_keys,
}
execution_trace.append(step_info)
@@ -2012,17 +2290,33 @@ def test_graph(
step_info["is_terminal"] = True
break
# Find next node via edges
# Find next node via edges (sorted by priority, highest first)
outgoing = sorted(
[e for e in session.edges if e.source == current_node_id],
key=lambda e: -e.priority,
)
next_node = None
for edge in session.edges:
if edge.source == current_node_id:
# In dry run, assume success path
for edge in outgoing:
# In dry run, follow success/always edges (highest priority first)
if edge.condition.value in ("always", "on_success"):
next_node = edge.target
step_info["next_node"] = next_node
step_info["edge_condition"] = edge.condition.value
step_info["edge_priority"] = edge.priority
break
# Note any feedback edges from this node
feedback = [e for e in outgoing if e.priority < 0]
if feedback:
step_info["feedback_edges"] = [
{
"target": e.target,
"condition_expr": e.condition_expr,
"priority": e.priority,
}
for e in feedback
]
if next_node is None:
step_info["note"] = "No outgoing edge found (end of path)"
break