feat: integrate agent builder with tui
@@ -267,7 +267,7 @@ This returns JSON with the goal, nodes, edges, and MCP server configurations
- NOT: `{"first-node-id": ["input_keys"]}` (WRONG)
- NOT: `{"first-node-id"}` (WRONG - this is a set)

**Use the example agent** at `.claude/skills/building-agents-construction/examples/online_research_agent/` as a template for file structure and patterns.
**Use the example agent** at `.claude/skills/building-agents-construction/examples/deep_research_agent/` as a template for file structure and patterns. It demonstrates: STEP 1/STEP 2 prompts, client-facing nodes, feedback loops, nullable_output_keys, and data tools.

**AFTER writing all files, tell the user:**

@@ -354,7 +354,7 @@ mcp__agent-builder__get_session_status()

## REFERENCE: System Prompt Best Practice

For event_loop nodes, instruct the LLM to use `set_output` for structured outputs:
For **internal** event_loop nodes (not client-facing), instruct the LLM to use `set_output`:

```
Use set_output(key, value) to store your results. For example:
@@ -363,71 +363,55 @@ Use set_output(key, value) to store your results. For example:
Do NOT return raw JSON. Use the set_output tool to produce outputs.
```

For **client-facing** event_loop nodes, use the STEP 1/STEP 2 pattern:

```
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions, etc.]

**STEP 2 — After the user responds, call set_output:**
- set_output("key", "value based on user's response")
```

This prevents the LLM from calling `set_output` before the user has had a chance to respond. The "NO tool calls" instruction in STEP 1 ensures the node blocks for user input before proceeding.

---

## CRITICAL: EventLoopNode Registration
## EventLoopNode Runtime

**`AgentRuntime` does NOT support `event_loop` nodes.** The `AgentRuntime` / `create_agent_runtime()` path creates `GraphExecutor` instances internally without passing a `node_registry`, causing all `event_loop` nodes to fail at runtime with:

```
EventLoopNode 'node-id' not found in registry. Register it with executor.register_node() before execution.
```

**The correct pattern**: Use `GraphExecutor` directly with a `node_registry` dict containing `EventLoopNode` instances:
EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. Both direct `GraphExecutor` and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically. No manual `node_registry` setup is needed.

```python
from framework.graph.executor import GraphExecutor, ExecutionResult
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime  # REQUIRED - executor calls runtime.start_run()
# Direct execution
from framework.graph.executor import GraphExecutor
from framework.runtime.core import Runtime

# 1. Build node_registry with EventLoopNode instances
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
    if node_spec.node_type == "event_loop":
        node_registry[node_spec.id] = EventLoopNode(
            event_bus=event_bus,
            judge=None,  # implicit judge: accepts when output_keys are filled
            config=LoopConfig(
                max_iterations=50,
                max_tool_calls_per_turn=15,
                stall_detection_threshold=3,
                max_history_tokens=32000,
            ),
            tool_executor=tool_executor,
        )

# 2. Create Runtime for run tracking (GraphExecutor calls runtime.start_run())
storage_path = Path.home() / ".hive" / "my_agent"
storage_path.mkdir(parents=True, exist_ok=True)
runtime = Runtime(storage_path)

# 3. Create GraphExecutor WITH node_registry and runtime
executor = GraphExecutor(
    runtime=runtime,  # NOT None - executor needs this for run tracking
    runtime=runtime,
    llm=llm,
    tools=tools,
    tool_executor=tool_executor,
    node_registry=node_registry,  # EventLoopNode instances
    storage_path=storage_path,
)

# 4. Execute
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
```

**DO NOT use `AgentRuntime` or `create_agent_runtime()` for agents with `event_loop` nodes.**

**DO NOT pass `runtime=None` to `GraphExecutor`** — it will crash with `'NoneType' object has no attribute 'start_run'`.

---

## COMMON MISTAKES TO AVOID

1. **Using `AgentRuntime` with event_loop nodes** - `AgentRuntime` does not register EventLoopNodes. Use `GraphExecutor` directly with `node_registry`
2. **Passing `runtime=None` to GraphExecutor** - The executor calls `runtime.start_run()` internally. Always provide a `Runtime(storage_path)` instance
3. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first
4. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list
5. **Skipping validation** - Always validate nodes and graph before proceeding
6. **Not waiting for approval** - Always ask user before major steps
7. **Displaying this file** - Execute the steps, don't show documentation
1. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first
2. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list (see the sketch after this list)
3. **Skipping validation** - Always validate nodes and graph before proceeding
4. **Not waiting for approval** - Always ask user before major steps
5. **Displaying this file** - Execute the steps, don't show documentation
6. **Too many thin nodes** - Prefer fewer, richer nodes (4 nodes > 8 nodes)
7. **Missing STEP 1/STEP 2 in client-facing prompts** - Client-facing nodes need explicit phases to prevent premature set_output
8. **Forgetting nullable_output_keys** - Mark input_keys that only arrive on certain edges (e.g., feedback) as nullable on the receiving node
9. **Adding framework gating for LLM behavior** - Fix prompts or use judges, not ad-hoc code
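
For mistake 2 above, a minimal illustration of the shape `entry_points` must take, using the node IDs from the example agent:

```python
# Correct: a dict that names each entry point
entry_points = {"start": "intake"}

# Wrong: a set or a list carries no entry-point ID
entry_points = {"intake"}   # set (invalid)
entry_points = ["intake"]   # list (invalid)
```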

@@ -0,0 +1,24 @@
"""
Deep Research Agent - Interactive, rigorous research with TUI conversation.

Research any topic through multi-source web search, quality evaluation,
and synthesis. Features client-facing TUI interaction at key checkpoints
for user guidance and iterative deepening.
"""

from .agent import DeepResearchAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata

__version__ = "1.0.0"

__all__ = [
    "DeepResearchAgent",
    "default_agent",
    "goal",
    "nodes",
    "edges",
    "RuntimeConfig",
    "AgentMetadata",
    "default_config",
    "metadata",
]

+96 -18
@@ -1,5 +1,5 @@
"""
CLI entry point for Online Research Agent.
CLI entry point for Deep Research Agent.

Uses AgentRuntime for multi-entrypoint support with HITL pause/resume.
"""
@@ -10,7 +10,7 @@ import logging
import sys
import click

from .agent import default_agent, OnlineResearchAgent
from .agent import default_agent, DeepResearchAgent


def setup_logging(verbose=False, debug=False):
@@ -28,7 +28,7 @@ def setup_logging(verbose=False, debug=False):
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """Online Research Agent - Deep-dive research with narrative reports."""
    """Deep Research Agent - Interactive, rigorous research with TUI conversation."""
    pass


@@ -59,6 +59,83 @@ def run(topic, mock, quiet, verbose, debug):
    sys.exit(0 if result.success else 1)


@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
    """Launch the TUI dashboard for interactive research."""
    setup_logging(verbose=verbose, debug=debug)

    try:
        from framework.tui.app import AdenTUI
    except ImportError:
        click.echo("TUI requires the 'textual' package. Install with: pip install textual")
        sys.exit(1)

    from pathlib import Path

    from framework.llm import LiteLLMProvider
    from framework.runner.tool_registry import ToolRegistry
    from framework.runtime.agent_runtime import create_agent_runtime
    from framework.runtime.event_bus import EventBus
    from framework.runtime.execution_stream import EntryPointSpec

    async def run_with_tui():
        agent = DeepResearchAgent()

        # Build graph and tools
        agent._event_bus = EventBus()
        agent._tool_registry = ToolRegistry()

        mcp_config_path = Path(__file__).parent / "mcp_servers.json"
        if mcp_config_path.exists():
            agent._tool_registry.load_mcp_config(mcp_config_path)

        llm = None
        if not mock:
            llm = LiteLLMProvider(
                model=agent.config.model,
                api_key=agent.config.api_key,
                api_base=agent.config.api_base,
            )

        tools = list(agent._tool_registry.get_tools().values())
        tool_executor = agent._tool_registry.get_executor()
        graph = agent._build_graph()

        storage_path = Path.home() / ".hive" / "deep_research_agent"
        storage_path.mkdir(parents=True, exist_ok=True)

        runtime = create_agent_runtime(
            graph=graph,
            goal=agent.goal,
            storage_path=storage_path,
            entry_points=[
                EntryPointSpec(
                    id="start",
                    name="Start Research",
                    entry_node="intake",
                    trigger_type="manual",
                    isolation_level="isolated",
                ),
            ],
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
        )

        await runtime.start()

        try:
            app = AdenTUI(runtime)
            await app.run_async()
        finally:
            await runtime.stop()

    asyncio.run(run_with_tui())


@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
@@ -71,6 +148,7 @@ def info(output_json):
    click.echo(f"Version: {info_data['version']}")
    click.echo(f"Description: {info_data['description']}")
    click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
    click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
    click.echo(f"Entry: {info_data['entry_node']}")
    click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")

@@ -81,6 +159,9 @@ def validate():
    validation = default_agent.validate()
    if validation["valid"]:
        click.echo("Agent is valid")
        if validation["warnings"]:
            for warning in validation["warnings"]:
                click.echo(f"  WARNING: {warning}")
    else:
        click.echo("Agent has errors:")
        for error in validation["errors"]:
@@ -91,7 +172,7 @@ def validate():
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
    """Interactive research session."""
    """Interactive research session (CLI, no TUI)."""
    asyncio.run(_interactive_shell(verbose))


@@ -99,10 +180,10 @@ async def _interactive_shell(verbose=False):
    """Async interactive shell."""
    setup_logging(verbose=verbose)

    click.echo("=== Online Research Agent ===")
    click.echo("=== Deep Research Agent ===")
    click.echo("Enter a topic to research (or 'quit' to exit):\n")

    agent = OnlineResearchAgent()
    agent = DeepResearchAgent()
    await agent.start()

    try:
@@ -118,7 +199,7 @@ async def _interactive_shell(verbose=False):
            if not topic.strip():
                continue

            click.echo("\nResearching... (this may take a few minutes)\n")
            click.echo("\nResearching...\n")

            result = await agent.trigger_and_wait("start", {"topic": topic})

@@ -128,16 +209,14 @@ async def _interactive_shell(verbose=False):

            if result.success:
                output = result.output
                if "file_path" in output:
                    click.echo(f"\nReport saved to: {output['file_path']}\n")
                if "final_report" in output:
                    click.echo("\n--- Report Preview ---\n")
                    preview = (
                        output["final_report"][:500] + "..."
                        if len(output.get("final_report", "")) > 500
                        else output.get("final_report", "")
                    )
                    click.echo(preview)
                if "report_content" in output:
                    click.echo("\n--- Report ---\n")
                    click.echo(output["report_content"])
                    click.echo("\n")
                if "references" in output:
                    click.echo("--- References ---\n")
                    for ref in output.get("references", []):
                        click.echo(f"  [{ref.get('number', '?')}] {ref.get('title', '')} - {ref.get('url', '')}")
                    click.echo("\n")
            else:
                click.echo(f"\nResearch failed: {result.error}\n")
@@ -148,7 +227,6 @@ async def _interactive_shell(verbose=False):
    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        import traceback

        traceback.print_exc()
    finally:
        await agent.stop()
+72 -127
@@ -1,9 +1,8 @@
"""Agent graph construction for Online Research Agent."""
"""Agent graph construction for Deep Research Agent."""

from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
@@ -11,164 +10,132 @@ from framework.runner.tool_registry import ToolRegistry

from .config import default_config, metadata
from .nodes import (
    parse_query_node,
    search_sources_node,
    fetch_content_node,
    evaluate_sources_node,
    synthesize_findings_node,
    write_report_node,
    quality_check_node,
    save_report_node,
    intake_node,
    research_node,
    review_node,
    report_node,
)

# Goal definition
goal = Goal(
    id="comprehensive-online-research",
    name="Comprehensive Online Research",
    description="Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations.",
    id="rigorous-interactive-research",
    name="Rigorous Interactive Research",
    description=(
        "Research any topic by searching diverse sources, analyzing findings, "
        "and producing a cited report — with user checkpoints to guide direction."
    ),
    success_criteria=[
        SuccessCriterion(
            id="source-coverage",
            description="Query 10+ diverse sources",
            id="source-diversity",
            description="Use multiple diverse, authoritative sources",
            metric="source_count",
            target=">=10",
            weight=0.20,
        ),
        SuccessCriterion(
            id="relevance",
            description="All sources directly address the query",
            metric="relevance_score",
            target="90%",
            target=">=5",
            weight=0.25,
        ),
        SuccessCriterion(
            id="synthesis",
            description="Synthesize findings into coherent narrative",
            metric="coherence_score",
            target="85%",
            weight=0.25,
        ),
        SuccessCriterion(
            id="citations",
            description="Include citations for all claims",
            id="citation-coverage",
            description="Every factual claim in the report cites its source",
            metric="citation_coverage",
            target="100%",
            weight=0.15,
            weight=0.25,
        ),
        SuccessCriterion(
            id="actionable",
            description="Report answers the user's question",
            metric="answer_completeness",
            id="user-satisfaction",
            description="User reviews findings before report generation",
            metric="user_approval",
            target="true",
            weight=0.25,
        ),
        SuccessCriterion(
            id="report-completeness",
            description="Final report answers the original research questions",
            metric="question_coverage",
            target="90%",
            weight=0.15,
            weight=0.25,
        ),
    ],
    constraints=[
        Constraint(
            id="no-hallucination",
            description="Only include information found in sources",
            description="Only include information found in fetched sources",
            constraint_type="quality",
            category="accuracy",
        ),
        Constraint(
            id="source-attribution",
            description="Every factual claim must cite its source",
            description="Every claim must cite its source with a numbered reference",
            constraint_type="quality",
            category="accuracy",
        ),
        Constraint(
            id="recency-preference",
            description="Prefer recent sources when relevant",
            constraint_type="quality",
            category="relevance",
        ),
        Constraint(
            id="no-paywalled",
            description="Avoid sources that require payment to access",
            id="user-checkpoint",
            description="Present findings to the user before writing the final report",
            constraint_type="functional",
            category="accessibility",
            category="interaction",
        ),
    ],
)

# Node list
nodes = [
    parse_query_node,
    search_sources_node,
    fetch_content_node,
    evaluate_sources_node,
    synthesize_findings_node,
    write_report_node,
    quality_check_node,
    save_report_node,
    intake_node,
    research_node,
    review_node,
    report_node,
]

# Edge definitions
edges = [
    # intake -> research
    EdgeSpec(
        id="parse-to-search",
        source="parse-query",
        target="search-sources",
        id="intake-to-research",
        source="intake",
        target="research",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # research -> review
    EdgeSpec(
        id="search-to-fetch",
        source="search-sources",
        target="fetch-content",
        id="research-to-review",
        source="research",
        target="review",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # review -> research (feedback loop)
    EdgeSpec(
        id="fetch-to-evaluate",
        source="fetch-content",
        target="evaluate-sources",
        condition=EdgeCondition.ON_SUCCESS,
        id="review-to-research-feedback",
        source="review",
        target="research",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="needs_more_research == True",
        priority=1,
    ),
    # review -> report (user satisfied)
    EdgeSpec(
        id="evaluate-to-synthesize",
        source="evaluate-sources",
        target="synthesize-findings",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="synthesize-to-write",
        source="synthesize-findings",
        target="write-report",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="write-to-quality",
        source="write-report",
        target="quality-check",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="quality-to-save",
        source="quality-check",
        target="save-report",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
        id="review-to-report",
        source="review",
        target="report",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="needs_more_research == False",
        priority=2,
    ),
]

# Graph configuration
entry_node = "parse-query"
entry_points = {"start": "parse-query"}
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["save-report"]
terminal_nodes = ["report"]


class OnlineResearchAgent:
class DeepResearchAgent:
    """
    Online Research Agent - Deep-dive research with narrative reports.
    Deep Research Agent — 4-node pipeline with user checkpoints.

    Uses GraphExecutor directly with EventLoopNode instances registered
    in the node_registry for multi-turn tool execution.
    Flow: intake -> research -> review -> report
                       ^          |
                       +----------+  feedback loop (if user wants more)
    """

    def __init__(self, config=None):
@@ -188,7 +155,7 @@ class OnlineResearchAgent:
    def _build_graph(self) -> GraphSpec:
        """Build the GraphSpec."""
        return GraphSpec(
            id="online-research-agent-graph",
            id="deep-research-agent-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node=self.entry_node,
@@ -201,29 +168,11 @@ class OnlineResearchAgent:
            max_tokens=self.config.max_tokens,
        )

    def _build_node_registry(self, tool_executor=None) -> dict:
        """Create EventLoopNode instances for all event_loop nodes."""
        registry = {}
        for node_spec in self.nodes:
            if node_spec.node_type == "event_loop":
                registry[node_spec.id] = EventLoopNode(
                    event_bus=self._event_bus,
                    judge=None,  # implicit judge: accept when output_keys are filled
                    config=LoopConfig(
                        max_iterations=50,
                        max_tool_calls_per_turn=15,
                        stall_detection_threshold=3,
                        max_history_tokens=32000,
                    ),
                    tool_executor=tool_executor,
                )
        return registry

    def _setup(self, mock_mode=False) -> GraphExecutor:
        """Set up the executor with all components."""
        from pathlib import Path

        storage_path = Path.home() / ".hive" / "online_research_agent"
        storage_path = Path.home() / ".hive" / "deep_research_agent"
        storage_path.mkdir(parents=True, exist_ok=True)

        self._event_bus = EventBus()
@@ -245,7 +194,6 @@ class OnlineResearchAgent:
        tools = list(self._tool_registry.get_tools().values())

        self._graph = self._build_graph()
        node_registry = self._build_node_registry(tool_executor=tool_executor)
        runtime = Runtime(storage_path)

        self._executor = GraphExecutor(
@@ -253,7 +201,8 @@ class OnlineResearchAgent:
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
            node_registry=node_registry,
            event_bus=self._event_bus,
            storage_path=storage_path,
        )

        return self._executor
@@ -317,7 +266,7 @@ class OnlineResearchAgent:
            "entry_points": self.entry_points,
            "pause_nodes": self.pause_nodes,
            "terminal_nodes": self.terminal_nodes,
            "multi_entrypoint": True,
            "client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
        }

    def validate(self):
@@ -339,10 +288,6 @@ class OnlineResearchAgent:
            if terminal not in node_ids:
                errors.append(f"Terminal node '{terminal}' not found")

        for pause in self.pause_nodes:
            if pause not in node_ids:
                errors.append(f"Pause node '{pause}' not found")

        for ep_id, node_id in self.entry_points.items():
            if node_id not in node_ids:
                errors.append(
@@ -357,4 +302,4 @@ class OnlineResearchAgent:


# Create default instance
default_agent = OnlineResearchAgent()
default_agent = DeepResearchAgent()
+6 -3
@@ -32,12 +32,15 @@ class RuntimeConfig:
default_config = RuntimeConfig()


# Agent metadata
@dataclass
class AgentMetadata:
    name: str = "Online Research Agent"
    name: str = "Deep Research Agent"
    version: str = "1.0.0"
    description: str = "Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations."
    description: str = (
        "Interactive research agent that rigorously investigates topics through "
        "multi-source search, quality evaluation, and synthesis - with TUI conversation "
        "at key checkpoints for user guidance and feedback."
    )


metadata = AgentMetadata()

+147
@@ -0,0 +1,147 @@
"""Node definitions for Deep Research Agent."""

from framework.graph import NodeSpec

# Node 1: Intake (client-facing)
# Brief conversation to clarify what the user wants researched.
intake_node = NodeSpec(
    id="intake",
    name="Research Intake",
    description="Discuss the research topic with the user, clarify scope, and confirm direction",
    node_type="event_loop",
    client_facing=True,
    input_keys=["topic"],
    output_keys=["research_brief"],
    system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.

**STEP 1 — Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)
3. If it's already clear, confirm your understanding and ask the user to confirm

Keep it short. Don't over-ask.

**STEP 2 — After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
""",
    tools=[],
)

# Node 2: Research
# The workhorse — searches the web, fetches content, analyzes sources.
# One node with both tools avoids the context-passing overhead of 5 separate nodes.
research_node = NodeSpec(
    id="research",
    name="Research",
    description="Search the web, fetch source content, and compile findings",
    node_type="event_loop",
    max_node_visits=3,
    input_keys=["research_brief", "feedback"],
    output_keys=["findings", "sources", "gaps"],
    nullable_output_keys=["feedback"],
    system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.

If feedback is provided, this is a follow-up round — focus on the gaps identified.

Work in phases:
1. **Search**: Use web_search with 3-5 diverse queries covering different angles.
   Prioritize authoritative sources (.edu, .gov, established publications).
2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).
   Skip URLs that fail. Extract the substantive content.
3. **Analyze**: Review what you've collected. Identify key findings, themes,
   and any contradictions between sources.

Important:
- Work in batches of 3-4 tool calls at a time to manage context
- After each batch, assess whether you have enough material
- Prefer quality over quantity — 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)

When done, use set_output:
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
    tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)

# Node 3: Review (client-facing)
# Shows the user what was found and asks whether to dig deeper or proceed.
review_node = NodeSpec(
    id="review",
    name="Review Findings",
    description="Present findings to user and decide whether to research more or write the report",
    node_type="event_loop",
    client_facing=True,
    max_node_visits=3,
    input_keys=["findings", "sources", "gaps", "research_brief"],
    output_keys=["needs_more_research", "feedback"],
    system_prompt="""\
Present the research findings to the user clearly and concisely.

**STEP 1 — Present (your first message, text only, NO tool calls):**
1. **Summary** (2-3 sentences of what was found)
2. **Key Findings** (bulleted, with confidence levels)
3. **Sources Used** (count and quality assessment)
4. **Gaps** (what's still unclear or under-covered)

End by asking: Are they satisfied, or do they want deeper research? \
Should we proceed to writing the final report?

**STEP 2 — After the user responds, call set_output:**
- set_output("needs_more_research", "true") — if they want more
- set_output("needs_more_research", "false") — if they're satisfied
- set_output("feedback", "What the user wants explored further, or empty string")
""",
    tools=[],
)

# Node 4: Report (client-facing)
# Writes the final report and presents it to the user.
report_node = NodeSpec(
    id="report",
    name="Write & Deliver Report",
    description="Write a cited report from the findings and present it to the user",
    node_type="event_loop",
    client_facing=True,
    input_keys=["findings", "sources", "research_brief"],
    output_keys=["delivery_status"],
    system_prompt="""\
Write a comprehensive research report and present it to the user.

**STEP 1 — Write and present the report (text only, NO tool calls):**

Report structure:
1. **Executive Summary** (2-3 paragraphs)
2. **Findings** (organized by theme, with [n] citations)
3. **Analysis** (synthesis, implications, areas of debate)
4. **Conclusion** (key takeaways, confidence assessment)
5. **References** (numbered list of sources cited)

Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective — present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief

End by asking the user if they have questions or want to save the report.

**STEP 2 — After the user responds:**
- Answer follow-up questions from the research material
- If they want to save, use write_to_file tool
- When the user is satisfied: set_output("delivery_status", "completed")
""",
    tools=["write_to_file"],
)

__all__ = [
    "intake_node",
    "research_node",
    "review_node",
    "report_node",
]
@@ -1,80 +0,0 @@
# Online Research Agent

Deep-dive research agent that searches 10+ sources and produces comprehensive narrative reports with citations.

## Features

- Generates multiple search queries from a topic
- Searches and fetches 15+ web sources
- Evaluates and ranks sources by relevance
- Synthesizes findings into themes
- Writes narrative report with numbered citations
- Quality checks for uncited claims
- Saves report to local markdown file

## Usage

### CLI

```bash
# Show agent info
python -m online_research_agent info

# Validate structure
python -m online_research_agent validate

# Run research on a topic
python -m online_research_agent run --topic "impact of AI on healthcare"

# Interactive shell
python -m online_research_agent shell
```

### Python API

```python
from online_research_agent import default_agent

# Simple usage
result = await default_agent.run({"topic": "climate change solutions"})

# Check output
if result.success:
    print(f"Report saved to: {result.output['file_path']}")
    print(result.output['final_report'])
```

## Workflow

```
parse-query → search-sources → fetch-content → evaluate-sources
                                                      ↓
                     write-report ← synthesize-findings
                          ↓
                     quality-check → save-report
```

## Output

Reports are saved to `./research_reports/` as markdown files with:

1. Executive Summary
2. Introduction
3. Key Findings (by theme)
4. Analysis
5. Conclusion
6. References

## Requirements

- Python 3.11+
- LLM provider API key (Groq, Cerebras, etc.)
- Internet access for web search/fetch

## Configuration

Edit `config.py` to change:

- `model`: LLM model (default: groq/moonshotai/kimi-k2-instruct-0905)
- `temperature`: Generation temperature (default: 0.7)
- `max_tokens`: Max tokens per response (default: 16384)
-23
@@ -1,23 +0,0 @@
"""
Online Research Agent - Deep-dive research with narrative reports.

Research any topic by searching multiple sources, synthesizing information,
and producing a well-structured narrative report with citations.
"""

from .agent import OnlineResearchAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata

__version__ = "1.0.0"

__all__ = [
    "OnlineResearchAgent",
    "default_agent",
    "goal",
    "nodes",
    "edges",
    "RuntimeConfig",
    "AgentMetadata",
    "default_config",
    "metadata",
]
-232
@@ -1,232 +0,0 @@
"""Node definitions for Online Research Agent."""

from framework.graph import NodeSpec

# Node 1: Parse Query
parse_query_node = NodeSpec(
    id="parse-query",
    name="Parse Query",
    description="Analyze the research topic and generate 3-5 diverse search queries to cover different aspects",
    node_type="event_loop",
    input_keys=["topic"],
    output_keys=["search_queries", "research_focus", "key_aspects"],
    system_prompt="""\
You are a research query strategist. Given a research topic, analyze it and generate search queries.

Your task:
1. Understand the core research question
2. Identify 3-5 key aspects to investigate
3. Generate 3-5 diverse search queries that will find comprehensive information

Use set_output to store each result:
- set_output("research_focus", "Brief statement of what we're researching")
- set_output("key_aspects", ["aspect1", "aspect2", "aspect3"])
- set_output("search_queries", ["query 1", "query 2", "query 3", "query 4", "query 5"])
""",
    tools=[],
)

# Node 2: Search Sources
search_sources_node = NodeSpec(
    id="search-sources",
    name="Search Sources",
    description="Execute web searches using the generated queries to find 15+ source URLs",
    node_type="event_loop",
    input_keys=["search_queries", "research_focus"],
    output_keys=["source_urls", "search_results_summary"],
    system_prompt="""\
You are a research assistant executing web searches. Use the web_search tool to find sources.

Your task:
1. Execute each search query using web_search tool
2. Collect URLs from search results
3. Aim for 15+ diverse sources

After searching, use set_output to store results:
- set_output("source_urls", ["url1", "url2", ...])
- set_output("search_results_summary", "Brief summary of what was found")
""",
    tools=["web_search"],
)

# Node 3: Fetch Content
fetch_content_node = NodeSpec(
    id="fetch-content",
    name="Fetch Content",
    description="Fetch and extract content from the discovered source URLs",
    node_type="event_loop",
    input_keys=["source_urls", "research_focus"],
    output_keys=["fetched_sources", "fetch_errors"],
    system_prompt="""\
You are a content fetcher. Use web_scrape tool to retrieve content from URLs.

Your task:
1. Fetch content from each source URL using web_scrape tool
2. Extract the main content relevant to the research focus
3. Track any URLs that failed to fetch

After fetching, use set_output to store results:
- set_output("fetched_sources", [{"url": "...", "title": "...", "content": "..."}])
- set_output("fetch_errors", ["url that failed", ...])
""",
    tools=["web_scrape"],
)

# Node 4: Evaluate Sources
evaluate_sources_node = NodeSpec(
    id="evaluate-sources",
    name="Evaluate Sources",
    description="Score sources for relevance and quality, filter to top 10",
    node_type="event_loop",
    input_keys=["fetched_sources", "research_focus", "key_aspects"],
    output_keys=["ranked_sources", "source_analysis"],
    system_prompt="""\
You are a source evaluator. Assess each source for quality and relevance.

Scoring criteria:
- Relevance to research focus (1-10)
- Source credibility (1-10)
- Information depth (1-10)
- Recency if relevant (1-10)

Your task:
1. Score each source
2. Rank by combined score
3. Select top 10 sources
4. Note what each source uniquely contributes

Use set_output to store results:
- set_output("ranked_sources", [{"url": "...", "title": "...", "score": 8.5}])
- set_output("source_analysis", "Overview of source quality and coverage")
""",
    tools=[],
)

# Node 5: Synthesize Findings
synthesize_findings_node = NodeSpec(
    id="synthesize-findings",
    name="Synthesize Findings",
    description="Extract key facts from sources and identify common themes",
    node_type="event_loop",
    input_keys=["ranked_sources", "research_focus", "key_aspects"],
    output_keys=["key_findings", "themes", "source_citations"],
    system_prompt="""\
You are a research synthesizer. Analyze multiple sources to extract insights.

Your task:
1. Identify key facts from each source
2. Find common themes across sources
3. Note contradictions or debates
4. Build a citation map (fact -> source URL)

Use set_output to store each result:
- set_output("key_findings", [{"finding": "...", "sources": ["url1"], "confidence": "high"}])
- set_output("themes", [{"theme": "...", "description": "...", "supporting_sources": [...]}])
- set_output("source_citations", {"fact or claim": ["url1", "url2"]})
""",
    tools=[],
)

# Node 6: Write Report
write_report_node = NodeSpec(
    id="write-report",
    name="Write Report",
    description="Generate a narrative report with proper citations",
    node_type="event_loop",
    input_keys=[
        "key_findings",
        "themes",
        "source_citations",
        "research_focus",
        "ranked_sources",
    ],
    output_keys=["report_content", "references"],
    system_prompt="""\
You are a research report writer. Create a well-structured narrative report.

Report structure:
1. Executive Summary (2-3 paragraphs)
2. Introduction (context and scope)
3. Key Findings (organized by theme)
4. Analysis (synthesis and implications)
5. Conclusion
6. References (numbered list of all sources)

Citation format: Use numbered citations like [1], [2] that correspond to the References section.

IMPORTANT:
- Every factual claim MUST have a citation
- Write in clear, professional prose
- Be objective and balanced
- Highlight areas of consensus and debate

Use set_output to store results:
- set_output("report_content", "Full markdown report text with citations...")
- set_output("references", [{"number": 1, "url": "...", "title": "..."}])
""",
    tools=[],
)

# Node 7: Quality Check
quality_check_node = NodeSpec(
    id="quality-check",
    name="Quality Check",
    description="Verify all claims have citations and report is coherent",
    node_type="event_loop",
    input_keys=["report_content", "references", "source_citations"],
    output_keys=["quality_score", "issues", "final_report"],
    system_prompt="""\
You are a quality assurance reviewer. Check the research report for issues.

Check for:
1. Uncited claims (factual statements without [n] citation)
2. Broken citations (references to non-existent numbers)
3. Coherence (logical flow between sections)
4. Completeness (all key aspects covered)
5. Accuracy (claims match source content)

If issues found, fix them in the final report.

Use set_output to store results:
- set_output("quality_score", 0.95)
- set_output("issues", [{"type": "uncited_claim", "location": "...", "fixed": true}])
- set_output("final_report", "Corrected full report with all issues fixed...")
""",
    tools=[],
)

# Node 8: Save Report
save_report_node = NodeSpec(
    id="save-report",
    name="Save Report",
    description="Write the final report to a local markdown file",
    node_type="event_loop",
    input_keys=["final_report", "references", "research_focus"],
    output_keys=["file_path", "save_status"],
    system_prompt="""\
You are a file manager. Save the research report to disk.

Your task:
1. Generate a filename from the research focus (slugified, with date)
2. Use the write_to_file tool to save the report as markdown
3. Save to the ./research_reports/ directory

Filename format: research_YYYY-MM-DD_topic-slug.md

Use set_output to store results:
- set_output("file_path", "research_reports/research_2026-01-23_topic-name.md")
- set_output("save_status", "success")
""",
    tools=["write_to_file"],
)

__all__ = [
    "parse_query_node",
    "search_sources_node",
    "fetch_content_node",
    "evaluate_sources_node",
    "synthesize_findings_node",
    "write_report_node",
    "quality_check_node",
    "save_report_node",
]
@@ -158,6 +158,43 @@ intake_node = NodeSpec(

> **Legacy Note:** The old `pause_nodes` / `entry_points` pattern still works but `client_facing=True` is preferred for new agents.

**STEP 1 / STEP 2 Prompt Pattern:** For client-facing nodes, structure the system prompt with two explicit phases:

```python
system_prompt="""\
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions, etc.]

**STEP 2 — After the user responds, call set_output:**
[Call set_output with the structured outputs]
"""
```

This prevents the LLM from calling `set_output` prematurely before the user has had a chance to respond.

### Node Design: Fewer, Richer Nodes

Prefer fewer nodes that do more work over many thin single-purpose nodes:

- **Bad**: 8 thin nodes (parse query → search → fetch → evaluate → synthesize → write → check → save)
- **Good**: 4 rich nodes (intake → research → review → report)

Why: Each node boundary requires serializing outputs and passing context. Fewer nodes means the LLM retains full context of its work within the node. A research node that searches, fetches, and analyzes keeps all the source material in its conversation history.

### nullable_output_keys for Cross-Edge Inputs

When a node receives inputs that only arrive on certain edges (e.g., `feedback` only comes from a review → research feedback loop, not from intake → research), mark those keys as `nullable_output_keys`:

```python
research_node = NodeSpec(
    id="research",
    input_keys=["research_brief", "feedback"],
    nullable_output_keys=["feedback"],  # Not present on first visit
    max_node_visits=3,
    ...
)
```

## Event Loop Architecture Concepts

### How EventLoopNode Works

@@ -169,40 +206,30 @@ An event loop node runs a multi-turn loop:
4. Judge evaluates: ACCEPT (exit loop), RETRY (loop again), or ESCALATE
5. Repeat until judge ACCEPTs or max_iterations reached
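
A minimal sketch of that loop shape; the helper names (`llm_step`, `tool_step`) are placeholders, not the framework's actual internals:

```python
def run_event_loop(conversation, judge, config, llm_step, tool_step):
    """Sketch of the multi-turn loop described above."""
    for iteration in range(config.max_iterations):
        response = llm_step(conversation)   # stream the LLM for this turn
        outputs = tool_step(response)       # execute any requested tools
        verdict = judge.evaluate(outputs)   # ACCEPT, RETRY, or ESCALATE (step 4)
        if verdict == "ACCEPT":
            return outputs                  # exit the loop (step 5)
    return None                             # max_iterations reached without ACCEPT
```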

### CRITICAL: EventLoopNode Runtime Requirements
### EventLoopNode Runtime

EventLoopNodes are **not auto-created** by the graph executor. They must be explicitly instantiated and registered in a `node_registry` dict before execution.

**Required components:**
1. **`EventLoopNode` instances** — One per event_loop NodeSpec, registered in `node_registry`
2. **`Runtime` instance** — `GraphExecutor` calls `runtime.start_run()` internally. Passing `None` crashes the executor
3. **`GraphExecutor` (not `AgentRuntime`)** — `AgentRuntime`/`create_agent_runtime()` does NOT pass `node_registry` to the internal `GraphExecutor`, so all event_loop nodes fail with "not found in registry"
EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. You do NOT need to manually register them. Both `GraphExecutor` (direct) and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically.

```python
# Direct execution — executor auto-creates EventLoopNodes
from framework.graph.executor import GraphExecutor
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime

# Build node_registry
event_bus = EventBus()
node_registry = {}
for node_spec in nodes:
    if node_spec.node_type == "event_loop":
        node_registry[node_spec.id] = EventLoopNode(
            event_bus=event_bus,
            config=LoopConfig(max_iterations=50, max_tool_calls_per_turn=15),
            tool_executor=tool_executor,
        )

# Create executor with Runtime and node_registry
runtime = Runtime(storage_path)
executor = GraphExecutor(
    runtime=runtime,
    llm=llm,
    tools=tools,
    tool_executor=tool_executor,
    node_registry=node_registry,
    storage_path=storage_path,
)
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)

# TUI execution — AgentRuntime also works
from framework.runtime.agent_runtime import create_agent_runtime
runtime = create_agent_runtime(
    graph=graph, goal=goal, storage_path=storage_path,
    entry_points=[...], llm=llm, tools=tools, tool_executor=tool_executor,
)
```

@@ -210,8 +237,12 @@ executor = GraphExecutor(

Nodes produce structured outputs by calling `set_output(key, value)` — a synthetic tool injected by the framework. When the LLM calls `set_output`, the value is stored in the output accumulator and made available to downstream nodes via shared memory.

`set_output` is NOT a real tool — it is excluded from `real_tool_results`. For client-facing nodes, this means a turn where the LLM only calls `set_output` (no other tools) is treated as a conversational boundary and will block for user input.
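
In other words, the boundary check ignores `set_output`. A sketch of that classification, with assumed names (`tool_results`, `tool_name`):

```python
def is_conversational_boundary(node_spec, tool_results) -> bool:
    """Sketch only: a turn whose only tool call is the synthetic set_output
    counts as conversational on a client-facing node, so the node blocks
    for user input instead of looping on."""
    real_tool_results = [r for r in tool_results if r.tool_name != "set_output"]
    return node_spec.client_facing and not real_tool_results
```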

### JudgeProtocol

**The judge is the SOLE mechanism for acceptance decisions.** Do not add ad-hoc framework gating, output rollback, or premature rejection logic. If the LLM calls `set_output` too early, fix it with better prompts or a custom judge — not framework-level guards.

The judge controls when a node's loop exits:
- **Implicit judge** (default, no judge configured): ACCEPTs when the LLM finishes with no tool calls and all required output keys are set
- **SchemaJudge**: Validates outputs against a Pydantic model
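
A sketch of the SchemaJudge option; the Pydantic model is standard, but the `schema` constructor argument and the import path for `SchemaJudge` are assumptions:

```python
from pydantic import BaseModel

class ResearchOutputs(BaseModel):
    findings: str
    sources: list[dict]
    gaps: str

# Hypothetical wiring: pass the model to SchemaJudge so the loop only
# exits once the accumulated outputs validate against the schema.
judge = SchemaJudge(schema=ResearchOutputs)
```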

@@ -225,6 +256,23 @@ Controls loop behavior:
- `stall_detection_threshold` (default 3) — detects repeated identical responses
- `max_history_tokens` (default 32000) — triggers conversation compaction

### Data Tools (Spillover Management)

When tool results exceed the context window, the framework automatically saves them to a spillover directory and truncates with a hint. Nodes that produce or consume large data should include the data tools:

- `save_data(filename, data, data_dir)` — Write data to a file in the data directory
- `load_data(filename, data_dir, offset=0, limit=50)` — Read data with line-based pagination
- `list_data_files(data_dir)` — List available data files

These are real MCP tools (not synthetic). Add them to nodes that handle large tool results:

```python
research_node = NodeSpec(
    ...
    tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```

### Fan-Out / Fan-In

Multiple ON_SUCCESS edges from the same source create parallel execution. All branches run concurrently via `asyncio.gather()`. Parallel event_loop nodes must have disjoint `output_keys`.
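
For example, a fan-out is declared as two ON_SUCCESS edges from one source; the target node IDs here (`summarize`, `extract-citations`) are hypothetical:

```python
# Both targets run in parallel after "research" succeeds; as event_loop
# nodes they must write disjoint output_keys.
fan_out_edges = [
    EdgeSpec(
        id="research-to-summarize",
        source="research",
        target="summarize",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="research-to-citations",
        source="research",
        target="extract-citations",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
]
```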

@@ -61,28 +61,38 @@ For agents needing multi-turn conversations with users, use `client_facing=True`
A client-facing node streams LLM output to the user and blocks for user input between conversational turns. This replaces the old pause/resume pattern.

```python
# Client-facing node blocks for user input
# Client-facing node with STEP 1/STEP 2 prompt pattern
intake_node = NodeSpec(
    id="intake",
    name="Intake",
    description="Gather requirements from the user",
    node_type="event_loop",
    client_facing=True,
    input_keys=[],
    output_keys=["repo_url", "project_url"],
    system_prompt="You are the intake agent. Ask the user for their repo URL and project URL. When you have both, call set_output for each.",
    input_keys=["topic"],
    output_keys=["research_brief"],
    system_prompt="""\
You are an intake specialist.

**STEP 1 — Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions
3. If it's clear, confirm your understanding

**STEP 2 — After the user confirms, call set_output:**
- set_output("research_brief", "Clear description of what to research")
""",
)

# Internal node runs without user interaction
scanner_node = NodeSpec(
    id="scanner",
    name="Scanner",
    description="Scan the repository",
research_node = NodeSpec(
    id="research",
    name="Research",
    description="Search and analyze sources",
    node_type="event_loop",
    input_keys=["repo_url"],
    output_keys=["scan_results"],
    system_prompt="Scan the repository at {repo_url}...",
    tools=["scan_github_repo"],
    input_keys=["research_brief"],
    output_keys=["findings", "sources"],
    system_prompt="Research the topic using web_search and web_scrape...",
    tools=["web_search", "web_scrape", "load_data", "save_data"],
)
```

@@ -91,6 +101,9 @@ scanner_node = NodeSpec(
- User input is injected via `node.inject_event(text)`
- When the LLM calls `set_output` to produce structured outputs, the judge evaluates and ACCEPTs
- Internal nodes (non-client-facing) run their entire loop without blocking
- `set_output` is a synthetic tool — a turn with only `set_output` calls (no real tools) triggers user input blocking

**STEP 1/STEP 2 pattern:** Always structure client-facing prompts with explicit phases. STEP 1 is text-only conversation. STEP 2 calls `set_output` after user confirmation. This prevents the LLM from calling `set_output` prematurely before the user responds.

### When to Use client_facing

@@ -160,6 +173,12 @@ EdgeSpec(

## Judge Patterns

**Core Principle: The judge is the SOLE mechanism for acceptance decisions.** Never add ad-hoc framework gating to compensate for LLM behavior. If the LLM calls `set_output` prematurely, fix the system prompt or use a custom judge. Anti-patterns to avoid:
- Output rollback logic
- `_user_has_responded` flags
- Premature set_output rejection
- Interaction protocol injection into system prompts

Judges control when an event_loop node's loop exits. Choose based on validation needs.

### Implicit Judge (Default)
@@ -241,15 +260,34 @@ EventLoopNode automatically manages context window usage with tiered compaction:

### Spillover Pattern

For large tool results, use `save_data()` to write to disk and pass the filename through `set_output`. This keeps the LLM context window small.
The framework automatically truncates large tool results and saves full content to a spillover directory. The LLM receives a truncation message with instructions to use `load_data` to read the full result.

```
LLM calls save_data(filename, large_data) → file written to spillover/
LLM calls set_output("results_file", filename) → filename stored in output
Downstream node calls load_data(filename) → reads from spillover/
For explicit data management, use the data tools (real MCP tools, not synthetic):

```python
# save_data, load_data, list_data_files are real MCP tools
# Each takes a data_dir parameter since the MCP server is shared

# Saving large results
save_data(filename="sources.json", data=large_json_string, data_dir="/path/to/spillover")

# Reading with pagination (line-based offset/limit)
load_data(filename="sources.json", data_dir="/path/to/spillover", offset=0, limit=50)

# Listing available files
list_data_files(data_dir="/path/to/spillover")
```

The `load_data()` tool supports `offset` and `limit` parameters for paginated reading of large files.
Add data tools to nodes that handle large tool results:

```python
research_node = NodeSpec(
    ...
    tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```

The `data_dir` is passed by the framework (from the node's spillover directory). The LLM sees `data_dir` in truncation messages and uses it when calling `load_data`.
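
The exact truncation text is framework-defined; illustratively, the hint the LLM sees might look like:

```
[tool result truncated: full content saved as sources.json]
Call load_data(filename="sources.json", data_dir="/path/to/spillover",
offset=0, limit=50) to page through the full result.
```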

## Anti-Patterns

@@ -259,6 +297,29 @@ The `load_data()` tool supports `offset` and `limit` parameters for paginated re

- **Don't hide code in session** — Write to files as components are approved
- **Don't wait to write files** — Agent visible from first step
- **Don't batch everything** — Write incrementally, one component at a time
- **Don't create too many thin nodes** — Prefer fewer, richer nodes (see below)
- **Don't add framework gating for LLM behavior** — Fix prompts or use judges instead

### Fewer, Richer Nodes

A common mistake is splitting work into too many small single-purpose nodes. Each node boundary forces outputs to be serialized, discards in-context information, and adds edge complexity.

| Bad (8 thin nodes) | Good (4 rich nodes) |
|---------------------|---------------------|
| parse-query | intake (client-facing) |
| search-sources | research (search + fetch + analyze) |
| fetch-content | review (client-facing) |
| evaluate-sources | report (write + deliver) |
| synthesize-findings | |
| write-report | |
| quality-check | |
| save-report | |

**Why fewer nodes are better** (a wiring sketch follows the list):

- The LLM retains full context of its work within a single node
- A research node that searches, fetches, and analyzes keeps all source material in its conversation history
- Fewer edges means a simpler graph and fewer failure points
- Data tools (`save_data`/`load_data`) handle context window limits within a single node
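
A wiring sketch for the four-node shape above (the `NodeSpec`/`EdgeSpec` fields follow their usage elsewhere in this commit; the `GraphSpec` constructor arguments and the `NodeSpec` import path are assumptions for illustration):

```python
from framework.graph.edge import EdgeSpec, GraphSpec  # NodeSpec import path assumed

intake = NodeSpec(id="intake", node_type="event_loop", client_facing=True,
                  output_keys=["research_brief"])
research = NodeSpec(id="research", node_type="event_loop",
                    input_keys=["research_brief"], output_keys=["findings_file"],
                    tools=["web_search", "web_scrape", "save_data", "load_data"])
review = NodeSpec(id="review", node_type="event_loop", client_facing=True,
                  input_keys=["findings_file"], output_keys=["approved"],
                  tools=["load_data"])
report = NodeSpec(id="report", node_type="event_loop",
                  input_keys=["findings_file", "approved"], output_keys=["report"],
                  tools=["load_data", "save_data"])

# Assumed GraphSpec shape: a flat node list plus simple source->target edges.
graph = GraphSpec(
    nodes=[intake, research, review, report],
    edges=[
        EdgeSpec(source="intake", target="research"),
        EdgeSpec(source="research", target="review"),
        EdgeSpec(source="review", target="report"),
    ],
)
```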

### MCP Tools - Correct Usage

@@ -144,19 +144,19 @@ class EventLoopNode(NodeProtocol):

    1. Try to restore from durable state (crash recovery)
    2. If no prior state, init from NodeSpec.system_prompt + input_keys
    3. Loop: drain injection queue -> stream LLM -> execute tools
       -> if client_facing + no tools: block for user input (inject_event)
       -> if not client_facing or tools present: judge evaluates
       -> if client_facing + no real tools: block for user input
       -> judge evaluates (acceptance criteria)
       (each add_* and set_output writes through to store immediately)
    4. Publish events to EventBus at each stage
    5. Write cursor after each iteration
    6. Terminate when judge returns ACCEPT, shutdown signaled, or max iterations
    7. Build output dict from OutputAccumulator

    Client-facing blocking: When ``client_facing=True`` and the LLM produces
    text without tool calls (a natural conversational turn), the node blocks
    via ``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()``
    is called. This separates blocking (node concern) from output evaluation
    (judge concern).
    Client-facing blocking: When ``client_facing=True`` and the LLM finishes
    without real tool calls (stop_reason != tool_call), the node blocks via
    ``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()``
    is called. After user input, the judge evaluates — the judge is the
    sole mechanism for acceptance decisions.

    Always returns NodeResult with retryable=False semantics. The executor
    must NOT retry event loop nodes -- retry is handled internally by the

@@ -212,8 +212,10 @@ class EventLoopNode(NodeProtocol):

        # 2. Restore or create new conversation + accumulator
        conversation, accumulator, start_iteration = await self._restore(ctx)
        if conversation is None:
            system_prompt = ctx.node_spec.system_prompt or ""

            conversation = NodeConversation(
                system_prompt=ctx.node_spec.system_prompt or "",
                system_prompt=system_prompt,
                max_history_tokens=self._config.max_history_tokens,
                output_keys=ctx.node_spec.output_keys or None,
                store=self._conversation_store,

@@ -276,15 +278,20 @@ class EventLoopNode(NodeProtocol):

                iteration,
                len(conversation.messages),
            )
            assistant_text, tool_results_list, turn_tokens = await self._run_single_turn(
                ctx, conversation, tools, iteration, accumulator
            )
            (
                assistant_text,
                real_tool_results,
                outputs_set,
                turn_tokens,
            ) = await self._run_single_turn(ctx, conversation, tools, iteration, accumulator)
            logger.info(
                "[%s] iter=%d: LLM done — text=%d chars, tool_calls=%d, tokens=%s, accumulator=%s",
                "[%s] iter=%d: LLM done — text=%d chars, real_tools=%d, "
                "outputs_set=%s, tokens=%s, accumulator=%s",
                node_id,
                iteration,
                len(assistant_text),
                len(tool_results_list),
                len(real_tool_results),
                outputs_set or "[]",
                turn_tokens,
                {k: ("set" if v is not None else "None") for k, v in accumulator.to_dict().items()},
            )
@@ -300,6 +307,31 @@ class EventLoopNode(NodeProtocol):

            if conversation.needs_compaction():
                await self._compact_tiered(ctx, conversation, accumulator)

            # 6e'''. Empty response guard — if the LLM returned nothing
            # (no text, no real tools, no set_output) and all required
            # outputs are already set, accept immediately. This prevents
            # wasted iterations when the LLM has genuinely finished its
            # work (e.g. after calling set_output in a previous turn).
            truly_empty = not assistant_text and not real_tool_results and not outputs_set
            if truly_empty and accumulator is not None:
                missing = self._get_missing_output_keys(
                    accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
                )
                if not missing:
                    logger.info(
                        "[%s] iter=%d: empty response but all outputs set — accepting",
                        node_id,
                        iteration,
                    )
                    await self._publish_loop_completed(stream_id, node_id, iteration + 1)
                    latency_ms = int((time.time() - start_time) * 1000)
                    return NodeResult(
                        success=True,
                        output=accumulator.to_dict(),
                        tokens_used=total_input_tokens + total_output_tokens,
                        latency_ms=latency_ms,
                    )

            # 6f. Stall detection
            recent_responses.append(assistant_text)
            if len(recent_responses) > self._config.stall_detection_threshold:

@@ -321,18 +353,17 @@ class EventLoopNode(NodeProtocol):

            # 6g. Write cursor checkpoint
            await self._write_cursor(ctx, conversation, accumulator, iteration)

            # 6h. Client-facing input wait
            logger.info(
                "[%s] iter=%d: 6h check — client_facing=%s, tool_results=%d",
                node_id,
                iteration,
                ctx.node_spec.client_facing,
                len(tool_results_list),
            )
            if ctx.node_spec.client_facing and not tool_results_list:
                # LLM finished speaking (no tool calls) on a client-facing node.
                # This is a conversational turn boundary: block for user input
                # instead of running the judge.
            # 6h. Client-facing input blocking
            #
            # For client_facing nodes, block for user input whenever the
            # LLM finishes without making real tool calls (i.e. the LLM's
            # stop_reason is not tool_call). set_output is separated from
            # real tools by _run_single_turn, so this correctly treats
            # set_output-only turns as conversational boundaries.
            #
            # After user input, always fall through to judge evaluation
            # (6i). The judge handles all acceptance decisions.
            if ctx.node_spec.client_facing and not real_tool_results:
                if self._shutdown:
                    await self._publish_loop_completed(stream_id, node_id, iteration + 1)
                    latency_ms = int((time.time() - start_time) * 1000)

@@ -347,7 +378,6 @@ class EventLoopNode(NodeProtocol):

                got_input = await self._await_user_input(ctx)
                logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
                if not got_input:
                    # Shutdown signaled during wait
                    await self._publish_loop_completed(stream_id, node_id, iteration + 1)
                    latency_ms = int((time.time() - start_time) * 1000)
                    return NodeResult(

@@ -357,46 +387,13 @@ class EventLoopNode(NodeProtocol):

                        latency_ms=latency_ms,
                    )

                # Clear stall detection — user input resets the conversation
                recent_responses.clear()

                # For nodes with an explicit judge, fall through to judge
                # evaluation so the LLM gets structured feedback about missing
                # outputs (e.g. "Missing output keys: [...]"). Without this,
                # the LLM may generate text like "Ready to proceed!" without
                # ever calling set_output, and the judge feedback never reaches it.
                #
                # For nodes without a judge (HITL review/approval with all-
                # nullable keys), keep conversing UNLESS the LLM has already
                # set an output — in that case fall through to the implicit
                # judge which will ACCEPT and terminate the node.
                if self._judge is None:
                    has_outputs = accumulator and any(
                        v is not None for v in accumulator.to_dict().values()
                    )
                    if not has_outputs:
                        logger.info(
                            "[%s] iter=%d: no judge, no outputs, continuing",
                            node_id,
                            iteration,
                        )
                        continue
                    logger.info(
                        "[%s] iter=%d: no judge, outputs set — implicit judge",
                        node_id,
                        iteration,
                    )
                else:
                    logger.info(
                        "[%s] iter=%d: has judge, falling through to 6i",
                        node_id,
                        iteration,
                    )
                # Fall through to judge evaluation (6i)

            # 6i. Judge evaluation
            should_judge = (
                (iteration + 1) % self._config.judge_every_n_turns == 0
                or not tool_results_list  # no tool calls = natural stop
                or not real_tool_results  # no real tool calls = natural stop
            )

            logger.info("[%s] iter=%d: 6i should_judge=%s", node_id, iteration, should_judge)
@@ -406,7 +403,7 @@ class EventLoopNode(NodeProtocol):

                conversation,
                accumulator,
                assistant_text,
                tool_results_list,
                real_tool_results,
                iteration,
            )
            fb_preview = (verdict.feedback or "")[:200]
@@ -526,16 +523,24 @@ class EventLoopNode(NodeProtocol):

        tools: list[Tool],
        iteration: int,
        accumulator: OutputAccumulator,
    ) -> tuple[str, list[dict], dict[str, int]]:
    ) -> tuple[str, list[dict], list[str], dict[str, int]]:
        """Run a single LLM turn with streaming and tool execution.

        Returns (assistant_text, tool_results, token_counts).
        Returns (assistant_text, real_tool_results, outputs_set, token_counts).

        ``real_tool_results`` contains only results from actual tools (web_search,
        etc.), NOT from the synthetic ``set_output`` tool. ``outputs_set`` lists
        the output keys written via ``set_output`` during this turn. This
        separation lets the caller treat set_output as a framework concern
        rather than a tool-execution concern.
        """
        stream_id = ctx.node_id
        node_id = ctx.node_id
        token_counts: dict[str, int] = {"input": 0, "output": 0}
        tool_call_count = 0
        final_text = ""
        # Track output keys set via set_output across all inner iterations
        outputs_set_this_turn: list[str] = []

        # Inner tool loop: stream may produce tool calls requiring re-invocation
        while True:
@@ -606,10 +611,10 @@ class EventLoopNode(NodeProtocol):

            # If no tool calls, turn is complete
            if not tool_calls:
                return final_text, [], token_counts
                return final_text, [], outputs_set_this_turn, token_counts

            # Execute tool calls
            tool_results: list[dict] = []
            # Execute tool calls — separate real tools from set_output
            real_tool_results: list[dict] = []
            limit_hit = False
            executed_in_batch = 0
            for tc in tool_calls:

@@ -624,21 +629,21 @@ class EventLoopNode(NodeProtocol):

                    stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input
                )

                # Handle set_output synthetic tool
                logger.info(
                    "[%s] tool_call: %s(%s)",
                    node_id,
                    tc.tool_name,
                    json.dumps(tc.tool_input)[:200],
                )

                if tc.tool_name == "set_output":
                    # --- Framework-level set_output handling ---
                    result = self._handle_set_output(tc.tool_input, ctx.node_spec.output_keys)
                    result = ToolResult(
                        tool_use_id=tc.tool_use_id,
                        content=result.content,
                        is_error=result.is_error,
                    )
                    # Async write-through for set_output
                    if not result.is_error:
                        value = tc.tool_input["value"]
                        # Parse JSON strings into native types so downstream

@@ -652,26 +657,27 @@ class EventLoopNode(NodeProtocol):

                        except (json.JSONDecodeError, TypeError):
                            pass
                        await accumulator.set(tc.tool_input["key"], value)
                        outputs_set_this_turn.append(tc.tool_input["key"])
                else:
                    # Execute real tool
                    # --- Real tool execution ---
                    result = await self._execute_tool(tc)
                    # Truncate large results to prevent context blowup
                    result = self._truncate_tool_result(result, tc.tool_name)
                    real_tool_results.append(
                        {
                            "tool_use_id": tc.tool_use_id,
                            "tool_name": tc.tool_name,
                            "content": result.content,
                            "is_error": result.is_error,
                        }
                    )

                # Record tool result in conversation (write-through)
                # Record tool result in conversation (both real and set_output
                # go into the conversation for LLM context continuity)
                await conversation.add_tool_result(
                    tool_use_id=tc.tool_use_id,
                    content=result.content,
                    is_error=result.is_error,
                )
                tool_results.append(
                    {
                        "tool_use_id": tc.tool_use_id,
                        "tool_name": tc.tool_name,
                        "content": result.content,
                        "is_error": result.is_error,
                    }
                )

                # Publish tool call completed
                await self._publish_tool_completed(

@@ -708,7 +714,9 @@ class EventLoopNode(NodeProtocol):

                        content=discard_msg,
                        is_error=True,
                    )
                    tool_results.append(
                    # Discarded calls go into real_tool_results so the
                    # caller sees they were attempted (for judge context).
                    real_tool_results.append(
                        {
                            "tool_use_id": tc.tool_use_id,
                            "tool_name": tc.tool_name,

@@ -716,9 +724,24 @@ class EventLoopNode(NodeProtocol):

                            "is_error": True,
                        }
                    )
                # Prune old tool results NOW to prevent context bloat on the
                # next turn. The char-based token estimator underestimates
                # actual API tokens, so the standard compaction check in the
                # outer loop may not trigger in time.
                protect = max(2000, self._config.max_history_tokens // 12)
                pruned = await conversation.prune_old_tool_results(
                    protect_tokens=protect,
                    min_prune_tokens=max(1000, protect // 3),
                )
                if pruned > 0:
                    logger.info(
                        "Post-limit pruning: cleared %d old tool results (budget: %d)",
                        pruned,
                        self._config.max_history_tokens,
                    )
                # Limit hit — return from this turn so the judge can
                # evaluate instead of looping back for another stream.
                return final_text, tool_results, token_counts
                return final_text, real_tool_results, outputs_set_this_turn, token_counts

            # --- Mid-turn pruning: prevent context blowup within a single turn ---
            if conversation.usage_ratio() >= 0.6:
@@ -1025,7 +1048,8 @@ class EventLoopNode(NodeProtocol):

        truncated = (
            f"[Result from {tool_name}: {len(result.content)} chars — "
            f"too large for context, saved to '{filename}'. "
            f"Use load_data('{filename}') to read the full result.]\n\n"
            f"Use load_data(filename='{filename}', data_dir='{spill_dir}') "
            f"to read the full result.]\n\n"
            f"Preview:\n{preview}…"
        )
        logger.info(

@@ -1244,9 +1268,11 @@ class EventLoopNode(NodeProtocol):

        # 5. Spillover files hint
        if self._config.spillover_dir:
            spill = self._config.spillover_dir
            parts.append(
                "NOTE: Large tool results were saved to files. "
                "Use load_data('<filename>') to read them."
                f"Use load_data(filename='<filename>', data_dir='{spill}') "
                "to read them."
            )

        # 6. Tool call history (prevent re-calling tools)
@@ -14,6 +14,7 @@ import logging

import warnings
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec

@@ -130,6 +131,7 @@ class GraphExecutor:

        parallel_config: ParallelExecutionConfig | None = None,
        event_bus: Any | None = None,
        stream_id: str = "",
        storage_path: str | Path | None = None,
    ):
        """
        Initialize the executor.

@@ -146,6 +148,7 @@ class GraphExecutor:

            parallel_config: Configuration for parallel execution behavior
            event_bus: Optional event bus for emitting node lifecycle events
            stream_id: Stream ID for event correlation
            storage_path: Optional base path for conversation persistence
        """
        self.runtime = runtime
        self.llm = llm

@@ -157,6 +160,7 @@ class GraphExecutor:

        self.logger = logging.getLogger(__name__)
        self._event_bus = event_bus
        self._stream_id = stream_id
        self._storage_path = Path(storage_path) if storage_path else None

        # Initialize output cleaner
        self.cleansing_config = cleansing_config or CleansingConfig()
@@ -380,8 +384,16 @@ class GraphExecutor:

        )

        if result.success:
            # Validate output before accepting it
            if result.output and node_spec.output_keys:
            # Validate output before accepting it.
            # Skip for event_loop nodes — their judge system is
            # the sole acceptance mechanism (see WP-8). Empty
            # strings and other flexible outputs are legitimate
            # for LLM-driven nodes that already passed the judge.
            if (
                result.output
                and node_spec.output_keys
                and node_spec.node_type != "event_loop"
            ):
                validation = self.validator.validate_all(
                    output=result.output,
                    expected_keys=node_spec.output_keys,
@@ -799,11 +811,43 @@ class GraphExecutor:

        )

        if node_spec.node_type == "event_loop":
            # Event loop nodes must be pre-registered (like function nodes)
            raise RuntimeError(
                f"EventLoopNode '{node_spec.id}' not found in registry. "
                "Register it with executor.register_node() before execution."
            # Auto-create EventLoopNode with sensible defaults.
            # Custom configs can still be pre-registered via node_registry.
            from framework.graph.event_loop_node import EventLoopNode, LoopConfig

            # Create a FileConversationStore if a storage path is available
            conv_store = None
            if self._storage_path:
                from framework.storage.conversation_store import FileConversationStore

                store_path = self._storage_path / "conversations" / node_spec.id
                conv_store = FileConversationStore(base_path=store_path)

            # Auto-configure spillover directory for large tool results.
            # When a tool result exceeds max_tool_result_chars, the full
            # content is written to spillover_dir and the agent gets a
            # truncated preview with instructions to use load_data().
            spillover = None
            if self._storage_path:
                spillover = str(self._storage_path / "data")

            node = EventLoopNode(
                event_bus=self._event_bus,
                judge=None,  # implicit judge: accept when output_keys are filled
                config=LoopConfig(
                    max_iterations=100 if node_spec.client_facing else 50,
                    max_tool_calls_per_turn=10,
                    stall_detection_threshold=3,
                    max_history_tokens=32000,
                    max_tool_result_chars=3_000,
                    spillover_dir=spillover,
                ),
                tool_executor=self.tool_executor,
                conversation_store=conv_store,
            )
            # Cache so inject_event() is reachable for client-facing input
            self.node_registry[node_spec.id] = node
            return node

        # Should never reach here due to validation above
        raise RuntimeError(f"Unhandled node type: {node_spec.node_type}")
@@ -832,9 +876,12 @@ class GraphExecutor:

            source_node_name=current_node_spec.name if current_node_spec else current_node_id,
            target_node_name=target_node_spec.name if target_node_spec else edge.target,
        ):
            # Validate and clean output before mapping inputs
            # Validate and clean output before mapping inputs.
            # Use full memory state (not just result.output) because
            # target input_keys may come from earlier nodes in the
            # graph, not only from the immediate source node.
            if self.cleansing_config.enabled and target_node_spec:
                output_to_validate = result.output
                output_to_validate = memory.read_all()

                validation = self.output_cleaner.validate_output(
                    output=output_to_validate,

@@ -1030,10 +1077,13 @@ class GraphExecutor:

        branch.status = "running"

        try:
            # Validate and clean output before mapping inputs (same as _follow_edges)
            # Validate and clean output before mapping inputs (same as _follow_edges).
            # Use full memory state since target input_keys may come
            # from earlier nodes, not just the immediate source.
            if self.cleansing_config.enabled and node_spec:
                mem_snapshot = memory.read_all()
                validation = self.output_cleaner.validate_output(
                    output=source_result.output,
                    output=mem_snapshot,
                    source_node_id=source_node_spec.id if source_node_spec else "unknown",
                    target_node_spec=node_spec,
                )

@@ -1044,7 +1094,7 @@ class GraphExecutor:

                        f"{branch.node_id}: {validation.errors}"
                    )
                cleaned_output = self.output_cleaner.clean_output(
                    output=source_result.output,
                    output=mem_snapshot,
                    source_node_id=source_node_spec.id if source_node_spec else "unknown",
                    target_node_spec=node_spec,
                    validation_errors=validation.errors,

@@ -144,8 +144,11 @@ class OutputCleaner:

        errors = []
        warnings = []

        # Check 1: Required input keys present
        # Check 1: Required input keys present (skip nullable keys)
        nullable = set(getattr(target_node_spec, "nullable_output_keys", None) or [])
        for key in target_node_spec.input_keys:
            if key in nullable:
                continue
            if key not in output:
                errors.append(f"Missing required key: '{key}'")
                continue
@@ -572,17 +572,21 @@ class LiteLLMProvider(LLMProvider):

            # and we skip the retry path — nothing was yielded in vain.)
            has_content = accumulated_text or tool_calls_acc
            if not has_content and attempt < RATE_LIMIT_MAX_RETRIES:
                # If the conversation ends with an assistant message,
                # an empty stream is expected (nothing new to say).
                # Don't retry — just flush whatever we have.
                # If the conversation ends with an assistant or tool
                # message, an empty stream is expected — the LLM has
                # nothing new to say. Don't burn retries on this;
                # let the caller (EventLoopNode) decide what to do.
                # Typical case: client_facing node where the LLM set
                # all outputs via set_output tool calls, and the tool
                # results are the last messages.
                last_role = next(
                    (m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
                    None,
                )
                if last_role == "assistant":
                if last_role in ("assistant", "tool"):
                    logger.debug(
                        "[stream] Empty response after assistant message — "
                        "expected, not retrying."
                        "[stream] Empty response after %s message — expected, not retrying.",
                        last_role,
                    )
                    for event in tail_events:
                        yield event
@@ -280,7 +280,16 @@ class ToolRegistry:

            return

        base_dir = config_path.parent
        for server_config in config.get("servers", []):

        # Support both formats:
        #   {"servers": [{"name": "x", ...}]}          (list format)
        #   {"server-name": {"transport": ...}, ...}   (dict format)
        server_list = config.get("servers", [])
        if not server_list and "servers" not in config:
            # Treat top-level keys as server names
            server_list = [{"name": name, **cfg} for name, cfg in config.items()]

        for server_config in server_list:
            cwd = server_config.get("cwd")
            if cwd and not Path(cwd).is_absolute():
                server_config["cwd"] = str((base_dir / cwd).resolve())

@@ -296,6 +296,25 @@ class AgentRuntime:

            raise ValueError(f"Entry point '{entry_point_id}' not found")
        return await stream.wait_for_completion(exec_id, timeout)

    async def inject_input(self, node_id: str, content: str) -> bool:
        """Inject user input into a running client-facing node.

        Routes input to the EventLoopNode identified by ``node_id``
        across all active streams. Used by the TUI ChatRepl to deliver
        user responses during client-facing node execution.

        Args:
            node_id: The node currently waiting for input
            content: The user's input text

        Returns:
            True if input was delivered, False if no matching node found
        """
        for stream in self._streams.values():
            if await stream.inject_input(node_id, content):
                return True
        return False

    async def get_goal_progress(self) -> dict[str, Any]:
        """
        Evaluate goal progress across all streams.
@@ -153,6 +153,7 @@ class ExecutionStream:

        # Execution tracking
        self._active_executions: dict[str, ExecutionContext] = {}
        self._execution_tasks: dict[str, asyncio.Task] = {}
        self._active_executors: dict[str, GraphExecutor] = {}
        self._execution_results: OrderedDict[str, ExecutionResult] = OrderedDict()
        self._execution_result_times: dict[str, float] = {}
        self._completion_events: dict[str, asyncio.Event] = {}

@@ -237,6 +238,21 @@ class ExecutionStream:

            )
        )

    async def inject_input(self, node_id: str, content: str) -> bool:
        """Inject user input into a running client-facing EventLoopNode.

        Searches active executors for a node matching ``node_id`` and calls
        its ``inject_event()`` method to unblock ``_await_user_input()``.

        Returns True if input was delivered, False otherwise.
        """
        for executor in self._active_executors.values():
            node = executor.node_registry.get(node_id)
            if node is not None and hasattr(node, "inject_event"):
                await node.inject_event(content)
                return True
        return False

    async def execute(
        self,
        input_data: dict[str, Any],

@@ -314,7 +330,10 @@ class ExecutionStream:

        # Create runtime adapter for this execution
        runtime_adapter = StreamRuntimeAdapter(self._runtime, execution_id)

        # Create executor for this execution
        # Create executor for this execution.
        # Scope storage by execution_id so each execution gets
        # fresh conversations and spillover directories.
        exec_storage = self._storage.base_path / "sessions" / execution_id
        executor = GraphExecutor(
            runtime=runtime_adapter,
            llm=self._llm,

@@ -322,7 +341,10 @@ class ExecutionStream:

            tool_executor=self._tool_executor,
            event_bus=self._event_bus,
            stream_id=self.stream_id,
            storage_path=exec_storage,
        )
        # Track executor so inject_input() can reach EventLoopNode instances
        self._active_executors[execution_id] = executor

        # Create modified graph with entry point
        # We need to override the entry_node to use our entry point

@@ -336,6 +358,9 @@ class ExecutionStream:

            session_state=ctx.session_state,
        )

        # Clean up executor reference
        self._active_executors.pop(execution_id, None)

        # Store result with retention
        self._record_execution_result(execution_id, result)
@@ -372,7 +372,9 @@ class AdenTUI(App):

        elif et == EventType.EXECUTION_FAILED:
            self.chat_repl.handle_execution_failed(event.data.get("error", "Unknown error"))
        elif et == EventType.CLIENT_INPUT_REQUESTED:
            self.chat_repl.handle_input_requested(event.data.get("prompt", ""))
            self.chat_repl.handle_input_requested(
                event.node_id or event.data.get("node_id", ""),
            )

        # --- Graph view events ---
        if et in (
@@ -6,6 +6,12 @@ Streaming display approach:

  (Label.update() replaces text in-place, unlike RichLog which is append-only).
- On EXECUTION_COMPLETED, the final output is written to RichLog as permanent history.
- Tool events are written directly to RichLog as discrete status lines.

Client-facing input:
- When a client_facing=True EventLoopNode emits CLIENT_INPUT_REQUESTED, the
  ChatRepl transitions to "waiting for input" state: input is re-enabled and
  subsequent submissions are routed to runtime.inject_input() instead of
  starting a new execution.
"""

import asyncio

@@ -66,6 +72,8 @@ class ChatRepl(Vertical):

        self.runtime = runtime
        self._current_exec_id: str | None = None
        self._streaming_snapshot: str = ""
        self._waiting_for_input: bool = False
        self._input_node_id: str | None = None

        # Dedicated event loop for agent execution.
        # Keeps blocking runtime code (LLM calls, MCP tools) off

@@ -97,11 +105,38 @@ class ChatRepl(Vertical):

        history.write("[bold cyan]Chat REPL Ready[/bold cyan] — Type your input below\n")

    async def on_input_submitted(self, message: Input.Submitted) -> None:
        """Handle input submission — fire-and-forget via trigger()."""
        """Handle input submission — either start a new execution or inject input."""
        user_input = message.value.strip()
        if not user_input:
            return

        # Client-facing input: route to the waiting node
        if self._waiting_for_input and self._input_node_id:
            self._write_history(f"[bold green]You:[/bold green] {user_input}")
            message.input.value = ""

            # Disable input while the agent processes the response
            chat_input = self.query_one("#chat-input", Input)
            chat_input.disabled = True
            chat_input.placeholder = "Enter input for agent..."
            self._waiting_for_input = False

            indicator = self.query_one("#processing-indicator", Label)
            indicator.update("Thinking...")

            node_id = self._input_node_id
            self._input_node_id = None

            try:
                future = asyncio.run_coroutine_threadsafe(
                    self.runtime.inject_input(node_id, user_input),
                    self._agent_loop,
                )
                await asyncio.wrap_future(future)
            except Exception as e:
                self._write_history(f"[bold red]Error delivering input:[/bold red] {e}")
            return

        # Double-submit guard: reject input while an execution is in-flight
        if self._current_exec_id is not None:
            self._write_history("[dim]Agent is still running — please wait.[/dim]")
@@ -206,17 +241,23 @@ class ChatRepl(Vertical):

        indicator = self.query_one("#processing-indicator", Label)
        indicator.display = False

        # Write the final output to permanent history
        output_str = str(output.get("output_string", output))
        self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}")
        # Write the final streaming snapshot to permanent history (if any)
        if self._streaming_snapshot:
            self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}")
        else:
            output_str = str(output.get("output_string", output))
            self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}")
        self._write_history("")  # separator

        self._current_exec_id = None
        self._streaming_snapshot = ""
        self._waiting_for_input = False
        self._input_node_id = None

        # Re-enable input
        chat_input = self.query_one("#chat-input", Input)
        chat_input.disabled = False
        chat_input.placeholder = "Enter input for agent..."
        chat_input.focus()

    def handle_execution_failed(self, error: str) -> None:

@@ -229,17 +270,34 @@ class ChatRepl(Vertical):

        self._current_exec_id = None
        self._streaming_snapshot = ""
        self._waiting_for_input = False
        self._input_node_id = None

        # Re-enable input
        chat_input = self.query_one("#chat-input", Input)
        chat_input.disabled = False
        chat_input.placeholder = "Enter input for agent..."
        chat_input.focus()

    def handle_input_requested(self, prompt: str) -> None:
        """Handle the agent requesting input from the user."""
    def handle_input_requested(self, node_id: str) -> None:
        """Handle a client-facing node requesting user input.

        Transitions to 'waiting for input' state: flushes the current
        streaming snapshot to history, re-enables the input widget,
        and sets a flag so the next submission routes to inject_input().
        """
        # Flush accumulated streaming text as agent output
        if self._streaming_snapshot:
            self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}")
            self._streaming_snapshot = ""

        self._waiting_for_input = True
        self._input_node_id = node_id or None

        indicator = self.query_one("#processing-indicator", Label)
        indicator.update(f"Input requested: {prompt}")
        indicator.update("Waiting for your input...")

        chat_input = self.query_one("#chat-input", Input)
        chat_input.placeholder = prompt or "Agent is waiting for input..."
        chat_input.disabled = False
        chat_input.placeholder = "Type your response..."
        chat_input.focus()
@@ -26,6 +26,7 @@ from .email_tool import register_tools as register_email

from .example_tool import register_tools as register_example
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.data_tools import register_tools as register_data_tools
from .file_system_toolkits.execute_command_tool import (
    register_tools as register_execute_command,
)

@@ -82,6 +83,7 @@ def register_all_tools(

    register_apply_patch(mcp)
    register_grep_search(mcp)
    register_execute_command(mcp)
    register_data_tools(mcp)
    register_csv(mcp)

    return [

@@ -97,6 +99,9 @@ def register_all_tools(

        "apply_patch",
        "grep_search",
        "execute_command_tool",
        "load_data",
        "save_data",
        "list_data_files",
        "csv_read",
        "csv_write",
        "csv_append",

@@ -0,0 +1,3 @@

from .data_tools import register_tools

__all__ = ["register_tools"]
@@ -0,0 +1,179 @@

"""
Data Tools - Load, save, and list data files for agent pipelines.

These tools let agents store large intermediate results in files and
retrieve them with pagination, keeping the LLM conversation context small.
Used in conjunction with the spillover system: when a tool result is too
large, the framework writes it to a file and the agent can load it back
with load_data().
"""

from __future__ import annotations

import json
from pathlib import Path

from mcp.server.fastmcp import FastMCP


def register_tools(mcp: FastMCP) -> None:
    """Register data management tools with the MCP server."""

    @mcp.tool()
    def save_data(filename: str, data: str, data_dir: str) -> dict:
        """
        Purpose
            Save data to a file for later retrieval by this or downstream nodes.

        When to use
            Store large results (search results, profiles, analysis) instead
            of passing them inline through set_output.
            Returns a brief summary with the filename to reference later.

        Rules & Constraints
            filename must be a simple name like 'results.json' — no paths or '..'
            data_dir must be the absolute path to the data directory

        Args:
            filename: Simple filename like 'github_users.json'. No paths or '..'.
            data: The string data to write (typically JSON).
            data_dir: Absolute path to the data directory.

        Returns:
            Dict with success status and file metadata, or error dict
        """
        if not filename or ".." in filename or "/" in filename or "\\" in filename:
            return {"error": "Invalid filename. Use simple names like 'users.json'"}
        if not data_dir:
            return {"error": "data_dir is required"}

        try:
            dir_path = Path(data_dir)
            dir_path.mkdir(parents=True, exist_ok=True)
            path = dir_path / filename
            path.write_text(data, encoding="utf-8")
            lines = data.count("\n") + 1
            return {
                "success": True,
                "filename": filename,
                "size_bytes": len(data.encode("utf-8")),
                "lines": lines,
                "preview": data[:200] + ("..." if len(data) > 200 else ""),
            }
        except Exception as e:
            return {"error": f"Failed to save data: {str(e)}"}

    @mcp.tool()
    def load_data(
        filename: str,
        data_dir: str,
        offset: int = 0,
        limit: int = 50,
    ) -> dict:
        """
        Purpose
            Load data from a previously saved file with pagination.

        When to use
            Retrieve large tool results that were spilled to disk.
            Read data saved by save_data or by the spillover system.
            Page through large files without loading everything into context.

        Rules & Constraints
            filename must match a file in data_dir
            Returns a page of lines with metadata about the full file

        Args:
            filename: The filename to load (as shown in spillover messages or save_data results).
            data_dir: Absolute path to the data directory.
            offset: 0-based line number to start reading from. Default 0.
            limit: Max number of lines to return. Default 50.

        Returns:
            Dict with content, pagination info, and metadata

        Examples:
            load_data('users.json', '/path/to/data')  # first 50 lines
            load_data('users.json', '/path/to/data', offset=50, limit=50)  # next 50
            load_data('users.json', '/path/to/data', limit=200)  # first 200 lines
        """
        if not filename or ".." in filename or "/" in filename or "\\" in filename:
            return {"error": "Invalid filename"}
        if not data_dir:
            return {"error": "data_dir is required"}

        try:
            offset = int(offset)
            limit = int(limit)
            path = Path(data_dir) / filename
            if not path.exists():
                return {"error": f"File not found: {filename}"}

            content = path.read_text(encoding="utf-8")
            size_bytes = len(content.encode("utf-8"))

            # If content is a single long line, try to pretty-print JSON so
            # line-based pagination actually works.
            all_lines = content.split("\n")
            if len(all_lines) <= 2 and size_bytes > 500:
                try:
                    parsed = json.loads(content)
                    content = json.dumps(parsed, indent=2, ensure_ascii=False)
                    all_lines = content.split("\n")
                except (json.JSONDecodeError, TypeError, ValueError):
                    pass

            total = len(all_lines)
            start = min(offset, total)
            end = min(start + limit, total)
            sliced = all_lines[start:end]

            return {
                "success": True,
                "filename": filename,
                "content": "\n".join(sliced),
                "total_lines": total,
                "size_bytes": size_bytes,
                "offset": start,
                "lines_returned": len(sliced),
                "has_more": end < total,
            }
        except Exception as e:
            return {"error": f"Failed to load data: {str(e)}"}

    @mcp.tool()
    def list_data_files(data_dir: str) -> dict:
        """
        Purpose
            List all data files in the data directory.

        When to use
            Discover what intermediate results or spillover files are available.
            Check what data was saved by previous nodes in the pipeline.

        Args:
            data_dir: Absolute path to the data directory.

        Returns:
            Dict with list of files and their sizes
        """
        if not data_dir:
            return {"error": "data_dir is required"}

        try:
            dir_path = Path(data_dir)
            if not dir_path.exists():
                return {"files": []}

            files = []
            for f in sorted(dir_path.iterdir()):
                if f.is_file():
                    files.append(
                        {
                            "filename": f.name,
                            "size_bytes": f.stat().st_size,
                        }
                    )
            return {"files": files}
        except Exception as e:
            return {"error": f"Failed to list data files: {str(e)}"}
@@ -769,6 +769,11 @@ dependencies = [

    { name = "tools" },
]

[package.optional-dependencies]
tui = [
    { name = "textual" },
]

[package.dev-dependencies]
dev = [
    { name = "ruff" },

@@ -786,8 +791,10 @@ requires-dist = [

    { name = "pytest", specifier = ">=8.0" },
    { name = "pytest-asyncio", specifier = ">=0.23" },
    { name = "pytest-xdist", specifier = ">=3.0" },
    { name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
    { name = "tools", editable = "tools" },
]
provides-extras = ["tui"]

[package.metadata.requires-dev]
dev = [

@@ -1310,6 +1317,18 @@ wheels = [

    { url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" },
]

[[package]]
name = "linkify-it-py"
version = "2.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
    { name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
]

[[package]]
name = "litellm"
version = "1.81.7"

@@ -1408,6 +1427,11 @@ wheels = [

    { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]

[package.optional-dependencies]
linkify = [
    { name = "linkify-it-py" },
]

[[package]]
name = "markupsafe"
version = "3.0.3"

@@ -1507,6 +1531,18 @@ wheels = [

    { url = "https://files.pythonhosted.org/packages/fd/d9/eaa1f80170d2b7c5ba23f3b59f766f3a0bb41155fbc32a69adfa1adaaef9/mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca", size = 233615, upload-time = "2026-01-24T19:40:30.652Z" },
]

[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
    { name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]

[[package]]
name = "mdurl"
version = "0.1.2"

@@ -2957,6 +2993,23 @@ wheels = [

    { url = "https://files.pythonhosted.org/packages/81/0d/13d1d239a25cbfb19e740db83143e95c772a1fe10202dda4b76792b114dd/starlette-0.52.1-py3-none-any.whl", hash = "sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74", size = 74272, upload-time = "2026-01-18T13:34:09.188Z" },
]

[[package]]
name = "textual"
version = "7.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
    { name = "markdown-it-py", extra = ["linkify"] },
    { name = "mdit-py-plugins" },
    { name = "platformdirs" },
    { name = "pygments" },
    { name = "rich" },
    { name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" },
]

[[package]]
name = "tiktoken"
version = "0.12.0"

@@ -3215,6 +3268,15 @@ wheels = [

    { url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" },
]

[[package]]
name = "uc-micro-py"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
]

[[package]]
name = "urllib3"
version = "2.6.3"