Merge pull request #3557 from TimothyZhang7/feature/tui-dashboard

feat: TUI dashboard, EventLoopNode refinements, auto-creation, data tools, and runner overhaul
This commit is contained in:
Bryan @ Aden
2026-02-04 19:04:41 -08:00
committed by GitHub
27 changed files with 1146 additions and 701 deletions
@@ -267,7 +267,7 @@ This returns JSON with all the goal, nodes, edges, and MCP server configurations
- NOT: `{"first-node-id": ["input_keys"]}` (WRONG)
- NOT: `{"first-node-id"}` (WRONG - this is a set)
**Use the example agent** at `.claude/skills/building-agents-construction/examples/online_research_agent/` as a template for file structure and patterns.
**Use the example agent** at `.claude/skills/building-agents-construction/examples/deep_research_agent/` as a template for file structure and patterns. It demonstrates: STEP 1/STEP 2 prompts, client-facing nodes, feedback loops, nullable_output_keys, and data tools.
**AFTER writing all files, tell the user:**
@@ -354,7 +354,7 @@ mcp__agent-builder__get_session_status()
## REFERENCE: System Prompt Best Practice
For event_loop nodes, instruct the LLM to use `set_output` for structured outputs:
For **internal** event_loop nodes (not client-facing), instruct the LLM to use `set_output`:
```
Use set_output(key, value) to store your results. For example:
@@ -363,71 +363,55 @@ Use set_output(key, value) to store your results. For example:
Do NOT return raw JSON. Use the set_output tool to produce outputs.
```
For **client-facing** event_loop nodes, use the STEP 1/STEP 2 pattern:
```
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions, etc.]
**STEP 2 — After the user responds, call set_output:**
- set_output("key", "value based on user's response")
```
This prevents the LLM from calling `set_output` before the user has had a chance to respond. The "NO tool calls" instruction in STEP 1 ensures the node blocks for user input before proceeding.
---
## CRITICAL: EventLoopNode Registration
## EventLoopNode Runtime
**`AgentRuntime` does NOT support `event_loop` nodes.** The `AgentRuntime` / `create_agent_runtime()` path creates `GraphExecutor` instances internally without passing a `node_registry`, causing all `event_loop` nodes to fail at runtime with:
```
EventLoopNode 'node-id' not found in registry. Register it with executor.register_node() before execution.
```
**The correct pattern**: Use `GraphExecutor` directly with a `node_registry` dict containing `EventLoopNode` instances:
EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. Both direct `GraphExecutor` and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically. No manual `node_registry` setup is needed.
```python
from framework.graph.executor import GraphExecutor, ExecutionResult
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime # REQUIRED - executor calls runtime.start_run()
# Direct execution
from framework.graph.executor import GraphExecutor
from framework.runtime.core import Runtime
# 1. Build node_registry with EventLoopNode instances
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
if node_spec.node_type == "event_loop":
node_registry[node_spec.id] = EventLoopNode(
event_bus=event_bus,
judge=None, # implicit judge: accepts when output_keys are filled
config=LoopConfig(
max_iterations=50,
max_tool_calls_per_turn=15,
stall_detection_threshold=3,
max_history_tokens=32000,
),
tool_executor=tool_executor,
)
# 2. Create Runtime for run tracking (GraphExecutor calls runtime.start_run())
storage_path = Path.home() / ".hive" / "my_agent"
storage_path.mkdir(parents=True, exist_ok=True)
runtime = Runtime(storage_path)
# 3. Create GraphExecutor WITH node_registry and runtime
executor = GraphExecutor(
runtime=runtime, # NOT None - executor needs this for run tracking
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry, # EventLoopNode instances
storage_path=storage_path,
)
# 4. Execute
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
```
**DO NOT use `AgentRuntime` or `create_agent_runtime()` for agents with `event_loop` nodes.**
**DO NOT pass `runtime=None` to `GraphExecutor`** — it will crash with `'NoneType' object has no attribute 'start_run'`.
---
## COMMON MISTAKES TO AVOID
1. **Using `AgentRuntime` with event_loop nodes** - `AgentRuntime` does not register EventLoopNodes. Use `GraphExecutor` directly with `node_registry`
2. **Passing `runtime=None` to GraphExecutor** - The executor calls `runtime.start_run()` internally. Always provide a `Runtime(storage_path)` instance
3. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first
4. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list
5. **Skipping validation** - Always validate nodes and graph before proceeding
6. **Not waiting for approval** - Always ask user before major steps
7. **Displaying this file** - Execute the steps, don't show documentation
1. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first
2. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list
3. **Skipping validation** - Always validate nodes and graph before proceeding
4. **Not waiting for approval** - Always ask user before major steps
5. **Displaying this file** - Execute the steps, don't show documentation
6. **Too many thin nodes** - Prefer fewer, richer nodes (4 nodes > 8 nodes)
7. **Missing STEP 1/STEP 2 in client-facing prompts** - Client-facing nodes need explicit phases to prevent premature set_output
8. **Forgetting nullable_output_keys** - Mark input_keys that only arrive on certain edges (e.g., feedback) as nullable on the receiving node
9. **Adding framework gating for LLM behavior** - Fix prompts or use judges, not ad-hoc code
@@ -0,0 +1,24 @@
"""
Deep Research Agent - Interactive, rigorous research with TUI conversation.
Research any topic through multi-source web search, quality evaluation,
and synthesis. Features client-facing TUI interaction at key checkpoints
for user guidance and iterative deepening.
"""
from .agent import DeepResearchAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"DeepResearchAgent",
"default_agent",
"goal",
"nodes",
"edges",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -1,5 +1,5 @@
"""
CLI entry point for Online Research Agent.
CLI entry point for Deep Research Agent.
Uses AgentRuntime for multi-entrypoint support with HITL pause/resume.
"""
@@ -10,7 +10,7 @@ import logging
import sys
import click
from .agent import default_agent, OnlineResearchAgent
from .agent import default_agent, DeepResearchAgent
def setup_logging(verbose=False, debug=False):
@@ -28,7 +28,7 @@ def setup_logging(verbose=False, debug=False):
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""Online Research Agent - Deep-dive research with narrative reports."""
"""Deep Research Agent - Interactive, rigorous research with TUI conversation."""
pass
@@ -59,6 +59,83 @@ def run(topic, mock, quiet, verbose, debug):
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive research."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo("TUI requires the 'textual' package. Install with: pip install textual")
sys.exit(1)
from pathlib import Path
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui():
agent = DeepResearchAgent()
# Build graph and tools
agent._event_bus = EventBus()
agent._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock:
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
storage_path = Path.home() / ".hive" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Start Research",
entry_node="intake",
trigger_type="manual",
isolation_level="isolated",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
@@ -71,6 +148,7 @@ def info(output_json):
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")
@@ -81,6 +159,9 @@ def validate():
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
@@ -91,7 +172,7 @@ def validate():
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
"""Interactive research session."""
"""Interactive research session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
@@ -99,10 +180,10 @@ async def _interactive_shell(verbose=False):
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== Online Research Agent ===")
click.echo("=== Deep Research Agent ===")
click.echo("Enter a topic to research (or 'quit' to exit):\n")
agent = OnlineResearchAgent()
agent = DeepResearchAgent()
await agent.start()
try:
@@ -118,7 +199,7 @@ async def _interactive_shell(verbose=False):
if not topic.strip():
continue
click.echo("\nResearching... (this may take a few minutes)\n")
click.echo("\nResearching...\n")
result = await agent.trigger_and_wait("start", {"topic": topic})
@@ -128,16 +209,14 @@ async def _interactive_shell(verbose=False):
if result.success:
output = result.output
if "file_path" in output:
click.echo(f"\nReport saved to: {output['file_path']}\n")
if "final_report" in output:
click.echo("\n--- Report Preview ---\n")
preview = (
output["final_report"][:500] + "..."
if len(output.get("final_report", "")) > 500
else output.get("final_report", "")
)
click.echo(preview)
if "report_content" in output:
click.echo("\n--- Report ---\n")
click.echo(output["report_content"])
click.echo("\n")
if "references" in output:
click.echo("--- References ---\n")
for ref in output.get("references", []):
click.echo(f" [{ref.get('number', '?')}] {ref.get('title', '')} - {ref.get('url', '')}")
click.echo("\n")
else:
click.echo(f"\nResearch failed: {result.error}\n")
@@ -148,7 +227,6 @@ async def _interactive_shell(verbose=False):
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
@@ -1,9 +1,8 @@
"""Agent graph construction for Online Research Agent."""
"""Agent graph construction for Deep Research Agent."""
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
@@ -11,164 +10,132 @@ from framework.runner.tool_registry import ToolRegistry
from .config import default_config, metadata
from .nodes import (
parse_query_node,
search_sources_node,
fetch_content_node,
evaluate_sources_node,
synthesize_findings_node,
write_report_node,
quality_check_node,
save_report_node,
intake_node,
research_node,
review_node,
report_node,
)
# Goal definition
goal = Goal(
id="comprehensive-online-research",
name="Comprehensive Online Research",
description="Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations.",
id="rigorous-interactive-research",
name="Rigorous Interactive Research",
description=(
"Research any topic by searching diverse sources, analyzing findings, "
"and producing a cited report — with user checkpoints to guide direction."
),
success_criteria=[
SuccessCriterion(
id="source-coverage",
description="Query 10+ diverse sources",
id="source-diversity",
description="Use multiple diverse, authoritative sources",
metric="source_count",
target=">=10",
weight=0.20,
),
SuccessCriterion(
id="relevance",
description="All sources directly address the query",
metric="relevance_score",
target="90%",
target=">=5",
weight=0.25,
),
SuccessCriterion(
id="synthesis",
description="Synthesize findings into coherent narrative",
metric="coherence_score",
target="85%",
weight=0.25,
),
SuccessCriterion(
id="citations",
description="Include citations for all claims",
id="citation-coverage",
description="Every factual claim in the report cites its source",
metric="citation_coverage",
target="100%",
weight=0.15,
weight=0.25,
),
SuccessCriterion(
id="actionable",
description="Report answers the user's question",
metric="answer_completeness",
id="user-satisfaction",
description="User reviews findings before report generation",
metric="user_approval",
target="true",
weight=0.25,
),
SuccessCriterion(
id="report-completeness",
description="Final report answers the original research questions",
metric="question_coverage",
target="90%",
weight=0.15,
weight=0.25,
),
],
constraints=[
Constraint(
id="no-hallucination",
description="Only include information found in sources",
description="Only include information found in fetched sources",
constraint_type="quality",
category="accuracy",
),
Constraint(
id="source-attribution",
description="Every factual claim must cite its source",
description="Every claim must cite its source with a numbered reference",
constraint_type="quality",
category="accuracy",
),
Constraint(
id="recency-preference",
description="Prefer recent sources when relevant",
constraint_type="quality",
category="relevance",
),
Constraint(
id="no-paywalled",
description="Avoid sources that require payment to access",
id="user-checkpoint",
description="Present findings to the user before writing the final report",
constraint_type="functional",
category="accessibility",
category="interaction",
),
],
)
# Node list
nodes = [
parse_query_node,
search_sources_node,
fetch_content_node,
evaluate_sources_node,
synthesize_findings_node,
write_report_node,
quality_check_node,
save_report_node,
intake_node,
research_node,
review_node,
report_node,
]
# Edge definitions
edges = [
# intake -> research
EdgeSpec(
id="parse-to-search",
source="parse-query",
target="search-sources",
id="intake-to-research",
source="intake",
target="research",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# research -> review
EdgeSpec(
id="search-to-fetch",
source="search-sources",
target="fetch-content",
id="research-to-review",
source="research",
target="review",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# review -> research (feedback loop)
EdgeSpec(
id="fetch-to-evaluate",
source="fetch-content",
target="evaluate-sources",
condition=EdgeCondition.ON_SUCCESS,
id="review-to-research-feedback",
source="review",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="needs_more_research == True",
priority=1,
),
# review -> report (user satisfied)
EdgeSpec(
id="evaluate-to-synthesize",
source="evaluate-sources",
target="synthesize-findings",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="synthesize-to-write",
source="synthesize-findings",
target="write-report",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="write-to-quality",
source="write-report",
target="quality-check",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="quality-to-save",
source="quality-check",
target="save-report",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
id="review-to-report",
source="review",
target="report",
condition=EdgeCondition.CONDITIONAL,
condition_expr="needs_more_research == False",
priority=2,
),
]
# Graph configuration
entry_node = "parse-query"
entry_points = {"start": "parse-query"}
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["save-report"]
terminal_nodes = ["report"]
class OnlineResearchAgent:
class DeepResearchAgent:
"""
Online Research Agent - Deep-dive research with narrative reports.
Deep Research Agent 4-node pipeline with user checkpoints.
Uses GraphExecutor directly with EventLoopNode instances registered
in the node_registry for multi-turn tool execution.
Flow: intake -> research -> review -> report
^ |
+-- feedback loop (if user wants more)
"""
def __init__(self, config=None):
@@ -188,7 +155,7 @@ class OnlineResearchAgent:
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="online-research-agent-graph",
id="deep-research-agent-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
@@ -201,29 +168,11 @@ class OnlineResearchAgent:
max_tokens=self.config.max_tokens,
)
def _build_node_registry(self, tool_executor=None) -> dict:
"""Create EventLoopNode instances for all event_loop nodes."""
registry = {}
for node_spec in self.nodes:
if node_spec.node_type == "event_loop":
registry[node_spec.id] = EventLoopNode(
event_bus=self._event_bus,
judge=None, # implicit judge: accept when output_keys are filled
config=LoopConfig(
max_iterations=50,
max_tool_calls_per_turn=15,
stall_detection_threshold=3,
max_history_tokens=32000,
),
tool_executor=tool_executor,
)
return registry
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "online_research_agent"
storage_path = Path.home() / ".hive" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
@@ -245,7 +194,6 @@ class OnlineResearchAgent:
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
node_registry = self._build_node_registry(tool_executor=tool_executor)
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
@@ -253,7 +201,8 @@ class OnlineResearchAgent:
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry,
event_bus=self._event_bus,
storage_path=storage_path,
)
return self._executor
@@ -317,7 +266,7 @@ class OnlineResearchAgent:
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"multi_entrypoint": True,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self):
@@ -339,10 +288,6 @@ class OnlineResearchAgent:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for pause in self.pause_nodes:
if pause not in node_ids:
errors.append(f"Pause node '{pause}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
@@ -357,4 +302,4 @@ class OnlineResearchAgent:
# Create default instance
default_agent = OnlineResearchAgent()
default_agent = DeepResearchAgent()
@@ -32,12 +32,15 @@ class RuntimeConfig:
default_config = RuntimeConfig()
# Agent metadata
@dataclass
class AgentMetadata:
name: str = "Online Research Agent"
name: str = "Deep Research Agent"
version: str = "1.0.0"
description: str = "Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations."
description: str = (
"Interactive research agent that rigorously investigates topics through "
"multi-source search, quality evaluation, and synthesis - with TUI conversation "
"at key checkpoints for user guidance and feedback."
)
metadata = AgentMetadata()
@@ -0,0 +1,147 @@
"""Node definitions for Deep Research Agent."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Brief conversation to clarify what the user wants researched.
intake_node = NodeSpec(
id="intake",
name="Research Intake",
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
input_keys=["topic"],
output_keys=["research_brief"],
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
**STEP 1 Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)
3. If it's already clear, confirm your understanding and ask the user to confirm
Keep it short. Don't over-ask.
**STEP 2 After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
""",
tools=[],
)
# Node 2: Research
# The workhorse — searches the web, fetches content, analyzes sources.
# One node with both tools avoids the context-passing overhead of 5 separate nodes.
research_node = NodeSpec(
id="research",
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=3,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
If feedback is provided, this is a follow-up round focus on the gaps identified.
Work in phases:
1. **Search**: Use web_search with 3-5 diverse queries covering different angles.
Prioritize authoritative sources (.edu, .gov, established publications).
2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).
Skip URLs that fail. Extract the substantive content.
3. **Analyze**: Review what you've collected. Identify key findings, themes,
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time to manage context
- After each batch, assess whether you have enough material
- Prefer quality over quantity 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
When done, use set_output:
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
# Node 3: Review (client-facing)
# Shows the user what was found and asks whether to dig deeper or proceed.
review_node = NodeSpec(
id="review",
name="Review Findings",
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=3,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
system_prompt="""\
Present the research findings to the user clearly and concisely.
**STEP 1 Present (your first message, text only, NO tool calls):**
1. **Summary** (2-3 sentences of what was found)
2. **Key Findings** (bulleted, with confidence levels)
3. **Sources Used** (count and quality assessment)
4. **Gaps** (what's still unclear or under-covered)
End by asking: Are they satisfied, or do they want deeper research? \
Should we proceed to writing the final report?
**STEP 2 After the user responds, call set_output:**
- set_output("needs_more_research", "true") if they want more
- set_output("needs_more_research", "false") if they're satisfied
- set_output("feedback", "What the user wants explored further, or empty string")
""",
tools=[],
)
# Node 4: Report (client-facing)
# Writes the final report and presents it to the user.
report_node = NodeSpec(
id="report",
name="Write & Deliver Report",
description="Write a cited report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status"],
system_prompt="""\
Write a comprehensive research report and present it to the user.
**STEP 1 Write and present the report (text only, NO tool calls):**
Report structure:
1. **Executive Summary** (2-3 paragraphs)
2. **Findings** (organized by theme, with [n] citations)
3. **Analysis** (synthesis, implications, areas of debate)
4. **Conclusion** (key takeaways, confidence assessment)
5. **References** (numbered list of sources cited)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief
End by asking the user if they have questions or want to save the report.
**STEP 2 After the user responds:**
- Answer follow-up questions from the research material
- If they want to save, use write_to_file tool
- When the user is satisfied: set_output("delivery_status", "completed")
""",
tools=["write_to_file"],
)
__all__ = [
"intake_node",
"research_node",
"review_node",
"report_node",
]
@@ -1,80 +0,0 @@
# Online Research Agent
Deep-dive research agent that searches 10+ sources and produces comprehensive narrative reports with citations.
## Features
- Generates multiple search queries from a topic
- Searches and fetches 15+ web sources
- Evaluates and ranks sources by relevance
- Synthesizes findings into themes
- Writes narrative report with numbered citations
- Quality checks for uncited claims
- Saves report to local markdown file
## Usage
### CLI
```bash
# Show agent info
python -m online_research_agent info
# Validate structure
python -m online_research_agent validate
# Run research on a topic
python -m online_research_agent run --topic "impact of AI on healthcare"
# Interactive shell
python -m online_research_agent shell
```
### Python API
```python
from online_research_agent import default_agent
# Simple usage
result = await default_agent.run({"topic": "climate change solutions"})
# Check output
if result.success:
print(f"Report saved to: {result.output['file_path']}")
print(result.output['final_report'])
```
## Workflow
```
parse-query → search-sources → fetch-content → evaluate-sources
write-report ← synthesize-findings
quality-check → save-report
```
## Output
Reports are saved to `./research_reports/` as markdown files with:
1. Executive Summary
2. Introduction
3. Key Findings (by theme)
4. Analysis
5. Conclusion
6. References
## Requirements
- Python 3.11+
- LLM provider API key (Groq, Cerebras, etc.)
- Internet access for web search/fetch
## Configuration
Edit `config.py` to change:
- `model`: LLM model (default: groq/moonshotai/kimi-k2-instruct-0905)
- `temperature`: Generation temperature (default: 0.7)
- `max_tokens`: Max tokens per response (default: 16384)
@@ -1,23 +0,0 @@
"""
Online Research Agent - Deep-dive research with narrative reports.
Research any topic by searching multiple sources, synthesizing information,
and producing a well-structured narrative report with citations.
"""
from .agent import OnlineResearchAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"OnlineResearchAgent",
"default_agent",
"goal",
"nodes",
"edges",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -1,232 +0,0 @@
"""Node definitions for Online Research Agent."""
from framework.graph import NodeSpec
# Node 1: Parse Query
parse_query_node = NodeSpec(
id="parse-query",
name="Parse Query",
description="Analyze the research topic and generate 3-5 diverse search queries to cover different aspects",
node_type="event_loop",
input_keys=["topic"],
output_keys=["search_queries", "research_focus", "key_aspects"],
system_prompt="""\
You are a research query strategist. Given a research topic, analyze it and generate search queries.
Your task:
1. Understand the core research question
2. Identify 3-5 key aspects to investigate
3. Generate 3-5 diverse search queries that will find comprehensive information
Use set_output to store each result:
- set_output("research_focus", "Brief statement of what we're researching")
- set_output("key_aspects", ["aspect1", "aspect2", "aspect3"])
- set_output("search_queries", ["query 1", "query 2", "query 3", "query 4", "query 5"])
""",
tools=[],
)
# Node 2: Search Sources
search_sources_node = NodeSpec(
id="search-sources",
name="Search Sources",
description="Execute web searches using the generated queries to find 15+ source URLs",
node_type="event_loop",
input_keys=["search_queries", "research_focus"],
output_keys=["source_urls", "search_results_summary"],
system_prompt="""\
You are a research assistant executing web searches. Use the web_search tool to find sources.
Your task:
1. Execute each search query using web_search tool
2. Collect URLs from search results
3. Aim for 15+ diverse sources
After searching, use set_output to store results:
- set_output("source_urls", ["url1", "url2", ...])
- set_output("search_results_summary", "Brief summary of what was found")
""",
tools=["web_search"],
)
# Node 3: Fetch Content
fetch_content_node = NodeSpec(
id="fetch-content",
name="Fetch Content",
description="Fetch and extract content from the discovered source URLs",
node_type="event_loop",
input_keys=["source_urls", "research_focus"],
output_keys=["fetched_sources", "fetch_errors"],
system_prompt="""\
You are a content fetcher. Use web_scrape tool to retrieve content from URLs.
Your task:
1. Fetch content from each source URL using web_scrape tool
2. Extract the main content relevant to the research focus
3. Track any URLs that failed to fetch
After fetching, use set_output to store results:
- set_output("fetched_sources", [{"url": "...", "title": "...", "content": "..."}])
- set_output("fetch_errors", ["url that failed", ...])
""",
tools=["web_scrape"],
)
# Node 4: Evaluate Sources
evaluate_sources_node = NodeSpec(
id="evaluate-sources",
name="Evaluate Sources",
description="Score sources for relevance and quality, filter to top 10",
node_type="event_loop",
input_keys=["fetched_sources", "research_focus", "key_aspects"],
output_keys=["ranked_sources", "source_analysis"],
system_prompt="""\
You are a source evaluator. Assess each source for quality and relevance.
Scoring criteria:
- Relevance to research focus (1-10)
- Source credibility (1-10)
- Information depth (1-10)
- Recency if relevant (1-10)
Your task:
1. Score each source
2. Rank by combined score
3. Select top 10 sources
4. Note what each source uniquely contributes
Use set_output to store results:
- set_output("ranked_sources", [{"url": "...", "title": "...", "score": 8.5}])
- set_output("source_analysis", "Overview of source quality and coverage")
""",
tools=[],
)
# Node 5: Synthesize Findings
synthesize_findings_node = NodeSpec(
id="synthesize-findings",
name="Synthesize Findings",
description="Extract key facts from sources and identify common themes",
node_type="event_loop",
input_keys=["ranked_sources", "research_focus", "key_aspects"],
output_keys=["key_findings", "themes", "source_citations"],
system_prompt="""\
You are a research synthesizer. Analyze multiple sources to extract insights.
Your task:
1. Identify key facts from each source
2. Find common themes across sources
3. Note contradictions or debates
4. Build a citation map (fact -> source URL)
Use set_output to store each result:
- set_output("key_findings", [{"finding": "...", "sources": ["url1"], "confidence": "high"}])
- set_output("themes", [{"theme": "...", "description": "...", "supporting_sources": [...]}])
- set_output("source_citations", {"fact or claim": ["url1", "url2"]})
""",
tools=[],
)
# Node 6: Write Report
write_report_node = NodeSpec(
id="write-report",
name="Write Report",
description="Generate a narrative report with proper citations",
node_type="event_loop",
input_keys=[
"key_findings",
"themes",
"source_citations",
"research_focus",
"ranked_sources",
],
output_keys=["report_content", "references"],
system_prompt="""\
You are a research report writer. Create a well-structured narrative report.
Report structure:
1. Executive Summary (2-3 paragraphs)
2. Introduction (context and scope)
3. Key Findings (organized by theme)
4. Analysis (synthesis and implications)
5. Conclusion
6. References (numbered list of all sources)
Citation format: Use numbered citations like [1], [2] that correspond to the References section.
IMPORTANT:
- Every factual claim MUST have a citation
- Write in clear, professional prose
- Be objective and balanced
- Highlight areas of consensus and debate
Use set_output to store results:
- set_output("report_content", "Full markdown report text with citations...")
- set_output("references", [{"number": 1, "url": "...", "title": "..."}])
""",
tools=[],
)
# Node 7: Quality Check
quality_check_node = NodeSpec(
id="quality-check",
name="Quality Check",
description="Verify all claims have citations and report is coherent",
node_type="event_loop",
input_keys=["report_content", "references", "source_citations"],
output_keys=["quality_score", "issues", "final_report"],
system_prompt="""\
You are a quality assurance reviewer. Check the research report for issues.
Check for:
1. Uncited claims (factual statements without [n] citation)
2. Broken citations (references to non-existent numbers)
3. Coherence (logical flow between sections)
4. Completeness (all key aspects covered)
5. Accuracy (claims match source content)
If issues found, fix them in the final report.
Use set_output to store results:
- set_output("quality_score", 0.95)
- set_output("issues", [{"type": "uncited_claim", "location": "...", "fixed": true}])
- set_output("final_report", "Corrected full report with all issues fixed...")
""",
tools=[],
)
# Node 8: Save Report
save_report_node = NodeSpec(
id="save-report",
name="Save Report",
description="Write the final report to a local markdown file",
node_type="event_loop",
input_keys=["final_report", "references", "research_focus"],
output_keys=["file_path", "save_status"],
system_prompt="""\
You are a file manager. Save the research report to disk.
Your task:
1. Generate a filename from the research focus (slugified, with date)
2. Use the write_to_file tool to save the report as markdown
3. Save to the ./research_reports/ directory
Filename format: research_YYYY-MM-DD_topic-slug.md
Use set_output to store results:
- set_output("file_path", "research_reports/research_2026-01-23_topic-name.md")
- set_output("save_status", "success")
""",
tools=["write_to_file"],
)
__all__ = [
"parse_query_node",
"search_sources_node",
"fetch_content_node",
"evaluate_sources_node",
"synthesize_findings_node",
"write_report_node",
"quality_check_node",
"save_report_node",
]
+70 -22
View File
@@ -158,6 +158,43 @@ intake_node = NodeSpec(
> **Legacy Note:** The old `pause_nodes` / `entry_points` pattern still works but `client_facing=True` is preferred for new agents.
**STEP 1 / STEP 2 Prompt Pattern:** For client-facing nodes, structure the system prompt with two explicit phases:
```python
system_prompt="""\
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions, etc.]
**STEP 2 — After the user responds, call set_output:**
[Call set_output with the structured outputs]
"""
```
This prevents the LLM from calling `set_output` prematurely before the user has had a chance to respond.
### Node Design: Fewer, Richer Nodes
Prefer fewer nodes that do more work over many thin single-purpose nodes:
- **Bad**: 8 thin nodes (parse query → search → fetch → evaluate → synthesize → write → check → save)
- **Good**: 4 rich nodes (intake → research → review → report)
Why: Each node boundary requires serializing outputs and passing context. Fewer nodes means the LLM retains full context of its work within the node. A research node that searches, fetches, and analyzes keeps all the source material in its conversation history.
### nullable_output_keys for Cross-Edge Inputs
When a node receives inputs that only arrive on certain edges (e.g., `feedback` only comes from a review → research feedback loop, not from intake → research), mark those keys as `nullable_output_keys`:
```python
research_node = NodeSpec(
id="research",
input_keys=["research_brief", "feedback"],
nullable_output_keys=["feedback"], # Not present on first visit
max_node_visits=3,
...
)
```
## Event Loop Architecture Concepts
### How EventLoopNode Works
@@ -169,40 +206,30 @@ An event loop node runs a multi-turn loop:
4. Judge evaluates: ACCEPT (exit loop), RETRY (loop again), or ESCALATE
5. Repeat until judge ACCEPTs or max_iterations reached
### CRITICAL: EventLoopNode Runtime Requirements
### EventLoopNode Runtime
EventLoopNodes are **not auto-created** by the graph executor. They must be explicitly instantiated and registered in a `node_registry` dict before execution.
**Required components:**
1. **`EventLoopNode` instances** — One per event_loop NodeSpec, registered in `node_registry`
2. **`Runtime` instance** — `GraphExecutor` calls `runtime.start_run()` internally. Passing `None` crashes the executor
3. **`GraphExecutor` (not `AgentRuntime`)** — `AgentRuntime`/`create_agent_runtime()` does NOT pass `node_registry` to the internal `GraphExecutor`, so all event_loop nodes fail with "not found in registry"
EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. You do NOT need to manually register them. Both `GraphExecutor` (direct) and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically.
```python
# Direct execution — executor auto-creates EventLoopNodes
from framework.graph.executor import GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
# Build node_registry
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
if node_spec.node_type == "event_loop":
node_registry[node_spec.id] = EventLoopNode(
event_bus=event_bus,
config=LoopConfig(max_iterations=50, max_tool_calls_per_turn=15),
tool_executor=tool_executor,
)
# Create executor with Runtime and node_registry
runtime = Runtime(storage_path)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
node_registry=node_registry,
storage_path=storage_path,
)
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
# TUI execution — AgentRuntime also works
from framework.runtime.agent_runtime import create_agent_runtime
runtime = create_agent_runtime(
graph=graph, goal=goal, storage_path=storage_path,
entry_points=[...], llm=llm, tools=tools, tool_executor=tool_executor,
)
```
@@ -210,8 +237,12 @@ executor = GraphExecutor(
Nodes produce structured outputs by calling `set_output(key, value)` — a synthetic tool injected by the framework. When the LLM calls `set_output`, the value is stored in the output accumulator and made available to downstream nodes via shared memory.
`set_output` is NOT a real tool — it is excluded from `real_tool_results`. For client-facing nodes, this means a turn where the LLM only calls `set_output` (no other tools) is treated as a conversational boundary and will block for user input.
### JudgeProtocol
**The judge is the SOLE mechanism for acceptance decisions.** Do not add ad-hoc framework gating, output rollback, or premature rejection logic. If the LLM calls `set_output` too early, fix it with better prompts or a custom judge — not framework-level guards.
The judge controls when a node's loop exits:
- **Implicit judge** (default, no judge configured): ACCEPTs when the LLM finishes with no tool calls and all required output keys are set
- **SchemaJudge**: Validates outputs against a Pydantic model
@@ -225,6 +256,23 @@ Controls loop behavior:
- `stall_detection_threshold` (default 3) — detects repeated identical responses
- `max_history_tokens` (default 32000) — triggers conversation compaction
### Data Tools (Spillover Management)
When tool results exceed the context window, the framework automatically saves them to a spillover directory and truncates with a hint. Nodes that produce or consume large data should include the data tools:
- `save_data(filename, data, data_dir)` — Write data to a file in the data directory
- `load_data(filename, data_dir, offset=0, limit=50)` — Read data with line-based pagination
- `list_data_files(data_dir)` — List available data files
These are real MCP tools (not synthetic). Add them to nodes that handle large tool results:
```python
research_node = NodeSpec(
...
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```
### Fan-Out / Fan-In
Multiple ON_SUCCESS edges from the same source create parallel execution. All branches run concurrently via `asyncio.gather()`. Parallel event_loop nodes must have disjoint `output_keys`.
@@ -61,28 +61,38 @@ For agents needing multi-turn conversations with users, use `client_facing=True`
A client-facing node streams LLM output to the user and blocks for user input between conversational turns. This replaces the old pause/resume pattern.
```python
# Client-facing node blocks for user input
# Client-facing node with STEP 1/STEP 2 prompt pattern
intake_node = NodeSpec(
id="intake",
name="Intake",
description="Gather requirements from the user",
node_type="event_loop",
client_facing=True,
input_keys=[],
output_keys=["repo_url", "project_url"],
system_prompt="You are the intake agent. Ask the user for their repo URL and project URL. When you have both, call set_output for each.",
input_keys=["topic"],
output_keys=["research_brief"],
system_prompt="""\
You are an intake specialist.
**STEP 1 — Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions
3. If it's clear, confirm your understanding
**STEP 2 — After the user confirms, call set_output:**
- set_output("research_brief", "Clear description of what to research")
""",
)
# Internal node runs without user interaction
scanner_node = NodeSpec(
id="scanner",
name="Scanner",
description="Scan the repository",
research_node = NodeSpec(
id="research",
name="Research",
description="Search and analyze sources",
node_type="event_loop",
input_keys=["repo_url"],
output_keys=["scan_results"],
system_prompt="Scan the repository at {repo_url}...",
tools=["scan_github_repo"],
input_keys=["research_brief"],
output_keys=["findings", "sources"],
system_prompt="Research the topic using web_search and web_scrape...",
tools=["web_search", "web_scrape", "load_data", "save_data"],
)
```
@@ -91,6 +101,9 @@ scanner_node = NodeSpec(
- User input is injected via `node.inject_event(text)`
- When the LLM calls `set_output` to produce structured outputs, the judge evaluates and ACCEPTs
- Internal nodes (non-client-facing) run their entire loop without blocking
- `set_output` is a synthetic tool — a turn with only `set_output` calls (no real tools) triggers user input blocking
**STEP 1/STEP 2 pattern:** Always structure client-facing prompts with explicit phases. STEP 1 is text-only conversation. STEP 2 calls `set_output` after user confirmation. This prevents the LLM from calling `set_output` prematurely before the user responds.
### When to Use client_facing
@@ -160,6 +173,12 @@ EdgeSpec(
## Judge Patterns
**Core Principle: The judge is the SOLE mechanism for acceptance decisions.** Never add ad-hoc framework gating to compensate for LLM behavior. If the LLM calls `set_output` prematurely, fix the system prompt or use a custom judge. Anti-patterns to avoid:
- Output rollback logic
- `_user_has_responded` flags
- Premature set_output rejection
- Interaction protocol injection into system prompts
Judges control when an event_loop node's loop exits. Choose based on validation needs.
### Implicit Judge (Default)
@@ -241,15 +260,34 @@ EventLoopNode automatically manages context window usage with tiered compaction:
### Spillover Pattern
For large tool results, use `save_data()` to write to disk and pass the filename through `set_output`. This keeps the LLM context window small.
The framework automatically truncates large tool results and saves full content to a spillover directory. The LLM receives a truncation message with instructions to use `load_data` to read the full result.
```
LLM calls save_data(filename, large_data) → file written to spillover/
LLM calls set_output("results_file", filename) → filename stored in output
Downstream node calls load_data(filename) → reads from spillover/
For explicit data management, use the data tools (real MCP tools, not synthetic):
```python
# save_data, load_data, list_data_files are real MCP tools
# Each takes a data_dir parameter since the MCP server is shared
# Saving large results
save_data(filename="sources.json", data=large_json_string, data_dir="/path/to/spillover")
# Reading with pagination (line-based offset/limit)
load_data(filename="sources.json", data_dir="/path/to/spillover", offset=0, limit=50)
# Listing available files
list_data_files(data_dir="/path/to/spillover")
```
The `load_data()` tool supports `offset` and `limit` parameters for paginated reading of large files.
Add data tools to nodes that handle large tool results:
```python
research_node = NodeSpec(
...
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```
The `data_dir` is passed by the framework (from the node's spillover directory). The LLM sees `data_dir` in truncation messages and uses it when calling `load_data`.
## Anti-Patterns
@@ -259,6 +297,29 @@ The `load_data()` tool supports `offset` and `limit` parameters for paginated re
- **Don't hide code in session** — Write to files as components are approved
- **Don't wait to write files** — Agent visible from first step
- **Don't batch everything** — Write incrementally, one component at a time
- **Don't create too many thin nodes** — Prefer fewer, richer nodes (see below)
- **Don't add framework gating for LLM behavior** — Fix prompts or use judges instead
### Fewer, Richer Nodes
A common mistake is splitting work into too many small single-purpose nodes. Each node boundary requires serializing outputs, losing in-context information, and adding edge complexity.
| Bad (8 thin nodes) | Good (4 rich nodes) |
|---------------------|---------------------|
| parse-query | intake (client-facing) |
| search-sources | research (search + fetch + analyze) |
| fetch-content | review (client-facing) |
| evaluate-sources | report (write + deliver) |
| synthesize-findings | |
| write-report | |
| quality-check | |
| save-report | |
**Why fewer nodes are better:**
- The LLM retains full context of its work within a single node
- A research node that searches, fetches, and analyzes keeps all source material in its conversation history
- Fewer edges means simpler graph and fewer failure points
- Data tools (`save_data`/`load_data`) handle context window limits within a single node
### MCP Tools - Correct Usage
+110 -84
View File
@@ -144,19 +144,19 @@ class EventLoopNode(NodeProtocol):
1. Try to restore from durable state (crash recovery)
2. If no prior state, init from NodeSpec.system_prompt + input_keys
3. Loop: drain injection queue -> stream LLM -> execute tools
-> if client_facing + no tools: block for user input (inject_event)
-> if not client_facing or tools present: judge evaluates
-> if client_facing + no real tools: block for user input
-> judge evaluates (acceptance criteria)
(each add_* and set_output writes through to store immediately)
4. Publish events to EventBus at each stage
5. Write cursor after each iteration
6. Terminate when judge returns ACCEPT, shutdown signaled, or max iterations
7. Build output dict from OutputAccumulator
Client-facing blocking: When ``client_facing=True`` and the LLM produces
text without tool calls (a natural conversational turn), the node blocks
via ``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()``
is called. This separates blocking (node concern) from output evaluation
(judge concern).
Client-facing blocking: When ``client_facing=True`` and the LLM finishes
without real tool calls (stop_reason != tool_call), the node blocks via
``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()``
is called. After user input, the judge evaluates the judge is the
sole mechanism for acceptance decisions.
Always returns NodeResult with retryable=False semantics. The executor
must NOT retry event loop nodes -- retry is handled internally by the
@@ -212,8 +212,10 @@ class EventLoopNode(NodeProtocol):
# 2. Restore or create new conversation + accumulator
conversation, accumulator, start_iteration = await self._restore(ctx)
if conversation is None:
system_prompt = ctx.node_spec.system_prompt or ""
conversation = NodeConversation(
system_prompt=ctx.node_spec.system_prompt or "",
system_prompt=system_prompt,
max_history_tokens=self._config.max_history_tokens,
output_keys=ctx.node_spec.output_keys or None,
store=self._conversation_store,
@@ -276,15 +278,20 @@ class EventLoopNode(NodeProtocol):
iteration,
len(conversation.messages),
)
assistant_text, tool_results_list, turn_tokens = await self._run_single_turn(
ctx, conversation, tools, iteration, accumulator
)
(
assistant_text,
real_tool_results,
outputs_set,
turn_tokens,
) = await self._run_single_turn(ctx, conversation, tools, iteration, accumulator)
logger.info(
"[%s] iter=%d: LLM done — text=%d chars, tool_calls=%d, tokens=%s, accumulator=%s",
"[%s] iter=%d: LLM done — text=%d chars, real_tools=%d, "
"outputs_set=%s, tokens=%s, accumulator=%s",
node_id,
iteration,
len(assistant_text),
len(tool_results_list),
len(real_tool_results),
outputs_set or "[]",
turn_tokens,
{k: ("set" if v is not None else "None") for k, v in accumulator.to_dict().items()},
)
@@ -300,6 +307,31 @@ class EventLoopNode(NodeProtocol):
if conversation.needs_compaction():
await self._compact_tiered(ctx, conversation, accumulator)
# 6e'''. Empty response guard — if the LLM returned nothing
# (no text, no real tools, no set_output) and all required
# outputs are already set, accept immediately. This prevents
# wasted iterations when the LLM has genuinely finished its
# work (e.g. after calling set_output in a previous turn).
truly_empty = not assistant_text and not real_tool_results and not outputs_set
if truly_empty and accumulator is not None:
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if not missing:
logger.info(
"[%s] iter=%d: empty response but all outputs set — accepting",
node_id,
iteration,
)
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
success=True,
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
)
# 6f. Stall detection
recent_responses.append(assistant_text)
if len(recent_responses) > self._config.stall_detection_threshold:
@@ -321,18 +353,17 @@ class EventLoopNode(NodeProtocol):
# 6g. Write cursor checkpoint
await self._write_cursor(ctx, conversation, accumulator, iteration)
# 6h. Client-facing input wait
logger.info(
"[%s] iter=%d: 6h check — client_facing=%s, tool_results=%d",
node_id,
iteration,
ctx.node_spec.client_facing,
len(tool_results_list),
)
if ctx.node_spec.client_facing and not tool_results_list:
# LLM finished speaking (no tool calls) on a client-facing node.
# This is a conversational turn boundary: block for user input
# instead of running the judge.
# 6h. Client-facing input blocking
#
# For client_facing nodes, block for user input whenever the
# LLM finishes without making real tool calls (i.e. the LLM's
# stop_reason is not tool_call). set_output is separated from
# real tools by _run_single_turn, so this correctly treats
# set_output-only turns as conversational boundaries.
#
# After user input, always fall through to judge evaluation
# (6i). The judge handles all acceptance decisions.
if ctx.node_spec.client_facing and not real_tool_results:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
latency_ms = int((time.time() - start_time) * 1000)
@@ -347,7 +378,6 @@ class EventLoopNode(NodeProtocol):
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
# Shutdown signaled during wait
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
@@ -357,46 +387,13 @@ class EventLoopNode(NodeProtocol):
latency_ms=latency_ms,
)
# Clear stall detection — user input resets the conversation
recent_responses.clear()
# For nodes with an explicit judge, fall through to judge
# evaluation so the LLM gets structured feedback about missing
# outputs (e.g. "Missing output keys: [...]"). Without this,
# the LLM may generate text like "Ready to proceed!" without
# ever calling set_output, and the judge feedback never reaches it.
#
# For nodes without a judge (HITL review/approval with all-
# nullable keys), keep conversing UNLESS the LLM has already
# set an output — in that case fall through to the implicit
# judge which will ACCEPT and terminate the node.
if self._judge is None:
has_outputs = accumulator and any(
v is not None for v in accumulator.to_dict().values()
)
if not has_outputs:
logger.info(
"[%s] iter=%d: no judge, no outputs, continuing",
node_id,
iteration,
)
continue
logger.info(
"[%s] iter=%d: no judge, outputs set — implicit judge",
node_id,
iteration,
)
else:
logger.info(
"[%s] iter=%d: has judge, falling through to 6i",
node_id,
iteration,
)
# Fall through to judge evaluation (6i)
# 6i. Judge evaluation
should_judge = (
(iteration + 1) % self._config.judge_every_n_turns == 0
or not tool_results_list # no tool calls = natural stop
or not real_tool_results # no real tool calls = natural stop
)
logger.info("[%s] iter=%d: 6i should_judge=%s", node_id, iteration, should_judge)
@@ -406,7 +403,7 @@ class EventLoopNode(NodeProtocol):
conversation,
accumulator,
assistant_text,
tool_results_list,
real_tool_results,
iteration,
)
fb_preview = (verdict.feedback or "")[:200]
@@ -526,16 +523,24 @@ class EventLoopNode(NodeProtocol):
tools: list[Tool],
iteration: int,
accumulator: OutputAccumulator,
) -> tuple[str, list[dict], dict[str, int]]:
) -> tuple[str, list[dict], list[str], dict[str, int]]:
"""Run a single LLM turn with streaming and tool execution.
Returns (assistant_text, tool_results, token_counts).
Returns (assistant_text, real_tool_results, outputs_set, token_counts).
``real_tool_results`` contains only results from actual tools (web_search,
etc.), NOT from the synthetic ``set_output`` tool. ``outputs_set`` lists
the output keys written via ``set_output`` during this turn. This
separation lets the caller treat set_output as a framework concern
rather than a tool-execution concern.
"""
stream_id = ctx.node_id
node_id = ctx.node_id
token_counts: dict[str, int] = {"input": 0, "output": 0}
tool_call_count = 0
final_text = ""
# Track output keys set via set_output across all inner iterations
outputs_set_this_turn: list[str] = []
# Inner tool loop: stream may produce tool calls requiring re-invocation
while True:
@@ -606,10 +611,10 @@ class EventLoopNode(NodeProtocol):
# If no tool calls, turn is complete
if not tool_calls:
return final_text, [], token_counts
return final_text, [], outputs_set_this_turn, token_counts
# Execute tool calls
tool_results: list[dict] = []
# Execute tool calls — separate real tools from set_output
real_tool_results: list[dict] = []
limit_hit = False
executed_in_batch = 0
for tc in tool_calls:
@@ -624,21 +629,21 @@ class EventLoopNode(NodeProtocol):
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input
)
# Handle set_output synthetic tool
logger.info(
"[%s] tool_call: %s(%s)",
node_id,
tc.tool_name,
json.dumps(tc.tool_input)[:200],
)
if tc.tool_name == "set_output":
# --- Framework-level set_output handling ---
result = self._handle_set_output(tc.tool_input, ctx.node_spec.output_keys)
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=result.content,
is_error=result.is_error,
)
# Async write-through for set_output
if not result.is_error:
value = tc.tool_input["value"]
# Parse JSON strings into native types so downstream
@@ -652,26 +657,27 @@ class EventLoopNode(NodeProtocol):
except (json.JSONDecodeError, TypeError):
pass
await accumulator.set(tc.tool_input["key"], value)
outputs_set_this_turn.append(tc.tool_input["key"])
else:
# Execute real tool
# --- Real tool execution ---
result = await self._execute_tool(tc)
# Truncate large results to prevent context blowup
result = self._truncate_tool_result(result, tc.tool_name)
real_tool_results.append(
{
"tool_use_id": tc.tool_use_id,
"tool_name": tc.tool_name,
"content": result.content,
"is_error": result.is_error,
}
)
# Record tool result in conversation (write-through)
# Record tool result in conversation (both real and set_output
# go into the conversation for LLM context continuity)
await conversation.add_tool_result(
tool_use_id=tc.tool_use_id,
content=result.content,
is_error=result.is_error,
)
tool_results.append(
{
"tool_use_id": tc.tool_use_id,
"tool_name": tc.tool_name,
"content": result.content,
"is_error": result.is_error,
}
)
# Publish tool call completed
await self._publish_tool_completed(
@@ -708,7 +714,9 @@ class EventLoopNode(NodeProtocol):
content=discard_msg,
is_error=True,
)
tool_results.append(
# Discarded calls go into real_tool_results so the
# caller sees they were attempted (for judge context).
real_tool_results.append(
{
"tool_use_id": tc.tool_use_id,
"tool_name": tc.tool_name,
@@ -716,9 +724,24 @@ class EventLoopNode(NodeProtocol):
"is_error": True,
}
)
# Prune old tool results NOW to prevent context bloat on the
# next turn. The char-based token estimator underestimates
# actual API tokens, so the standard compaction check in the
# outer loop may not trigger in time.
protect = max(2000, self._config.max_history_tokens // 12)
pruned = await conversation.prune_old_tool_results(
protect_tokens=protect,
min_prune_tokens=max(1000, protect // 3),
)
if pruned > 0:
logger.info(
"Post-limit pruning: cleared %d old tool results (budget: %d)",
pruned,
self._config.max_history_tokens,
)
# Limit hit — return from this turn so the judge can
# evaluate instead of looping back for another stream.
return final_text, tool_results, token_counts
return final_text, real_tool_results, outputs_set_this_turn, token_counts
# --- Mid-turn pruning: prevent context blowup within a single turn ---
if conversation.usage_ratio() >= 0.6:
@@ -1025,7 +1048,8 @@ class EventLoopNode(NodeProtocol):
truncated = (
f"[Result from {tool_name}: {len(result.content)} chars — "
f"too large for context, saved to '{filename}'. "
f"Use load_data('{filename}') to read the full result.]\n\n"
f"Use load_data(filename='{filename}', data_dir='{spill_dir}') "
f"to read the full result.]\n\n"
f"Preview:\n{preview}"
)
logger.info(
@@ -1244,9 +1268,11 @@ class EventLoopNode(NodeProtocol):
# 5. Spillover files hint
if self._config.spillover_dir:
spill = self._config.spillover_dir
parts.append(
"NOTE: Large tool results were saved to files. "
"Use load_data('<filename>') to read them."
f"Use load_data(filename='<filename>', data_dir='{spill}') "
"to read them."
)
# 6. Tool call history (prevent re-calling tools)
+61 -11
View File
@@ -14,6 +14,7 @@ import logging
import warnings
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
@@ -130,6 +131,7 @@ class GraphExecutor:
parallel_config: ParallelExecutionConfig | None = None,
event_bus: Any | None = None,
stream_id: str = "",
storage_path: str | Path | None = None,
):
"""
Initialize the executor.
@@ -146,6 +148,7 @@ class GraphExecutor:
parallel_config: Configuration for parallel execution behavior
event_bus: Optional event bus for emitting node lifecycle events
stream_id: Stream ID for event correlation
storage_path: Optional base path for conversation persistence
"""
self.runtime = runtime
self.llm = llm
@@ -157,6 +160,7 @@ class GraphExecutor:
self.logger = logging.getLogger(__name__)
self._event_bus = event_bus
self._stream_id = stream_id
self._storage_path = Path(storage_path) if storage_path else None
# Initialize output cleaner
self.cleansing_config = cleansing_config or CleansingConfig()
@@ -380,8 +384,16 @@ class GraphExecutor:
)
if result.success:
# Validate output before accepting it
if result.output and node_spec.output_keys:
# Validate output before accepting it.
# Skip for event_loop nodes — their judge system is
# the sole acceptance mechanism (see WP-8). Empty
# strings and other flexible outputs are legitimate
# for LLM-driven nodes that already passed the judge.
if (
result.output
and node_spec.output_keys
and node_spec.node_type != "event_loop"
):
validation = self.validator.validate_all(
output=result.output,
expected_keys=node_spec.output_keys,
@@ -817,11 +829,43 @@ class GraphExecutor:
)
if node_spec.node_type == "event_loop":
# Event loop nodes must be pre-registered (like function nodes)
raise RuntimeError(
f"EventLoopNode '{node_spec.id}' not found in registry. "
"Register it with executor.register_node() before execution."
# Auto-create EventLoopNode with sensible defaults.
# Custom configs can still be pre-registered via node_registry.
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
# Create a FileConversationStore if a storage path is available
conv_store = None
if self._storage_path:
from framework.storage.conversation_store import FileConversationStore
store_path = self._storage_path / "conversations" / node_spec.id
conv_store = FileConversationStore(base_path=store_path)
# Auto-configure spillover directory for large tool results.
# When a tool result exceeds max_tool_result_chars, the full
# content is written to spillover_dir and the agent gets a
# truncated preview with instructions to use load_data().
spillover = None
if self._storage_path:
spillover = str(self._storage_path / "data")
node = EventLoopNode(
event_bus=self._event_bus,
judge=None, # implicit judge: accept when output_keys are filled
config=LoopConfig(
max_iterations=100 if node_spec.client_facing else 50,
max_tool_calls_per_turn=10,
stall_detection_threshold=3,
max_history_tokens=32000,
max_tool_result_chars=3_000,
spillover_dir=spillover,
),
tool_executor=self.tool_executor,
conversation_store=conv_store,
)
# Cache so inject_event() is reachable for client-facing input
self.node_registry[node_spec.id] = node
return node
# Should never reach here due to validation above
raise RuntimeError(f"Unhandled node type: {node_spec.node_type}")
@@ -850,9 +894,12 @@ class GraphExecutor:
source_node_name=current_node_spec.name if current_node_spec else current_node_id,
target_node_name=target_node_spec.name if target_node_spec else edge.target,
):
# Validate and clean output before mapping inputs
# Validate and clean output before mapping inputs.
# Use full memory state (not just result.output) because
# target input_keys may come from earlier nodes in the
# graph, not only from the immediate source node.
if self.cleansing_config.enabled and target_node_spec:
output_to_validate = result.output
output_to_validate = memory.read_all()
validation = self.output_cleaner.validate_output(
output=output_to_validate,
@@ -1048,10 +1095,13 @@ class GraphExecutor:
branch.status = "running"
try:
# Validate and clean output before mapping inputs (same as _follow_edges)
# Validate and clean output before mapping inputs (same as _follow_edges).
# Use full memory state since target input_keys may come
# from earlier nodes, not just the immediate source.
if self.cleansing_config.enabled and node_spec:
mem_snapshot = memory.read_all()
validation = self.output_cleaner.validate_output(
output=source_result.output,
output=mem_snapshot,
source_node_id=source_node_spec.id if source_node_spec else "unknown",
target_node_spec=node_spec,
)
@@ -1062,7 +1112,7 @@ class GraphExecutor:
f"{branch.node_id}: {validation.errors}"
)
cleaned_output = self.output_cleaner.clean_output(
output=source_result.output,
output=mem_snapshot,
source_node_id=source_node_spec.id if source_node_spec else "unknown",
target_node_spec=node_spec,
validation_errors=validation.errors,
+4 -1
View File
@@ -144,8 +144,11 @@ class OutputCleaner:
errors = []
warnings = []
# Check 1: Required input keys present
# Check 1: Required input keys present (skip nullable keys)
nullable = set(getattr(target_node_spec, "nullable_output_keys", None) or [])
for key in target_node_spec.input_keys:
if key in nullable:
continue
if key not in output:
errors.append(f"Missing required key: '{key}'")
continue
+10 -6
View File
@@ -572,17 +572,21 @@ class LiteLLMProvider(LLMProvider):
# and we skip the retry path — nothing was yielded in vain.)
has_content = accumulated_text or tool_calls_acc
if not has_content and attempt < RATE_LIMIT_MAX_RETRIES:
# If the conversation ends with an assistant message,
# an empty stream is expected (nothing new to say).
# Don't retry — just flush whatever we have.
# If the conversation ends with an assistant or tool
# message, an empty stream is expected — the LLM has
# nothing new to say. Don't burn retries on this;
# let the caller (EventLoopNode) decide what to do.
# Typical case: client_facing node where the LLM set
# all outputs via set_output tool calls, and the tool
# results are the last messages.
last_role = next(
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
if last_role == "assistant":
if last_role in ("assistant", "tool"):
logger.debug(
"[stream] Empty response after assistant message — "
"expected, not retrying."
"[stream] Empty response after %s message — expected, not retrying.",
last_role,
)
for event in tail_events:
yield event
+56 -15
View File
@@ -1105,17 +1105,30 @@ def validate_graph() -> str:
errors.append(f"Unreachable nodes: {unreachable}")
# === CONTEXT FLOW VALIDATION ===
# Build dependency map (node_id -> list of nodes it depends on)
# Build dependency maps — separate forward edges from feedback edges.
# Feedback edges (priority < 0) create cycles; they must not block the
# topological sort. Context they carry arrives on *revisits*, not on
# the first execution of a node.
feedback_edge_ids = {e.id for e in session.edges if e.priority < 0}
forward_dependencies: dict[str, list[str]] = {node.id: [] for node in session.nodes}
feedback_sources: dict[str, list[str]] = {node.id: [] for node in session.nodes}
# Combined map kept for error-message generation (all deps)
dependencies: dict[str, list[str]] = {node.id: [] for node in session.nodes}
for edge in session.edges:
if edge.target in dependencies:
dependencies[edge.target].append(edge.source)
if edge.target not in forward_dependencies:
continue
dependencies[edge.target].append(edge.source)
if edge.id in feedback_edge_ids:
feedback_sources[edge.target].append(edge.source)
else:
forward_dependencies[edge.target].append(edge.source)
# Build output map (node_id -> keys it produces)
node_outputs: dict[str, set[str]] = {node.id: set(node.output_keys) for node in session.nodes}
# Compute available context for each node (what keys it can read)
# Using topological order
# Using topological order on the forward-edge DAG
available_context: dict[str, set[str]] = {}
computed = set()
nodes_by_id = {n.id: n for n in session.nodes}
@@ -1125,7 +1138,8 @@ def validate_graph() -> str:
# Entry nodes can only read from initial context
initial_context_keys: set[str] = set()
# Compute in topological order
# Compute in topological order (forward edges only — feedback edges
# don't block, since their context arrives on revisits)
remaining = {n.id for n in session.nodes}
max_iterations = len(session.nodes) * 2
@@ -1134,18 +1148,23 @@ def validate_graph() -> str:
break
for node_id in list(remaining):
deps = dependencies.get(node_id, [])
fwd_deps = forward_dependencies.get(node_id, [])
# Can compute if all dependencies are computed (or no dependencies)
if all(d in computed for d in deps):
# Collect outputs from all dependencies
# Can compute if all FORWARD dependencies are computed
if all(d in computed for d in fwd_deps):
# Collect outputs from all forward dependencies
available = set(initial_context_keys)
for dep_id in deps:
# Add outputs from dependency
for dep_id in fwd_deps:
available.update(node_outputs.get(dep_id, set()))
# Also add what was available to the dependency (transitive)
available.update(available_context.get(dep_id, set()))
# Also include context from already-computed feedback
# sources (bonus, not blocking)
for fb_src in feedback_sources.get(node_id, []):
if fb_src in computed:
available.update(node_outputs.get(fb_src, set()))
available.update(available_context.get(fb_src, set()))
available_context[node_id] = available
computed.add(node_id)
remaining.remove(node_id)
@@ -1155,15 +1174,37 @@ def validate_graph() -> str:
context_errors = []
context_warnings = []
missing_inputs: dict[str, list[str]] = {}
feedback_only_inputs: dict[str, list[str]] = {}
for node in session.nodes:
available = available_context.get(node.id, set())
for input_key in node.input_keys:
if input_key not in available:
if node.id not in missing_inputs:
missing_inputs[node.id] = []
missing_inputs[node.id].append(input_key)
# Check if this input is provided by a feedback source
fb_provides = set()
for fb_src in feedback_sources.get(node.id, []):
fb_provides.update(node_outputs.get(fb_src, set()))
fb_provides.update(available_context.get(fb_src, set()))
if input_key in fb_provides:
# Input arrives via feedback edge — warn, don't error
if node.id not in feedback_only_inputs:
feedback_only_inputs[node.id] = []
feedback_only_inputs[node.id].append(input_key)
else:
if node.id not in missing_inputs:
missing_inputs[node.id] = []
missing_inputs[node.id].append(input_key)
# Warn about feedback-only inputs (available on revisits, not first run)
for node_id, fb_keys in feedback_only_inputs.items():
fb_srcs = feedback_sources.get(node_id, [])
context_warnings.append(
f"Node '{node_id}' input(s) {fb_keys} are only provided via "
f"feedback edge(s) from {fb_srcs}. These will be available on "
f"revisits but not on the first execution."
)
# Generate helpful error messages
for node_id, missing in missing_inputs.items():
+10 -1
View File
@@ -280,7 +280,16 @@ class ToolRegistry:
return
base_dir = config_path.parent
for server_config in config.get("servers", []):
# Support both formats:
# {"servers": [{"name": "x", ...}]} (list format)
# {"server-name": {"transport": ...}, ...} (dict format)
server_list = config.get("servers", [])
if not server_list and "servers" not in config:
# Treat top-level keys as server names
server_list = [{"name": name, **cfg} for name, cfg in config.items()]
for server_config in server_list:
cwd = server_config.get("cwd")
if cwd and not Path(cwd).is_absolute():
server_config["cwd"] = str((base_dir / cwd).resolve())
+19
View File
@@ -296,6 +296,25 @@ class AgentRuntime:
raise ValueError(f"Entry point '{entry_point_id}' not found")
return await stream.wait_for_completion(exec_id, timeout)
async def inject_input(self, node_id: str, content: str) -> bool:
"""Inject user input into a running client-facing node.
Routes input to the EventLoopNode identified by ``node_id``
across all active streams. Used by the TUI ChatRepl to deliver
user responses during client-facing node execution.
Args:
node_id: The node currently waiting for input
content: The user's input text
Returns:
True if input was delivered, False if no matching node found
"""
for stream in self._streams.values():
if await stream.inject_input(node_id, content):
return True
return False
async def get_goal_progress(self) -> dict[str, Any]:
"""
Evaluate goal progress across all streams.
+26 -1
View File
@@ -153,6 +153,7 @@ class ExecutionStream:
# Execution tracking
self._active_executions: dict[str, ExecutionContext] = {}
self._execution_tasks: dict[str, asyncio.Task] = {}
self._active_executors: dict[str, GraphExecutor] = {}
self._execution_results: OrderedDict[str, ExecutionResult] = OrderedDict()
self._execution_result_times: dict[str, float] = {}
self._completion_events: dict[str, asyncio.Event] = {}
@@ -237,6 +238,21 @@ class ExecutionStream:
)
)
async def inject_input(self, node_id: str, content: str) -> bool:
"""Inject user input into a running client-facing EventLoopNode.
Searches active executors for a node matching ``node_id`` and calls
its ``inject_event()`` method to unblock ``_await_user_input()``.
Returns True if input was delivered, False otherwise.
"""
for executor in self._active_executors.values():
node = executor.node_registry.get(node_id)
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
return True
return False
async def execute(
self,
input_data: dict[str, Any],
@@ -314,7 +330,10 @@ class ExecutionStream:
# Create runtime adapter for this execution
runtime_adapter = StreamRuntimeAdapter(self._runtime, execution_id)
# Create executor for this execution
# Create executor for this execution.
# Scope storage by execution_id so each execution gets
# fresh conversations and spillover directories.
exec_storage = self._storage.base_path / "sessions" / execution_id
executor = GraphExecutor(
runtime=runtime_adapter,
llm=self._llm,
@@ -322,7 +341,10 @@ class ExecutionStream:
tool_executor=self._tool_executor,
event_bus=self._event_bus,
stream_id=self.stream_id,
storage_path=exec_storage,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Create modified graph with entry point
# We need to override the entry_node to use our entry point
@@ -336,6 +358,9 @@ class ExecutionStream:
session_state=ctx.session_state,
)
# Clean up executor reference
self._active_executors.pop(execution_id, None)
# Store result with retention
self._record_execution_result(execution_id, result)
+3 -1
View File
@@ -372,7 +372,9 @@ class AdenTUI(App):
elif et == EventType.EXECUTION_FAILED:
self.chat_repl.handle_execution_failed(event.data.get("error", "Unknown error"))
elif et == EventType.CLIENT_INPUT_REQUESTED:
self.chat_repl.handle_input_requested(event.data.get("prompt", ""))
self.chat_repl.handle_input_requested(
event.node_id or event.data.get("node_id", ""),
)
# --- Graph view events ---
if et in (
+66 -8
View File
@@ -6,6 +6,12 @@ Streaming display approach:
(Label.update() replaces text in-place, unlike RichLog which is append-only).
- On EXECUTION_COMPLETED, the final output is written to RichLog as permanent history.
- Tool events are written directly to RichLog as discrete status lines.
Client-facing input:
- When a client_facing=True EventLoopNode emits CLIENT_INPUT_REQUESTED, the
ChatRepl transitions to "waiting for input" state: input is re-enabled and
subsequent submissions are routed to runtime.inject_input() instead of
starting a new execution.
"""
import asyncio
@@ -66,6 +72,8 @@ class ChatRepl(Vertical):
self.runtime = runtime
self._current_exec_id: str | None = None
self._streaming_snapshot: str = ""
self._waiting_for_input: bool = False
self._input_node_id: str | None = None
# Dedicated event loop for agent execution.
# Keeps blocking runtime code (LLM calls, MCP tools) off
@@ -97,11 +105,38 @@ class ChatRepl(Vertical):
history.write("[bold cyan]Chat REPL Ready[/bold cyan] — Type your input below\n")
async def on_input_submitted(self, message: Input.Submitted) -> None:
"""Handle input submission — fire-and-forget via trigger()."""
"""Handle input submission — either start new execution or inject input."""
user_input = message.value.strip()
if not user_input:
return
# Client-facing input: route to the waiting node
if self._waiting_for_input and self._input_node_id:
self._write_history(f"[bold green]You:[/bold green] {user_input}")
message.input.value = ""
# Disable input while agent processes the response
chat_input = self.query_one("#chat-input", Input)
chat_input.disabled = True
chat_input.placeholder = "Enter input for agent..."
self._waiting_for_input = False
indicator = self.query_one("#processing-indicator", Label)
indicator.update("Thinking...")
node_id = self._input_node_id
self._input_node_id = None
try:
future = asyncio.run_coroutine_threadsafe(
self.runtime.inject_input(node_id, user_input),
self._agent_loop,
)
await asyncio.wrap_future(future)
except Exception as e:
self._write_history(f"[bold red]Error delivering input:[/bold red] {e}")
return
# Double-submit guard: reject input while an execution is in-flight
if self._current_exec_id is not None:
self._write_history("[dim]Agent is still running — please wait.[/dim]")
@@ -206,17 +241,23 @@ class ChatRepl(Vertical):
indicator = self.query_one("#processing-indicator", Label)
indicator.display = False
# Write the final output to permanent history
output_str = str(output.get("output_string", output))
self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}")
# Write the final streaming snapshot to permanent history (if any)
if self._streaming_snapshot:
self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}")
else:
output_str = str(output.get("output_string", output))
self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}")
self._write_history("") # separator
self._current_exec_id = None
self._streaming_snapshot = ""
self._waiting_for_input = False
self._input_node_id = None
# Re-enable input
chat_input = self.query_one("#chat-input", Input)
chat_input.disabled = False
chat_input.placeholder = "Enter input for agent..."
chat_input.focus()
def handle_execution_failed(self, error: str) -> None:
@@ -229,17 +270,34 @@ class ChatRepl(Vertical):
self._current_exec_id = None
self._streaming_snapshot = ""
self._waiting_for_input = False
self._input_node_id = None
# Re-enable input
chat_input = self.query_one("#chat-input", Input)
chat_input.disabled = False
chat_input.placeholder = "Enter input for agent..."
chat_input.focus()
def handle_input_requested(self, prompt: str) -> None:
"""Handle the agent requesting input from the user."""
def handle_input_requested(self, node_id: str) -> None:
"""Handle a client-facing node requesting user input.
Transitions to 'waiting for input' state: flushes the current
streaming snapshot to history, re-enables the input widget,
and sets a flag so the next submission routes to inject_input().
"""
# Flush accumulated streaming text as agent output
if self._streaming_snapshot:
self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}")
self._streaming_snapshot = ""
self._waiting_for_input = True
self._input_node_id = node_id or None
indicator = self.query_one("#processing-indicator", Label)
indicator.update(f"Input requested: {prompt}")
indicator.update("Waiting for your input...")
chat_input = self.query_one("#chat-input", Input)
chat_input.placeholder = prompt or "Agent is waiting for input..."
chat_input.disabled = False
chat_input.placeholder = "Type your response..."
chat_input.focus()
+8 -4
View File
@@ -104,8 +104,10 @@ def test_event_loop_node_spec_accepted():
# --- _get_node_implementation() tests ---
def test_unregistered_event_loop_raises(runtime):
"""An event_loop node not in the registry should raise RuntimeError."""
def test_unregistered_event_loop_auto_creates(runtime):
"""An event_loop node not in the registry should be auto-created."""
from framework.graph.event_loop_node import EventLoopNode
spec = NodeSpec(
id="el1",
name="Event Loop",
@@ -114,8 +116,10 @@ def test_unregistered_event_loop_raises(runtime):
)
executor = GraphExecutor(runtime=runtime)
with pytest.raises(RuntimeError, match="not found in registry"):
executor._get_node_implementation(spec)
result = executor._get_node_implementation(spec)
assert isinstance(result, EventLoopNode)
# Auto-created node should be cached in registry
assert "el1" in executor.node_registry
def test_registered_event_loop_returns_impl(runtime):
+5
View File
@@ -26,6 +26,7 @@ from .email_tool import register_tools as register_email
from .example_tool import register_tools as register_example
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.data_tools import register_tools as register_data_tools
from .file_system_toolkits.execute_command_tool import (
register_tools as register_execute_command,
)
@@ -82,6 +83,7 @@ def register_all_tools(
register_apply_patch(mcp)
register_grep_search(mcp)
register_execute_command(mcp)
register_data_tools(mcp)
register_csv(mcp)
return [
@@ -97,6 +99,9 @@ def register_all_tools(
"apply_patch",
"grep_search",
"execute_command_tool",
"load_data",
"save_data",
"list_data_files",
"csv_read",
"csv_write",
"csv_append",
@@ -0,0 +1,3 @@
from .data_tools import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,179 @@
"""
Data Tools - Load, save, and list data files for agent pipelines.
These tools let agents store large intermediate results in files and
retrieve them with pagination, keeping the LLM conversation context small.
Used in conjunction with the spillover system: when a tool result is too
large, the framework writes it to a file and the agent can load it back
with load_data().
"""
from __future__ import annotations
import json
from pathlib import Path
from mcp.server.fastmcp import FastMCP
def register_tools(mcp: FastMCP) -> None:
"""Register data management tools with the MCP server."""
@mcp.tool()
def save_data(filename: str, data: str, data_dir: str) -> dict:
"""
Purpose
Save data to a file for later retrieval by this or downstream nodes.
When to use
Store large results (search results, profiles, analysis) instead
of passing them inline through set_output.
Returns a brief summary with the filename to reference later.
Rules & Constraints
filename must be a simple name like 'results.json' no paths or '..'
data_dir must be the absolute path to the data directory
Args:
filename: Simple filename like 'github_users.json'. No paths or '..'.
data: The string data to write (typically JSON).
data_dir: Absolute path to the data directory.
Returns:
Dict with success status and file metadata, or error dict
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename. Use simple names like 'users.json'"}
if not data_dir:
return {"error": "data_dir is required"}
try:
dir_path = Path(data_dir)
dir_path.mkdir(parents=True, exist_ok=True)
path = dir_path / filename
path.write_text(data, encoding="utf-8")
lines = data.count("\n") + 1
return {
"success": True,
"filename": filename,
"size_bytes": len(data.encode("utf-8")),
"lines": lines,
"preview": data[:200] + ("..." if len(data) > 200 else ""),
}
except Exception as e:
return {"error": f"Failed to save data: {str(e)}"}
@mcp.tool()
def load_data(
filename: str,
data_dir: str,
offset: int = 0,
limit: int = 50,
) -> dict:
"""
Purpose
Load data from a previously saved file with pagination.
When to use
Retrieve large tool results that were spilled to disk.
Read data saved by save_data or by the spillover system.
Page through large files without loading everything into context.
Rules & Constraints
filename must match a file in data_dir
Returns a page of lines with metadata about the full file
Args:
filename: The filename to load (as shown in spillover messages or save_data results).
data_dir: Absolute path to the data directory.
offset: 0-based line number to start reading from. Default 0.
limit: Max number of lines to return. Default 50.
Returns:
Dict with content, pagination info, and metadata
Examples:
load_data('users.json', '/path/to/data') # first 50 lines
load_data('users.json', '/path/to/data', offset=50, limit=50) # next 50
load_data('users.json', '/path/to/data', limit=200) # first 200 lines
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename"}
if not data_dir:
return {"error": "data_dir is required"}
try:
offset = int(offset)
limit = int(limit)
path = Path(data_dir) / filename
if not path.exists():
return {"error": f"File not found: {filename}"}
content = path.read_text(encoding="utf-8")
size_bytes = len(content.encode("utf-8"))
# If content is a single long line, try to pretty-print JSON so
# line-based pagination actually works.
all_lines = content.split("\n")
if len(all_lines) <= 2 and size_bytes > 500:
try:
parsed = json.loads(content)
content = json.dumps(parsed, indent=2, ensure_ascii=False)
all_lines = content.split("\n")
except (json.JSONDecodeError, TypeError, ValueError):
pass
total = len(all_lines)
start = min(offset, total)
end = min(start + limit, total)
sliced = all_lines[start:end]
return {
"success": True,
"filename": filename,
"content": "\n".join(sliced),
"total_lines": total,
"size_bytes": size_bytes,
"offset": start,
"lines_returned": len(sliced),
"has_more": end < total,
}
except Exception as e:
return {"error": f"Failed to load data: {str(e)}"}
@mcp.tool()
def list_data_files(data_dir: str) -> dict:
"""
Purpose
List all data files in the data directory.
When to use
Discover what intermediate results or spillover files are available.
Check what data was saved by previous nodes in the pipeline.
Args:
data_dir: Absolute path to the data directory.
Returns:
Dict with list of files and their sizes
"""
if not data_dir:
return {"error": "data_dir is required"}
try:
dir_path = Path(data_dir)
if not dir_path.exists():
return {"files": []}
files = []
for f in sorted(dir_path.iterdir()):
if f.is_file():
files.append(
{
"filename": f.name,
"size_bytes": f.stat().st_size,
}
)
return {"files": files}
except Exception as e:
return {"error": f"Failed to list data files: {str(e)}"}
Generated
+62
View File
@@ -769,6 +769,11 @@ dependencies = [
{ name = "tools" },
]
[package.optional-dependencies]
tui = [
{ name = "textual" },
]
[package.dev-dependencies]
dev = [
{ name = "ruff" },
@@ -786,8 +791,10 @@ requires-dist = [
{ name = "pytest", specifier = ">=8.0" },
{ name = "pytest-asyncio", specifier = ">=0.23" },
{ name = "pytest-xdist", specifier = ">=3.0" },
{ name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
{ name = "tools", editable = "tools" },
]
provides-extras = ["tui"]
[package.metadata.requires-dev]
dev = [
@@ -1310,6 +1317,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" },
]
[[package]]
name = "linkify-it-py"
version = "2.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
]
[[package]]
name = "litellm"
version = "1.81.7"
@@ -1408,6 +1427,11 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[package.optional-dependencies]
linkify = [
{ name = "linkify-it-py" },
]
[[package]]
name = "markupsafe"
version = "3.0.3"
@@ -1507,6 +1531,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/fd/d9/eaa1f80170d2b7c5ba23f3b59f766f3a0bb41155fbc32a69adfa1adaaef9/mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca", size = 233615, upload-time = "2026-01-24T19:40:30.652Z" },
]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
@@ -2957,6 +2993,23 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/0d/13d1d239a25cbfb19e740db83143e95c772a1fe10202dda4b76792b114dd/starlette-0.52.1-py3-none-any.whl", hash = "sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74", size = 74272, upload-time = "2026-01-18T13:34:09.188Z" },
]
[[package]]
name = "textual"
version = "7.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", extra = ["linkify"] },
{ name = "mdit-py-plugins" },
{ name = "platformdirs" },
{ name = "pygments" },
{ name = "rich" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" },
]
[[package]]
name = "tiktoken"
version = "0.12.0"
@@ -3215,6 +3268,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" },
]
[[package]]
name = "uc-micro-py"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
]
[[package]]
name = "urllib3"
version = "2.6.3"