From 3e2a11d60d4d0981225558b298bad7326468db0d Mon Sep 17 00:00:00 2001 From: Timothy Date: Wed, 4 Feb 2026 14:47:28 -0800 Subject: [PATCH] feat: integrate agent builder with tui --- .../building-agents-construction/SKILL.md | 76 +++--- .../examples/deep_research_agent/__init__.py | 24 ++ .../__main__.py | 114 +++++++-- .../agent.py | 199 ++++++--------- .../config.py | 9 +- .../mcp_servers.json | 0 .../deep_research_agent/nodes/__init__.py | 147 +++++++++++ .../examples/online_research_agent/README.md | 80 ------ .../online_research_agent/__init__.py | 23 -- .../online_research_agent/nodes/__init__.py | 232 ------------------ .claude/skills/building-agents-core/SKILL.md | 92 +++++-- .../skills/building-agents-patterns/SKILL.md | 97 ++++++-- core/framework/graph/event_loop_node.py | 194 ++++++++------- core/framework/graph/executor.py | 72 +++++- core/framework/graph/output_cleaner.py | 5 +- core/framework/llm/litellm.py | 16 +- core/framework/runner/tool_registry.py | 11 +- core/framework/runtime/agent_runtime.py | 19 ++ core/framework/runtime/execution_stream.py | 27 +- core/framework/tui/app.py | 4 +- core/framework/tui/widgets/chat_repl.py | 74 +++++- tools/src/aden_tools/tools/__init__.py | 5 + .../data_tools/__init__.py | 3 + .../data_tools/data_tools.py | 179 ++++++++++++++ uv.lock | 62 +++++ 25 files changed, 1082 insertions(+), 682 deletions(-) create mode 100644 .claude/skills/building-agents-construction/examples/deep_research_agent/__init__.py rename .claude/skills/building-agents-construction/examples/{online_research_agent => deep_research_agent}/__main__.py (53%) rename .claude/skills/building-agents-construction/examples/{online_research_agent => deep_research_agent}/agent.py (59%) rename .claude/skills/building-agents-construction/examples/{online_research_agent => deep_research_agent}/config.py (76%) rename .claude/skills/building-agents-construction/examples/{online_research_agent => deep_research_agent}/mcp_servers.json (100%) create mode 100644 .claude/skills/building-agents-construction/examples/deep_research_agent/nodes/__init__.py delete mode 100644 .claude/skills/building-agents-construction/examples/online_research_agent/README.md delete mode 100644 .claude/skills/building-agents-construction/examples/online_research_agent/__init__.py delete mode 100644 .claude/skills/building-agents-construction/examples/online_research_agent/nodes/__init__.py create mode 100644 tools/src/aden_tools/tools/file_system_toolkits/data_tools/__init__.py create mode 100644 tools/src/aden_tools/tools/file_system_toolkits/data_tools/data_tools.py diff --git a/.claude/skills/building-agents-construction/SKILL.md b/.claude/skills/building-agents-construction/SKILL.md index f8c29e0b..ed446e01 100644 --- a/.claude/skills/building-agents-construction/SKILL.md +++ b/.claude/skills/building-agents-construction/SKILL.md @@ -267,7 +267,7 @@ This returns JSON with all the goal, nodes, edges, and MCP server configurations - NOT: `{"first-node-id": ["input_keys"]}` (WRONG) - NOT: `{"first-node-id"}` (WRONG - this is a set) -**Use the example agent** at `.claude/skills/building-agents-construction/examples/online_research_agent/` as a template for file structure and patterns. +**Use the example agent** at `.claude/skills/building-agents-construction/examples/deep_research_agent/` as a template for file structure and patterns. It demonstrates: STEP 1/STEP 2 prompts, client-facing nodes, feedback loops, nullable_output_keys, and data tools. **AFTER writing all files, tell the user:** @@ -354,7 +354,7 @@ mcp__agent-builder__get_session_status() ## REFERENCE: System Prompt Best Practice -For event_loop nodes, instruct the LLM to use `set_output` for structured outputs: +For **internal** event_loop nodes (not client-facing), instruct the LLM to use `set_output`: ``` Use set_output(key, value) to store your results. For example: @@ -363,71 +363,55 @@ Use set_output(key, value) to store your results. For example: Do NOT return raw JSON. Use the set_output tool to produce outputs. ``` +For **client-facing** event_loop nodes, use the STEP 1/STEP 2 pattern: + +``` +**STEP 1 — Respond to the user (text only, NO tool calls):** +[Present information, ask questions, etc.] + +**STEP 2 — After the user responds, call set_output:** +- set_output("key", "value based on user's response") +``` + +This prevents the LLM from calling `set_output` before the user has had a chance to respond. The "NO tool calls" instruction in STEP 1 ensures the node blocks for user input before proceeding. + --- -## CRITICAL: EventLoopNode Registration +## EventLoopNode Runtime -**`AgentRuntime` does NOT support `event_loop` nodes.** The `AgentRuntime` / `create_agent_runtime()` path creates `GraphExecutor` instances internally without passing a `node_registry`, causing all `event_loop` nodes to fail at runtime with: - -``` -EventLoopNode 'node-id' not found in registry. Register it with executor.register_node() before execution. -``` - -**The correct pattern**: Use `GraphExecutor` directly with a `node_registry` dict containing `EventLoopNode` instances: +EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. Both direct `GraphExecutor` and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically. No manual `node_registry` setup is needed. ```python -from framework.graph.executor import GraphExecutor, ExecutionResult -from framework.graph.event_loop_node import EventLoopNode, LoopConfig -from framework.runtime.event_bus import EventBus -from framework.runtime.core import Runtime # REQUIRED - executor calls runtime.start_run() +# Direct execution +from framework.graph.executor import GraphExecutor +from framework.runtime.core import Runtime -# 1. Build node_registry with EventLoopNode instances -event_bus = EventBus() -node_registry = {} -for node_spec in nodes: - if node_spec.node_type == "event_loop": - node_registry[node_spec.id] = EventLoopNode( - event_bus=event_bus, - judge=None, # implicit judge: accepts when output_keys are filled - config=LoopConfig( - max_iterations=50, - max_tool_calls_per_turn=15, - stall_detection_threshold=3, - max_history_tokens=32000, - ), - tool_executor=tool_executor, - ) - -# 2. Create Runtime for run tracking (GraphExecutor calls runtime.start_run()) storage_path = Path.home() / ".hive" / "my_agent" storage_path.mkdir(parents=True, exist_ok=True) runtime = Runtime(storage_path) -# 3. Create GraphExecutor WITH node_registry and runtime executor = GraphExecutor( - runtime=runtime, # NOT None - executor needs this for run tracking + runtime=runtime, llm=llm, tools=tools, tool_executor=tool_executor, - node_registry=node_registry, # EventLoopNode instances + storage_path=storage_path, ) - -# 4. Execute result = await executor.execute(graph=graph, goal=goal, input_data=input_data) ``` -**DO NOT use `AgentRuntime` or `create_agent_runtime()` for agents with `event_loop` nodes.** - **DO NOT pass `runtime=None` to `GraphExecutor`** — it will crash with `'NoneType' object has no attribute 'start_run'`. --- ## COMMON MISTAKES TO AVOID -1. **Using `AgentRuntime` with event_loop nodes** - `AgentRuntime` does not register EventLoopNodes. Use `GraphExecutor` directly with `node_registry` -2. **Passing `runtime=None` to GraphExecutor** - The executor calls `runtime.start_run()` internally. Always provide a `Runtime(storage_path)` instance -3. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first -4. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list -5. **Skipping validation** - Always validate nodes and graph before proceeding -6. **Not waiting for approval** - Always ask user before major steps -7. **Displaying this file** - Execute the steps, don't show documentation +1. **Using tools that don't exist** - Always check `mcp__agent-builder__list_mcp_tools()` first +2. **Wrong entry_points format** - Must be `{"start": "node-id"}`, NOT a set or list +3. **Skipping validation** - Always validate nodes and graph before proceeding +4. **Not waiting for approval** - Always ask user before major steps +5. **Displaying this file** - Execute the steps, don't show documentation +6. **Too many thin nodes** - Prefer fewer, richer nodes (4 nodes > 8 nodes) +7. **Missing STEP 1/STEP 2 in client-facing prompts** - Client-facing nodes need explicit phases to prevent premature set_output +8. **Forgetting nullable_output_keys** - Mark input_keys that only arrive on certain edges (e.g., feedback) as nullable on the receiving node +9. **Adding framework gating for LLM behavior** - Fix prompts or use judges, not ad-hoc code diff --git a/.claude/skills/building-agents-construction/examples/deep_research_agent/__init__.py b/.claude/skills/building-agents-construction/examples/deep_research_agent/__init__.py new file mode 100644 index 00000000..0ef7757b --- /dev/null +++ b/.claude/skills/building-agents-construction/examples/deep_research_agent/__init__.py @@ -0,0 +1,24 @@ +""" +Deep Research Agent - Interactive, rigorous research with TUI conversation. + +Research any topic through multi-source web search, quality evaluation, +and synthesis. Features client-facing TUI interaction at key checkpoints +for user guidance and iterative deepening. +""" + +from .agent import DeepResearchAgent, default_agent, goal, nodes, edges +from .config import RuntimeConfig, AgentMetadata, default_config, metadata + +__version__ = "1.0.0" + +__all__ = [ + "DeepResearchAgent", + "default_agent", + "goal", + "nodes", + "edges", + "RuntimeConfig", + "AgentMetadata", + "default_config", + "metadata", +] diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/__main__.py b/.claude/skills/building-agents-construction/examples/deep_research_agent/__main__.py similarity index 53% rename from .claude/skills/building-agents-construction/examples/online_research_agent/__main__.py rename to .claude/skills/building-agents-construction/examples/deep_research_agent/__main__.py index 8fa5985a..2e3429f1 100644 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/__main__.py +++ b/.claude/skills/building-agents-construction/examples/deep_research_agent/__main__.py @@ -1,5 +1,5 @@ """ -CLI entry point for Online Research Agent. +CLI entry point for Deep Research Agent. Uses AgentRuntime for multi-entrypoint support with HITL pause/resume. """ @@ -10,7 +10,7 @@ import logging import sys import click -from .agent import default_agent, OnlineResearchAgent +from .agent import default_agent, DeepResearchAgent def setup_logging(verbose=False, debug=False): @@ -28,7 +28,7 @@ def setup_logging(verbose=False, debug=False): @click.group() @click.version_option(version="1.0.0") def cli(): - """Online Research Agent - Deep-dive research with narrative reports.""" + """Deep Research Agent - Interactive, rigorous research with TUI conversation.""" pass @@ -59,6 +59,83 @@ def run(topic, mock, quiet, verbose, debug): sys.exit(0 if result.success else 1) +@cli.command() +@click.option("--mock", is_flag=True, help="Run in mock mode") +@click.option("--verbose", "-v", is_flag=True, help="Show execution details") +@click.option("--debug", is_flag=True, help="Show debug logging") +def tui(mock, verbose, debug): + """Launch the TUI dashboard for interactive research.""" + setup_logging(verbose=verbose, debug=debug) + + try: + from framework.tui.app import AdenTUI + except ImportError: + click.echo("TUI requires the 'textual' package. Install with: pip install textual") + sys.exit(1) + + from pathlib import Path + + from framework.llm import LiteLLMProvider + from framework.runner.tool_registry import ToolRegistry + from framework.runtime.agent_runtime import create_agent_runtime + from framework.runtime.event_bus import EventBus + from framework.runtime.execution_stream import EntryPointSpec + + async def run_with_tui(): + agent = DeepResearchAgent() + + # Build graph and tools + agent._event_bus = EventBus() + agent._tool_registry = ToolRegistry() + + mcp_config_path = Path(__file__).parent / "mcp_servers.json" + if mcp_config_path.exists(): + agent._tool_registry.load_mcp_config(mcp_config_path) + + llm = None + if not mock: + llm = LiteLLMProvider( + model=agent.config.model, + api_key=agent.config.api_key, + api_base=agent.config.api_base, + ) + + tools = list(agent._tool_registry.get_tools().values()) + tool_executor = agent._tool_registry.get_executor() + graph = agent._build_graph() + + storage_path = Path.home() / ".hive" / "deep_research_agent" + storage_path.mkdir(parents=True, exist_ok=True) + + runtime = create_agent_runtime( + graph=graph, + goal=agent.goal, + storage_path=storage_path, + entry_points=[ + EntryPointSpec( + id="start", + name="Start Research", + entry_node="intake", + trigger_type="manual", + isolation_level="isolated", + ), + ], + llm=llm, + tools=tools, + tool_executor=tool_executor, + ) + + await runtime.start() + + try: + app = AdenTUI(runtime) + await app.run_async() + finally: + await runtime.stop() + + asyncio.run(run_with_tui()) + + @cli.command() @click.option("--json", "output_json", is_flag=True) def info(output_json): @@ -71,6 +148,7 @@ def info(output_json): click.echo(f"Version: {info_data['version']}") click.echo(f"Description: {info_data['description']}") click.echo(f"\nNodes: {', '.join(info_data['nodes'])}") + click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}") click.echo(f"Entry: {info_data['entry_node']}") click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}") @@ -81,6 +159,9 @@ def validate(): validation = default_agent.validate() if validation["valid"]: click.echo("Agent is valid") + if validation["warnings"]: + for warning in validation["warnings"]: + click.echo(f" WARNING: {warning}") else: click.echo("Agent has errors:") for error in validation["errors"]: @@ -91,7 +172,7 @@ def validate(): @cli.command() @click.option("--verbose", "-v", is_flag=True) def shell(verbose): - """Interactive research session.""" + """Interactive research session (CLI, no TUI).""" asyncio.run(_interactive_shell(verbose)) @@ -99,10 +180,10 @@ async def _interactive_shell(verbose=False): """Async interactive shell.""" setup_logging(verbose=verbose) - click.echo("=== Online Research Agent ===") + click.echo("=== Deep Research Agent ===") click.echo("Enter a topic to research (or 'quit' to exit):\n") - agent = OnlineResearchAgent() + agent = DeepResearchAgent() await agent.start() try: @@ -118,7 +199,7 @@ async def _interactive_shell(verbose=False): if not topic.strip(): continue - click.echo("\nResearching... (this may take a few minutes)\n") + click.echo("\nResearching...\n") result = await agent.trigger_and_wait("start", {"topic": topic}) @@ -128,16 +209,14 @@ async def _interactive_shell(verbose=False): if result.success: output = result.output - if "file_path" in output: - click.echo(f"\nReport saved to: {output['file_path']}\n") - if "final_report" in output: - click.echo("\n--- Report Preview ---\n") - preview = ( - output["final_report"][:500] + "..." - if len(output.get("final_report", "")) > 500 - else output.get("final_report", "") - ) - click.echo(preview) + if "report_content" in output: + click.echo("\n--- Report ---\n") + click.echo(output["report_content"]) + click.echo("\n") + if "references" in output: + click.echo("--- References ---\n") + for ref in output.get("references", []): + click.echo(f" [{ref.get('number', '?')}] {ref.get('title', '')} - {ref.get('url', '')}") click.echo("\n") else: click.echo(f"\nResearch failed: {result.error}\n") @@ -148,7 +227,6 @@ async def _interactive_shell(verbose=False): except Exception as e: click.echo(f"Error: {e}", err=True) import traceback - traceback.print_exc() finally: await agent.stop() diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/agent.py b/.claude/skills/building-agents-construction/examples/deep_research_agent/agent.py similarity index 59% rename from .claude/skills/building-agents-construction/examples/online_research_agent/agent.py rename to .claude/skills/building-agents-construction/examples/deep_research_agent/agent.py index fbc726e6..c0346e4b 100644 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/agent.py +++ b/.claude/skills/building-agents-construction/examples/deep_research_agent/agent.py @@ -1,9 +1,8 @@ -"""Agent graph construction for Online Research Agent.""" +"""Agent graph construction for Deep Research Agent.""" from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint from framework.graph.edge import GraphSpec from framework.graph.executor import ExecutionResult, GraphExecutor -from framework.graph.event_loop_node import EventLoopNode, LoopConfig from framework.runtime.event_bus import EventBus from framework.runtime.core import Runtime from framework.llm import LiteLLMProvider @@ -11,164 +10,132 @@ from framework.runner.tool_registry import ToolRegistry from .config import default_config, metadata from .nodes import ( - parse_query_node, - search_sources_node, - fetch_content_node, - evaluate_sources_node, - synthesize_findings_node, - write_report_node, - quality_check_node, - save_report_node, + intake_node, + research_node, + review_node, + report_node, ) # Goal definition goal = Goal( - id="comprehensive-online-research", - name="Comprehensive Online Research", - description="Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations.", + id="rigorous-interactive-research", + name="Rigorous Interactive Research", + description=( + "Research any topic by searching diverse sources, analyzing findings, " + "and producing a cited report — with user checkpoints to guide direction." + ), success_criteria=[ SuccessCriterion( - id="source-coverage", - description="Query 10+ diverse sources", + id="source-diversity", + description="Use multiple diverse, authoritative sources", metric="source_count", - target=">=10", - weight=0.20, - ), - SuccessCriterion( - id="relevance", - description="All sources directly address the query", - metric="relevance_score", - target="90%", + target=">=5", weight=0.25, ), SuccessCriterion( - id="synthesis", - description="Synthesize findings into coherent narrative", - metric="coherence_score", - target="85%", - weight=0.25, - ), - SuccessCriterion( - id="citations", - description="Include citations for all claims", + id="citation-coverage", + description="Every factual claim in the report cites its source", metric="citation_coverage", target="100%", - weight=0.15, + weight=0.25, ), SuccessCriterion( - id="actionable", - description="Report answers the user's question", - metric="answer_completeness", + id="user-satisfaction", + description="User reviews findings before report generation", + metric="user_approval", + target="true", + weight=0.25, + ), + SuccessCriterion( + id="report-completeness", + description="Final report answers the original research questions", + metric="question_coverage", target="90%", - weight=0.15, + weight=0.25, ), ], constraints=[ Constraint( id="no-hallucination", - description="Only include information found in sources", + description="Only include information found in fetched sources", constraint_type="quality", category="accuracy", ), Constraint( id="source-attribution", - description="Every factual claim must cite its source", + description="Every claim must cite its source with a numbered reference", constraint_type="quality", category="accuracy", ), Constraint( - id="recency-preference", - description="Prefer recent sources when relevant", - constraint_type="quality", - category="relevance", - ), - Constraint( - id="no-paywalled", - description="Avoid sources that require payment to access", + id="user-checkpoint", + description="Present findings to the user before writing the final report", constraint_type="functional", - category="accessibility", + category="interaction", ), ], ) # Node list nodes = [ - parse_query_node, - search_sources_node, - fetch_content_node, - evaluate_sources_node, - synthesize_findings_node, - write_report_node, - quality_check_node, - save_report_node, + intake_node, + research_node, + review_node, + report_node, ] # Edge definitions edges = [ + # intake -> research EdgeSpec( - id="parse-to-search", - source="parse-query", - target="search-sources", + id="intake-to-research", + source="intake", + target="research", condition=EdgeCondition.ON_SUCCESS, priority=1, ), + # research -> review EdgeSpec( - id="search-to-fetch", - source="search-sources", - target="fetch-content", + id="research-to-review", + source="research", + target="review", condition=EdgeCondition.ON_SUCCESS, priority=1, ), + # review -> research (feedback loop) EdgeSpec( - id="fetch-to-evaluate", - source="fetch-content", - target="evaluate-sources", - condition=EdgeCondition.ON_SUCCESS, + id="review-to-research-feedback", + source="review", + target="research", + condition=EdgeCondition.CONDITIONAL, + condition_expr="needs_more_research == True", priority=1, ), + # review -> report (user satisfied) EdgeSpec( - id="evaluate-to-synthesize", - source="evaluate-sources", - target="synthesize-findings", - condition=EdgeCondition.ON_SUCCESS, - priority=1, - ), - EdgeSpec( - id="synthesize-to-write", - source="synthesize-findings", - target="write-report", - condition=EdgeCondition.ON_SUCCESS, - priority=1, - ), - EdgeSpec( - id="write-to-quality", - source="write-report", - target="quality-check", - condition=EdgeCondition.ON_SUCCESS, - priority=1, - ), - EdgeSpec( - id="quality-to-save", - source="quality-check", - target="save-report", - condition=EdgeCondition.ON_SUCCESS, - priority=1, + id="review-to-report", + source="review", + target="report", + condition=EdgeCondition.CONDITIONAL, + condition_expr="needs_more_research == False", + priority=2, ), ] # Graph configuration -entry_node = "parse-query" -entry_points = {"start": "parse-query"} +entry_node = "intake" +entry_points = {"start": "intake"} pause_nodes = [] -terminal_nodes = ["save-report"] +terminal_nodes = ["report"] -class OnlineResearchAgent: +class DeepResearchAgent: """ - Online Research Agent - Deep-dive research with narrative reports. + Deep Research Agent — 4-node pipeline with user checkpoints. - Uses GraphExecutor directly with EventLoopNode instances registered - in the node_registry for multi-turn tool execution. + Flow: intake -> research -> review -> report + ^ | + +-- feedback loop (if user wants more) """ def __init__(self, config=None): @@ -188,7 +155,7 @@ class OnlineResearchAgent: def _build_graph(self) -> GraphSpec: """Build the GraphSpec.""" return GraphSpec( - id="online-research-agent-graph", + id="deep-research-agent-graph", goal_id=self.goal.id, version="1.0.0", entry_node=self.entry_node, @@ -201,29 +168,11 @@ class OnlineResearchAgent: max_tokens=self.config.max_tokens, ) - def _build_node_registry(self, tool_executor=None) -> dict: - """Create EventLoopNode instances for all event_loop nodes.""" - registry = {} - for node_spec in self.nodes: - if node_spec.node_type == "event_loop": - registry[node_spec.id] = EventLoopNode( - event_bus=self._event_bus, - judge=None, # implicit judge: accept when output_keys are filled - config=LoopConfig( - max_iterations=50, - max_tool_calls_per_turn=15, - stall_detection_threshold=3, - max_history_tokens=32000, - ), - tool_executor=tool_executor, - ) - return registry - def _setup(self, mock_mode=False) -> GraphExecutor: """Set up the executor with all components.""" from pathlib import Path - storage_path = Path.home() / ".hive" / "online_research_agent" + storage_path = Path.home() / ".hive" / "deep_research_agent" storage_path.mkdir(parents=True, exist_ok=True) self._event_bus = EventBus() @@ -245,7 +194,6 @@ class OnlineResearchAgent: tools = list(self._tool_registry.get_tools().values()) self._graph = self._build_graph() - node_registry = self._build_node_registry(tool_executor=tool_executor) runtime = Runtime(storage_path) self._executor = GraphExecutor( @@ -253,7 +201,8 @@ class OnlineResearchAgent: llm=llm, tools=tools, tool_executor=tool_executor, - node_registry=node_registry, + event_bus=self._event_bus, + storage_path=storage_path, ) return self._executor @@ -317,7 +266,7 @@ class OnlineResearchAgent: "entry_points": self.entry_points, "pause_nodes": self.pause_nodes, "terminal_nodes": self.terminal_nodes, - "multi_entrypoint": True, + "client_facing_nodes": [n.id for n in self.nodes if n.client_facing], } def validate(self): @@ -339,10 +288,6 @@ class OnlineResearchAgent: if terminal not in node_ids: errors.append(f"Terminal node '{terminal}' not found") - for pause in self.pause_nodes: - if pause not in node_ids: - errors.append(f"Pause node '{pause}' not found") - for ep_id, node_id in self.entry_points.items(): if node_id not in node_ids: errors.append( @@ -357,4 +302,4 @@ class OnlineResearchAgent: # Create default instance -default_agent = OnlineResearchAgent() +default_agent = DeepResearchAgent() diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/config.py b/.claude/skills/building-agents-construction/examples/deep_research_agent/config.py similarity index 76% rename from .claude/skills/building-agents-construction/examples/online_research_agent/config.py rename to .claude/skills/building-agents-construction/examples/deep_research_agent/config.py index f9ded4d8..715bc492 100644 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/config.py +++ b/.claude/skills/building-agents-construction/examples/deep_research_agent/config.py @@ -32,12 +32,15 @@ class RuntimeConfig: default_config = RuntimeConfig() -# Agent metadata @dataclass class AgentMetadata: - name: str = "Online Research Agent" + name: str = "Deep Research Agent" version: str = "1.0.0" - description: str = "Research any topic by searching multiple sources, synthesizing information, and producing a well-structured narrative report with citations." + description: str = ( + "Interactive research agent that rigorously investigates topics through " + "multi-source search, quality evaluation, and synthesis - with TUI conversation " + "at key checkpoints for user guidance and feedback." + ) metadata = AgentMetadata() diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/mcp_servers.json b/.claude/skills/building-agents-construction/examples/deep_research_agent/mcp_servers.json similarity index 100% rename from .claude/skills/building-agents-construction/examples/online_research_agent/mcp_servers.json rename to .claude/skills/building-agents-construction/examples/deep_research_agent/mcp_servers.json diff --git a/.claude/skills/building-agents-construction/examples/deep_research_agent/nodes/__init__.py b/.claude/skills/building-agents-construction/examples/deep_research_agent/nodes/__init__.py new file mode 100644 index 00000000..24c38914 --- /dev/null +++ b/.claude/skills/building-agents-construction/examples/deep_research_agent/nodes/__init__.py @@ -0,0 +1,147 @@ +"""Node definitions for Deep Research Agent.""" + +from framework.graph import NodeSpec + +# Node 1: Intake (client-facing) +# Brief conversation to clarify what the user wants researched. +intake_node = NodeSpec( + id="intake", + name="Research Intake", + description="Discuss the research topic with the user, clarify scope, and confirm direction", + node_type="event_loop", + client_facing=True, + input_keys=["topic"], + output_keys=["research_brief"], + system_prompt="""\ +You are a research intake specialist. The user wants to research a topic. +Have a brief conversation to clarify what they need. + +**STEP 1 — Read and respond (text only, NO tool calls):** +1. Read the topic provided +2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth) +3. If it's already clear, confirm your understanding and ask the user to confirm + +Keep it short. Don't over-ask. + +**STEP 2 — After the user confirms, call set_output:** +- set_output("research_brief", "A clear paragraph describing exactly what to research, \ +what questions to answer, what scope to cover, and how deep to go.") +""", + tools=[], +) + +# Node 2: Research +# The workhorse — searches the web, fetches content, analyzes sources. +# One node with both tools avoids the context-passing overhead of 5 separate nodes. +research_node = NodeSpec( + id="research", + name="Research", + description="Search the web, fetch source content, and compile findings", + node_type="event_loop", + max_node_visits=3, + input_keys=["research_brief", "feedback"], + output_keys=["findings", "sources", "gaps"], + nullable_output_keys=["feedback"], + system_prompt="""\ +You are a research agent. Given a research brief, find and analyze sources. + +If feedback is provided, this is a follow-up round — focus on the gaps identified. + +Work in phases: +1. **Search**: Use web_search with 3-5 diverse queries covering different angles. + Prioritize authoritative sources (.edu, .gov, established publications). +2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources). + Skip URLs that fail. Extract the substantive content. +3. **Analyze**: Review what you've collected. Identify key findings, themes, + and any contradictions between sources. + +Important: +- Work in batches of 3-4 tool calls at a time to manage context +- After each batch, assess whether you have enough material +- Prefer quality over quantity — 5 good sources beat 15 thin ones +- Track which URL each finding comes from (you'll need citations later) + +When done, use set_output: +- set_output("findings", "Structured summary: key findings with source URLs for each claim. \ +Include themes, contradictions, and confidence levels.") +- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}]) +- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.") +""", + tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"], +) + +# Node 3: Review (client-facing) +# Shows the user what was found and asks whether to dig deeper or proceed. +review_node = NodeSpec( + id="review", + name="Review Findings", + description="Present findings to user and decide whether to research more or write the report", + node_type="event_loop", + client_facing=True, + max_node_visits=3, + input_keys=["findings", "sources", "gaps", "research_brief"], + output_keys=["needs_more_research", "feedback"], + system_prompt="""\ +Present the research findings to the user clearly and concisely. + +**STEP 1 — Present (your first message, text only, NO tool calls):** +1. **Summary** (2-3 sentences of what was found) +2. **Key Findings** (bulleted, with confidence levels) +3. **Sources Used** (count and quality assessment) +4. **Gaps** (what's still unclear or under-covered) + +End by asking: Are they satisfied, or do they want deeper research? \ +Should we proceed to writing the final report? + +**STEP 2 — After the user responds, call set_output:** +- set_output("needs_more_research", "true") — if they want more +- set_output("needs_more_research", "false") — if they're satisfied +- set_output("feedback", "What the user wants explored further, or empty string") +""", + tools=[], +) + +# Node 4: Report (client-facing) +# Writes the final report and presents it to the user. +report_node = NodeSpec( + id="report", + name="Write & Deliver Report", + description="Write a cited report from the findings and present it to the user", + node_type="event_loop", + client_facing=True, + input_keys=["findings", "sources", "research_brief"], + output_keys=["delivery_status"], + system_prompt="""\ +Write a comprehensive research report and present it to the user. + +**STEP 1 — Write and present the report (text only, NO tool calls):** + +Report structure: +1. **Executive Summary** (2-3 paragraphs) +2. **Findings** (organized by theme, with [n] citations) +3. **Analysis** (synthesis, implications, areas of debate) +4. **Conclusion** (key takeaways, confidence assessment) +5. **References** (numbered list of sources cited) + +Requirements: +- Every factual claim must cite its source with [n] notation +- Be objective — present multiple viewpoints where sources disagree +- Distinguish well-supported conclusions from speculation +- Answer the original research questions from the brief + +End by asking the user if they have questions or want to save the report. + +**STEP 2 — After the user responds:** +- Answer follow-up questions from the research material +- If they want to save, use write_to_file tool +- When the user is satisfied: set_output("delivery_status", "completed") +""", + tools=["write_to_file"], +) + +__all__ = [ + "intake_node", + "research_node", + "review_node", + "report_node", +] diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/README.md b/.claude/skills/building-agents-construction/examples/online_research_agent/README.md deleted file mode 100644 index a4f27b9e..00000000 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/README.md +++ /dev/null @@ -1,80 +0,0 @@ -# Online Research Agent - -Deep-dive research agent that searches 10+ sources and produces comprehensive narrative reports with citations. - -## Features - -- Generates multiple search queries from a topic -- Searches and fetches 15+ web sources -- Evaluates and ranks sources by relevance -- Synthesizes findings into themes -- Writes narrative report with numbered citations -- Quality checks for uncited claims -- Saves report to local markdown file - -## Usage - -### CLI - -```bash -# Show agent info -python -m online_research_agent info - -# Validate structure -python -m online_research_agent validate - -# Run research on a topic -python -m online_research_agent run --topic "impact of AI on healthcare" - -# Interactive shell -python -m online_research_agent shell -``` - -### Python API - -```python -from online_research_agent import default_agent - -# Simple usage -result = await default_agent.run({"topic": "climate change solutions"}) - -# Check output -if result.success: - print(f"Report saved to: {result.output['file_path']}") - print(result.output['final_report']) -``` - -## Workflow - -``` -parse-query → search-sources → fetch-content → evaluate-sources - ↓ - write-report ← synthesize-findings - ↓ - quality-check → save-report -``` - -## Output - -Reports are saved to `./research_reports/` as markdown files with: - -1. Executive Summary -2. Introduction -3. Key Findings (by theme) -4. Analysis -5. Conclusion -6. References - -## Requirements - -- Python 3.11+ -- LLM provider API key (Groq, Cerebras, etc.) -- Internet access for web search/fetch - -## Configuration - -Edit `config.py` to change: - -- `model`: LLM model (default: groq/moonshotai/kimi-k2-instruct-0905) -- `temperature`: Generation temperature (default: 0.7) -- `max_tokens`: Max tokens per response (default: 16384) diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/__init__.py b/.claude/skills/building-agents-construction/examples/online_research_agent/__init__.py deleted file mode 100644 index 175bd280..00000000 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Online Research Agent - Deep-dive research with narrative reports. - -Research any topic by searching multiple sources, synthesizing information, -and producing a well-structured narrative report with citations. -""" - -from .agent import OnlineResearchAgent, default_agent, goal, nodes, edges -from .config import RuntimeConfig, AgentMetadata, default_config, metadata - -__version__ = "1.0.0" - -__all__ = [ - "OnlineResearchAgent", - "default_agent", - "goal", - "nodes", - "edges", - "RuntimeConfig", - "AgentMetadata", - "default_config", - "metadata", -] diff --git a/.claude/skills/building-agents-construction/examples/online_research_agent/nodes/__init__.py b/.claude/skills/building-agents-construction/examples/online_research_agent/nodes/__init__.py deleted file mode 100644 index ee6c2a28..00000000 --- a/.claude/skills/building-agents-construction/examples/online_research_agent/nodes/__init__.py +++ /dev/null @@ -1,232 +0,0 @@ -"""Node definitions for Online Research Agent.""" - -from framework.graph import NodeSpec - -# Node 1: Parse Query -parse_query_node = NodeSpec( - id="parse-query", - name="Parse Query", - description="Analyze the research topic and generate 3-5 diverse search queries to cover different aspects", - node_type="event_loop", - input_keys=["topic"], - output_keys=["search_queries", "research_focus", "key_aspects"], - system_prompt="""\ -You are a research query strategist. Given a research topic, analyze it and generate search queries. - -Your task: -1. Understand the core research question -2. Identify 3-5 key aspects to investigate -3. Generate 3-5 diverse search queries that will find comprehensive information - -Use set_output to store each result: -- set_output("research_focus", "Brief statement of what we're researching") -- set_output("key_aspects", ["aspect1", "aspect2", "aspect3"]) -- set_output("search_queries", ["query 1", "query 2", "query 3", "query 4", "query 5"]) -""", - tools=[], -) - -# Node 2: Search Sources -search_sources_node = NodeSpec( - id="search-sources", - name="Search Sources", - description="Execute web searches using the generated queries to find 15+ source URLs", - node_type="event_loop", - input_keys=["search_queries", "research_focus"], - output_keys=["source_urls", "search_results_summary"], - system_prompt="""\ -You are a research assistant executing web searches. Use the web_search tool to find sources. - -Your task: -1. Execute each search query using web_search tool -2. Collect URLs from search results -3. Aim for 15+ diverse sources - -After searching, use set_output to store results: -- set_output("source_urls", ["url1", "url2", ...]) -- set_output("search_results_summary", "Brief summary of what was found") -""", - tools=["web_search"], -) - -# Node 3: Fetch Content -fetch_content_node = NodeSpec( - id="fetch-content", - name="Fetch Content", - description="Fetch and extract content from the discovered source URLs", - node_type="event_loop", - input_keys=["source_urls", "research_focus"], - output_keys=["fetched_sources", "fetch_errors"], - system_prompt="""\ -You are a content fetcher. Use web_scrape tool to retrieve content from URLs. - -Your task: -1. Fetch content from each source URL using web_scrape tool -2. Extract the main content relevant to the research focus -3. Track any URLs that failed to fetch - -After fetching, use set_output to store results: -- set_output("fetched_sources", [{"url": "...", "title": "...", "content": "..."}]) -- set_output("fetch_errors", ["url that failed", ...]) -""", - tools=["web_scrape"], -) - -# Node 4: Evaluate Sources -evaluate_sources_node = NodeSpec( - id="evaluate-sources", - name="Evaluate Sources", - description="Score sources for relevance and quality, filter to top 10", - node_type="event_loop", - input_keys=["fetched_sources", "research_focus", "key_aspects"], - output_keys=["ranked_sources", "source_analysis"], - system_prompt="""\ -You are a source evaluator. Assess each source for quality and relevance. - -Scoring criteria: -- Relevance to research focus (1-10) -- Source credibility (1-10) -- Information depth (1-10) -- Recency if relevant (1-10) - -Your task: -1. Score each source -2. Rank by combined score -3. Select top 10 sources -4. Note what each source uniquely contributes - -Use set_output to store results: -- set_output("ranked_sources", [{"url": "...", "title": "...", "score": 8.5}]) -- set_output("source_analysis", "Overview of source quality and coverage") -""", - tools=[], -) - -# Node 5: Synthesize Findings -synthesize_findings_node = NodeSpec( - id="synthesize-findings", - name="Synthesize Findings", - description="Extract key facts from sources and identify common themes", - node_type="event_loop", - input_keys=["ranked_sources", "research_focus", "key_aspects"], - output_keys=["key_findings", "themes", "source_citations"], - system_prompt="""\ -You are a research synthesizer. Analyze multiple sources to extract insights. - -Your task: -1. Identify key facts from each source -2. Find common themes across sources -3. Note contradictions or debates -4. Build a citation map (fact -> source URL) - -Use set_output to store each result: -- set_output("key_findings", [{"finding": "...", "sources": ["url1"], "confidence": "high"}]) -- set_output("themes", [{"theme": "...", "description": "...", "supporting_sources": [...]}]) -- set_output("source_citations", {"fact or claim": ["url1", "url2"]}) -""", - tools=[], -) - -# Node 6: Write Report -write_report_node = NodeSpec( - id="write-report", - name="Write Report", - description="Generate a narrative report with proper citations", - node_type="event_loop", - input_keys=[ - "key_findings", - "themes", - "source_citations", - "research_focus", - "ranked_sources", - ], - output_keys=["report_content", "references"], - system_prompt="""\ -You are a research report writer. Create a well-structured narrative report. - -Report structure: -1. Executive Summary (2-3 paragraphs) -2. Introduction (context and scope) -3. Key Findings (organized by theme) -4. Analysis (synthesis and implications) -5. Conclusion -6. References (numbered list of all sources) - -Citation format: Use numbered citations like [1], [2] that correspond to the References section. - -IMPORTANT: -- Every factual claim MUST have a citation -- Write in clear, professional prose -- Be objective and balanced -- Highlight areas of consensus and debate - -Use set_output to store results: -- set_output("report_content", "Full markdown report text with citations...") -- set_output("references", [{"number": 1, "url": "...", "title": "..."}]) -""", - tools=[], -) - -# Node 7: Quality Check -quality_check_node = NodeSpec( - id="quality-check", - name="Quality Check", - description="Verify all claims have citations and report is coherent", - node_type="event_loop", - input_keys=["report_content", "references", "source_citations"], - output_keys=["quality_score", "issues", "final_report"], - system_prompt="""\ -You are a quality assurance reviewer. Check the research report for issues. - -Check for: -1. Uncited claims (factual statements without [n] citation) -2. Broken citations (references to non-existent numbers) -3. Coherence (logical flow between sections) -4. Completeness (all key aspects covered) -5. Accuracy (claims match source content) - -If issues found, fix them in the final report. - -Use set_output to store results: -- set_output("quality_score", 0.95) -- set_output("issues", [{"type": "uncited_claim", "location": "...", "fixed": true}]) -- set_output("final_report", "Corrected full report with all issues fixed...") -""", - tools=[], -) - -# Node 8: Save Report -save_report_node = NodeSpec( - id="save-report", - name="Save Report", - description="Write the final report to a local markdown file", - node_type="event_loop", - input_keys=["final_report", "references", "research_focus"], - output_keys=["file_path", "save_status"], - system_prompt="""\ -You are a file manager. Save the research report to disk. - -Your task: -1. Generate a filename from the research focus (slugified, with date) -2. Use the write_to_file tool to save the report as markdown -3. Save to the ./research_reports/ directory - -Filename format: research_YYYY-MM-DD_topic-slug.md - -Use set_output to store results: -- set_output("file_path", "research_reports/research_2026-01-23_topic-name.md") -- set_output("save_status", "success") -""", - tools=["write_to_file"], -) - -__all__ = [ - "parse_query_node", - "search_sources_node", - "fetch_content_node", - "evaluate_sources_node", - "synthesize_findings_node", - "write_report_node", - "quality_check_node", - "save_report_node", -] diff --git a/.claude/skills/building-agents-core/SKILL.md b/.claude/skills/building-agents-core/SKILL.md index 2ff9b632..db8c825a 100644 --- a/.claude/skills/building-agents-core/SKILL.md +++ b/.claude/skills/building-agents-core/SKILL.md @@ -158,6 +158,43 @@ intake_node = NodeSpec( > **Legacy Note:** The old `pause_nodes` / `entry_points` pattern still works but `client_facing=True` is preferred for new agents. +**STEP 1 / STEP 2 Prompt Pattern:** For client-facing nodes, structure the system prompt with two explicit phases: + +```python +system_prompt="""\ +**STEP 1 — Respond to the user (text only, NO tool calls):** +[Present information, ask questions, etc.] + +**STEP 2 — After the user responds, call set_output:** +[Call set_output with the structured outputs] +""" +``` + +This prevents the LLM from calling `set_output` prematurely before the user has had a chance to respond. + +### Node Design: Fewer, Richer Nodes + +Prefer fewer nodes that do more work over many thin single-purpose nodes: + +- **Bad**: 8 thin nodes (parse query → search → fetch → evaluate → synthesize → write → check → save) +- **Good**: 4 rich nodes (intake → research → review → report) + +Why: Each node boundary requires serializing outputs and passing context. Fewer nodes means the LLM retains full context of its work within the node. A research node that searches, fetches, and analyzes keeps all the source material in its conversation history. + +### nullable_output_keys for Cross-Edge Inputs + +When a node receives inputs that only arrive on certain edges (e.g., `feedback` only comes from a review → research feedback loop, not from intake → research), mark those keys as `nullable_output_keys`: + +```python +research_node = NodeSpec( + id="research", + input_keys=["research_brief", "feedback"], + nullable_output_keys=["feedback"], # Not present on first visit + max_node_visits=3, + ... +) +``` + ## Event Loop Architecture Concepts ### How EventLoopNode Works @@ -169,40 +206,30 @@ An event loop node runs a multi-turn loop: 4. Judge evaluates: ACCEPT (exit loop), RETRY (loop again), or ESCALATE 5. Repeat until judge ACCEPTs or max_iterations reached -### CRITICAL: EventLoopNode Runtime Requirements +### EventLoopNode Runtime -EventLoopNodes are **not auto-created** by the graph executor. They must be explicitly instantiated and registered in a `node_registry` dict before execution. - -**Required components:** -1. **`EventLoopNode` instances** — One per event_loop NodeSpec, registered in `node_registry` -2. **`Runtime` instance** — `GraphExecutor` calls `runtime.start_run()` internally. Passing `None` crashes the executor -3. **`GraphExecutor` (not `AgentRuntime`)** — `AgentRuntime`/`create_agent_runtime()` does NOT pass `node_registry` to the internal `GraphExecutor`, so all event_loop nodes fail with "not found in registry" +EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. You do NOT need to manually register them. Both `GraphExecutor` (direct) and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically. ```python +# Direct execution — executor auto-creates EventLoopNodes from framework.graph.executor import GraphExecutor -from framework.graph.event_loop_node import EventLoopNode, LoopConfig -from framework.runtime.event_bus import EventBus from framework.runtime.core import Runtime -# Build node_registry -event_bus = EventBus() -node_registry = {} -for node_spec in nodes: - if node_spec.node_type == "event_loop": - node_registry[node_spec.id] = EventLoopNode( - event_bus=event_bus, - config=LoopConfig(max_iterations=50, max_tool_calls_per_turn=15), - tool_executor=tool_executor, - ) - -# Create executor with Runtime and node_registry runtime = Runtime(storage_path) executor = GraphExecutor( runtime=runtime, llm=llm, tools=tools, tool_executor=tool_executor, - node_registry=node_registry, + storage_path=storage_path, +) +result = await executor.execute(graph=graph, goal=goal, input_data=input_data) + +# TUI execution — AgentRuntime also works +from framework.runtime.agent_runtime import create_agent_runtime +runtime = create_agent_runtime( + graph=graph, goal=goal, storage_path=storage_path, + entry_points=[...], llm=llm, tools=tools, tool_executor=tool_executor, ) ``` @@ -210,8 +237,12 @@ executor = GraphExecutor( Nodes produce structured outputs by calling `set_output(key, value)` — a synthetic tool injected by the framework. When the LLM calls `set_output`, the value is stored in the output accumulator and made available to downstream nodes via shared memory. +`set_output` is NOT a real tool — it is excluded from `real_tool_results`. For client-facing nodes, this means a turn where the LLM only calls `set_output` (no other tools) is treated as a conversational boundary and will block for user input. + ### JudgeProtocol +**The judge is the SOLE mechanism for acceptance decisions.** Do not add ad-hoc framework gating, output rollback, or premature rejection logic. If the LLM calls `set_output` too early, fix it with better prompts or a custom judge — not framework-level guards. + The judge controls when a node's loop exits: - **Implicit judge** (default, no judge configured): ACCEPTs when the LLM finishes with no tool calls and all required output keys are set - **SchemaJudge**: Validates outputs against a Pydantic model @@ -225,6 +256,23 @@ Controls loop behavior: - `stall_detection_threshold` (default 3) — detects repeated identical responses - `max_history_tokens` (default 32000) — triggers conversation compaction +### Data Tools (Spillover Management) + +When tool results exceed the context window, the framework automatically saves them to a spillover directory and truncates with a hint. Nodes that produce or consume large data should include the data tools: + +- `save_data(filename, data, data_dir)` — Write data to a file in the data directory +- `load_data(filename, data_dir, offset=0, limit=50)` — Read data with line-based pagination +- `list_data_files(data_dir)` — List available data files + +These are real MCP tools (not synthetic). Add them to nodes that handle large tool results: + +```python +research_node = NodeSpec( + ... + tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"], +) +``` + ### Fan-Out / Fan-In Multiple ON_SUCCESS edges from the same source create parallel execution. All branches run concurrently via `asyncio.gather()`. Parallel event_loop nodes must have disjoint `output_keys`. diff --git a/.claude/skills/building-agents-patterns/SKILL.md b/.claude/skills/building-agents-patterns/SKILL.md index 9fd553dc..fa9b3403 100644 --- a/.claude/skills/building-agents-patterns/SKILL.md +++ b/.claude/skills/building-agents-patterns/SKILL.md @@ -61,28 +61,38 @@ For agents needing multi-turn conversations with users, use `client_facing=True` A client-facing node streams LLM output to the user and blocks for user input between conversational turns. This replaces the old pause/resume pattern. ```python -# Client-facing node blocks for user input +# Client-facing node with STEP 1/STEP 2 prompt pattern intake_node = NodeSpec( id="intake", name="Intake", description="Gather requirements from the user", node_type="event_loop", client_facing=True, - input_keys=[], - output_keys=["repo_url", "project_url"], - system_prompt="You are the intake agent. Ask the user for their repo URL and project URL. When you have both, call set_output for each.", + input_keys=["topic"], + output_keys=["research_brief"], + system_prompt="""\ +You are an intake specialist. + +**STEP 1 — Read and respond (text only, NO tool calls):** +1. Read the topic provided +2. If it's vague, ask 1-2 clarifying questions +3. If it's clear, confirm your understanding + +**STEP 2 — After the user confirms, call set_output:** +- set_output("research_brief", "Clear description of what to research") +""", ) # Internal node runs without user interaction -scanner_node = NodeSpec( - id="scanner", - name="Scanner", - description="Scan the repository", +research_node = NodeSpec( + id="research", + name="Research", + description="Search and analyze sources", node_type="event_loop", - input_keys=["repo_url"], - output_keys=["scan_results"], - system_prompt="Scan the repository at {repo_url}...", - tools=["scan_github_repo"], + input_keys=["research_brief"], + output_keys=["findings", "sources"], + system_prompt="Research the topic using web_search and web_scrape...", + tools=["web_search", "web_scrape", "load_data", "save_data"], ) ``` @@ -91,6 +101,9 @@ scanner_node = NodeSpec( - User input is injected via `node.inject_event(text)` - When the LLM calls `set_output` to produce structured outputs, the judge evaluates and ACCEPTs - Internal nodes (non-client-facing) run their entire loop without blocking +- `set_output` is a synthetic tool — a turn with only `set_output` calls (no real tools) triggers user input blocking + +**STEP 1/STEP 2 pattern:** Always structure client-facing prompts with explicit phases. STEP 1 is text-only conversation. STEP 2 calls `set_output` after user confirmation. This prevents the LLM from calling `set_output` prematurely before the user responds. ### When to Use client_facing @@ -160,6 +173,12 @@ EdgeSpec( ## Judge Patterns +**Core Principle: The judge is the SOLE mechanism for acceptance decisions.** Never add ad-hoc framework gating to compensate for LLM behavior. If the LLM calls `set_output` prematurely, fix the system prompt or use a custom judge. Anti-patterns to avoid: +- Output rollback logic +- `_user_has_responded` flags +- Premature set_output rejection +- Interaction protocol injection into system prompts + Judges control when an event_loop node's loop exits. Choose based on validation needs. ### Implicit Judge (Default) @@ -241,15 +260,34 @@ EventLoopNode automatically manages context window usage with tiered compaction: ### Spillover Pattern -For large tool results, use `save_data()` to write to disk and pass the filename through `set_output`. This keeps the LLM context window small. +The framework automatically truncates large tool results and saves full content to a spillover directory. The LLM receives a truncation message with instructions to use `load_data` to read the full result. -``` -LLM calls save_data(filename, large_data) → file written to spillover/ -LLM calls set_output("results_file", filename) → filename stored in output -Downstream node calls load_data(filename) → reads from spillover/ +For explicit data management, use the data tools (real MCP tools, not synthetic): + +```python +# save_data, load_data, list_data_files are real MCP tools +# Each takes a data_dir parameter since the MCP server is shared + +# Saving large results +save_data(filename="sources.json", data=large_json_string, data_dir="/path/to/spillover") + +# Reading with pagination (line-based offset/limit) +load_data(filename="sources.json", data_dir="/path/to/spillover", offset=0, limit=50) + +# Listing available files +list_data_files(data_dir="/path/to/spillover") ``` -The `load_data()` tool supports `offset` and `limit` parameters for paginated reading of large files. +Add data tools to nodes that handle large tool results: + +```python +research_node = NodeSpec( + ... + tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"], +) +``` + +The `data_dir` is passed by the framework (from the node's spillover directory). The LLM sees `data_dir` in truncation messages and uses it when calling `load_data`. ## Anti-Patterns @@ -259,6 +297,29 @@ The `load_data()` tool supports `offset` and `limit` parameters for paginated re - **Don't hide code in session** — Write to files as components are approved - **Don't wait to write files** — Agent visible from first step - **Don't batch everything** — Write incrementally, one component at a time +- **Don't create too many thin nodes** — Prefer fewer, richer nodes (see below) +- **Don't add framework gating for LLM behavior** — Fix prompts or use judges instead + +### Fewer, Richer Nodes + +A common mistake is splitting work into too many small single-purpose nodes. Each node boundary requires serializing outputs, losing in-context information, and adding edge complexity. + +| Bad (8 thin nodes) | Good (4 rich nodes) | +|---------------------|---------------------| +| parse-query | intake (client-facing) | +| search-sources | research (search + fetch + analyze) | +| fetch-content | review (client-facing) | +| evaluate-sources | report (write + deliver) | +| synthesize-findings | | +| write-report | | +| quality-check | | +| save-report | | + +**Why fewer nodes are better:** +- The LLM retains full context of its work within a single node +- A research node that searches, fetches, and analyzes keeps all source material in its conversation history +- Fewer edges means simpler graph and fewer failure points +- Data tools (`save_data`/`load_data`) handle context window limits within a single node ### MCP Tools - Correct Usage diff --git a/core/framework/graph/event_loop_node.py b/core/framework/graph/event_loop_node.py index 84729be8..f1df2a3d 100644 --- a/core/framework/graph/event_loop_node.py +++ b/core/framework/graph/event_loop_node.py @@ -144,19 +144,19 @@ class EventLoopNode(NodeProtocol): 1. Try to restore from durable state (crash recovery) 2. If no prior state, init from NodeSpec.system_prompt + input_keys 3. Loop: drain injection queue -> stream LLM -> execute tools - -> if client_facing + no tools: block for user input (inject_event) - -> if not client_facing or tools present: judge evaluates + -> if client_facing + no real tools: block for user input + -> judge evaluates (acceptance criteria) (each add_* and set_output writes through to store immediately) 4. Publish events to EventBus at each stage 5. Write cursor after each iteration 6. Terminate when judge returns ACCEPT, shutdown signaled, or max iterations 7. Build output dict from OutputAccumulator - Client-facing blocking: When ``client_facing=True`` and the LLM produces - text without tool calls (a natural conversational turn), the node blocks - via ``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()`` - is called. This separates blocking (node concern) from output evaluation - (judge concern). + Client-facing blocking: When ``client_facing=True`` and the LLM finishes + without real tool calls (stop_reason != tool_call), the node blocks via + ``_await_user_input()`` until ``inject_event()`` or ``signal_shutdown()`` + is called. After user input, the judge evaluates — the judge is the + sole mechanism for acceptance decisions. Always returns NodeResult with retryable=False semantics. The executor must NOT retry event loop nodes -- retry is handled internally by the @@ -212,8 +212,10 @@ class EventLoopNode(NodeProtocol): # 2. Restore or create new conversation + accumulator conversation, accumulator, start_iteration = await self._restore(ctx) if conversation is None: + system_prompt = ctx.node_spec.system_prompt or "" + conversation = NodeConversation( - system_prompt=ctx.node_spec.system_prompt or "", + system_prompt=system_prompt, max_history_tokens=self._config.max_history_tokens, output_keys=ctx.node_spec.output_keys or None, store=self._conversation_store, @@ -276,15 +278,20 @@ class EventLoopNode(NodeProtocol): iteration, len(conversation.messages), ) - assistant_text, tool_results_list, turn_tokens = await self._run_single_turn( - ctx, conversation, tools, iteration, accumulator - ) + ( + assistant_text, + real_tool_results, + outputs_set, + turn_tokens, + ) = await self._run_single_turn(ctx, conversation, tools, iteration, accumulator) logger.info( - "[%s] iter=%d: LLM done — text=%d chars, tool_calls=%d, tokens=%s, accumulator=%s", + "[%s] iter=%d: LLM done — text=%d chars, real_tools=%d, " + "outputs_set=%s, tokens=%s, accumulator=%s", node_id, iteration, len(assistant_text), - len(tool_results_list), + len(real_tool_results), + outputs_set or "[]", turn_tokens, {k: ("set" if v is not None else "None") for k, v in accumulator.to_dict().items()}, ) @@ -300,6 +307,31 @@ class EventLoopNode(NodeProtocol): if conversation.needs_compaction(): await self._compact_tiered(ctx, conversation, accumulator) + # 6e'''. Empty response guard — if the LLM returned nothing + # (no text, no real tools, no set_output) and all required + # outputs are already set, accept immediately. This prevents + # wasted iterations when the LLM has genuinely finished its + # work (e.g. after calling set_output in a previous turn). + truly_empty = not assistant_text and not real_tool_results and not outputs_set + if truly_empty and accumulator is not None: + missing = self._get_missing_output_keys( + accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys + ) + if not missing: + logger.info( + "[%s] iter=%d: empty response but all outputs set — accepting", + node_id, + iteration, + ) + await self._publish_loop_completed(stream_id, node_id, iteration + 1) + latency_ms = int((time.time() - start_time) * 1000) + return NodeResult( + success=True, + output=accumulator.to_dict(), + tokens_used=total_input_tokens + total_output_tokens, + latency_ms=latency_ms, + ) + # 6f. Stall detection recent_responses.append(assistant_text) if len(recent_responses) > self._config.stall_detection_threshold: @@ -321,18 +353,17 @@ class EventLoopNode(NodeProtocol): # 6g. Write cursor checkpoint await self._write_cursor(ctx, conversation, accumulator, iteration) - # 6h. Client-facing input wait - logger.info( - "[%s] iter=%d: 6h check — client_facing=%s, tool_results=%d", - node_id, - iteration, - ctx.node_spec.client_facing, - len(tool_results_list), - ) - if ctx.node_spec.client_facing and not tool_results_list: - # LLM finished speaking (no tool calls) on a client-facing node. - # This is a conversational turn boundary: block for user input - # instead of running the judge. + # 6h. Client-facing input blocking + # + # For client_facing nodes, block for user input whenever the + # LLM finishes without making real tool calls (i.e. the LLM's + # stop_reason is not tool_call). set_output is separated from + # real tools by _run_single_turn, so this correctly treats + # set_output-only turns as conversational boundaries. + # + # After user input, always fall through to judge evaluation + # (6i). The judge handles all acceptance decisions. + if ctx.node_spec.client_facing and not real_tool_results: if self._shutdown: await self._publish_loop_completed(stream_id, node_id, iteration + 1) latency_ms = int((time.time() - start_time) * 1000) @@ -347,7 +378,6 @@ class EventLoopNode(NodeProtocol): got_input = await self._await_user_input(ctx) logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input) if not got_input: - # Shutdown signaled during wait await self._publish_loop_completed(stream_id, node_id, iteration + 1) latency_ms = int((time.time() - start_time) * 1000) return NodeResult( @@ -357,46 +387,13 @@ class EventLoopNode(NodeProtocol): latency_ms=latency_ms, ) - # Clear stall detection — user input resets the conversation recent_responses.clear() - - # For nodes with an explicit judge, fall through to judge - # evaluation so the LLM gets structured feedback about missing - # outputs (e.g. "Missing output keys: [...]"). Without this, - # the LLM may generate text like "Ready to proceed!" without - # ever calling set_output, and the judge feedback never reaches it. - # - # For nodes without a judge (HITL review/approval with all- - # nullable keys), keep conversing UNLESS the LLM has already - # set an output — in that case fall through to the implicit - # judge which will ACCEPT and terminate the node. - if self._judge is None: - has_outputs = accumulator and any( - v is not None for v in accumulator.to_dict().values() - ) - if not has_outputs: - logger.info( - "[%s] iter=%d: no judge, no outputs, continuing", - node_id, - iteration, - ) - continue - logger.info( - "[%s] iter=%d: no judge, outputs set — implicit judge", - node_id, - iteration, - ) - else: - logger.info( - "[%s] iter=%d: has judge, falling through to 6i", - node_id, - iteration, - ) + # Fall through to judge evaluation (6i) # 6i. Judge evaluation should_judge = ( (iteration + 1) % self._config.judge_every_n_turns == 0 - or not tool_results_list # no tool calls = natural stop + or not real_tool_results # no real tool calls = natural stop ) logger.info("[%s] iter=%d: 6i should_judge=%s", node_id, iteration, should_judge) @@ -406,7 +403,7 @@ class EventLoopNode(NodeProtocol): conversation, accumulator, assistant_text, - tool_results_list, + real_tool_results, iteration, ) fb_preview = (verdict.feedback or "")[:200] @@ -526,16 +523,24 @@ class EventLoopNode(NodeProtocol): tools: list[Tool], iteration: int, accumulator: OutputAccumulator, - ) -> tuple[str, list[dict], dict[str, int]]: + ) -> tuple[str, list[dict], list[str], dict[str, int]]: """Run a single LLM turn with streaming and tool execution. - Returns (assistant_text, tool_results, token_counts). + Returns (assistant_text, real_tool_results, outputs_set, token_counts). + + ``real_tool_results`` contains only results from actual tools (web_search, + etc.), NOT from the synthetic ``set_output`` tool. ``outputs_set`` lists + the output keys written via ``set_output`` during this turn. This + separation lets the caller treat set_output as a framework concern + rather than a tool-execution concern. """ stream_id = ctx.node_id node_id = ctx.node_id token_counts: dict[str, int] = {"input": 0, "output": 0} tool_call_count = 0 final_text = "" + # Track output keys set via set_output across all inner iterations + outputs_set_this_turn: list[str] = [] # Inner tool loop: stream may produce tool calls requiring re-invocation while True: @@ -606,10 +611,10 @@ class EventLoopNode(NodeProtocol): # If no tool calls, turn is complete if not tool_calls: - return final_text, [], token_counts + return final_text, [], outputs_set_this_turn, token_counts - # Execute tool calls - tool_results: list[dict] = [] + # Execute tool calls — separate real tools from set_output + real_tool_results: list[dict] = [] limit_hit = False executed_in_batch = 0 for tc in tool_calls: @@ -624,21 +629,21 @@ class EventLoopNode(NodeProtocol): stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input ) - # Handle set_output synthetic tool logger.info( "[%s] tool_call: %s(%s)", node_id, tc.tool_name, json.dumps(tc.tool_input)[:200], ) + if tc.tool_name == "set_output": + # --- Framework-level set_output handling --- result = self._handle_set_output(tc.tool_input, ctx.node_spec.output_keys) result = ToolResult( tool_use_id=tc.tool_use_id, content=result.content, is_error=result.is_error, ) - # Async write-through for set_output if not result.is_error: value = tc.tool_input["value"] # Parse JSON strings into native types so downstream @@ -652,26 +657,27 @@ class EventLoopNode(NodeProtocol): except (json.JSONDecodeError, TypeError): pass await accumulator.set(tc.tool_input["key"], value) + outputs_set_this_turn.append(tc.tool_input["key"]) else: - # Execute real tool + # --- Real tool execution --- result = await self._execute_tool(tc) - # Truncate large results to prevent context blowup result = self._truncate_tool_result(result, tc.tool_name) + real_tool_results.append( + { + "tool_use_id": tc.tool_use_id, + "tool_name": tc.tool_name, + "content": result.content, + "is_error": result.is_error, + } + ) - # Record tool result in conversation (write-through) + # Record tool result in conversation (both real and set_output + # go into the conversation for LLM context continuity) await conversation.add_tool_result( tool_use_id=tc.tool_use_id, content=result.content, is_error=result.is_error, ) - tool_results.append( - { - "tool_use_id": tc.tool_use_id, - "tool_name": tc.tool_name, - "content": result.content, - "is_error": result.is_error, - } - ) # Publish tool call completed await self._publish_tool_completed( @@ -708,7 +714,9 @@ class EventLoopNode(NodeProtocol): content=discard_msg, is_error=True, ) - tool_results.append( + # Discarded calls go into real_tool_results so the + # caller sees they were attempted (for judge context). + real_tool_results.append( { "tool_use_id": tc.tool_use_id, "tool_name": tc.tool_name, @@ -716,9 +724,24 @@ class EventLoopNode(NodeProtocol): "is_error": True, } ) + # Prune old tool results NOW to prevent context bloat on the + # next turn. The char-based token estimator underestimates + # actual API tokens, so the standard compaction check in the + # outer loop may not trigger in time. + protect = max(2000, self._config.max_history_tokens // 12) + pruned = await conversation.prune_old_tool_results( + protect_tokens=protect, + min_prune_tokens=max(1000, protect // 3), + ) + if pruned > 0: + logger.info( + "Post-limit pruning: cleared %d old tool results (budget: %d)", + pruned, + self._config.max_history_tokens, + ) # Limit hit — return from this turn so the judge can # evaluate instead of looping back for another stream. - return final_text, tool_results, token_counts + return final_text, real_tool_results, outputs_set_this_turn, token_counts # --- Mid-turn pruning: prevent context blowup within a single turn --- if conversation.usage_ratio() >= 0.6: @@ -1025,7 +1048,8 @@ class EventLoopNode(NodeProtocol): truncated = ( f"[Result from {tool_name}: {len(result.content)} chars — " f"too large for context, saved to '{filename}'. " - f"Use load_data('{filename}') to read the full result.]\n\n" + f"Use load_data(filename='{filename}', data_dir='{spill_dir}') " + f"to read the full result.]\n\n" f"Preview:\n{preview}…" ) logger.info( @@ -1244,9 +1268,11 @@ class EventLoopNode(NodeProtocol): # 5. Spillover files hint if self._config.spillover_dir: + spill = self._config.spillover_dir parts.append( "NOTE: Large tool results were saved to files. " - "Use load_data('') to read them." + f"Use load_data(filename='', data_dir='{spill}') " + "to read them." ) # 6. Tool call history (prevent re-calling tools) diff --git a/core/framework/graph/executor.py b/core/framework/graph/executor.py index f1e972da..68710d06 100644 --- a/core/framework/graph/executor.py +++ b/core/framework/graph/executor.py @@ -14,6 +14,7 @@ import logging import warnings from collections.abc import Callable from dataclasses import dataclass, field +from pathlib import Path from typing import Any from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec @@ -130,6 +131,7 @@ class GraphExecutor: parallel_config: ParallelExecutionConfig | None = None, event_bus: Any | None = None, stream_id: str = "", + storage_path: str | Path | None = None, ): """ Initialize the executor. @@ -146,6 +148,7 @@ class GraphExecutor: parallel_config: Configuration for parallel execution behavior event_bus: Optional event bus for emitting node lifecycle events stream_id: Stream ID for event correlation + storage_path: Optional base path for conversation persistence """ self.runtime = runtime self.llm = llm @@ -157,6 +160,7 @@ class GraphExecutor: self.logger = logging.getLogger(__name__) self._event_bus = event_bus self._stream_id = stream_id + self._storage_path = Path(storage_path) if storage_path else None # Initialize output cleaner self.cleansing_config = cleansing_config or CleansingConfig() @@ -380,8 +384,16 @@ class GraphExecutor: ) if result.success: - # Validate output before accepting it - if result.output and node_spec.output_keys: + # Validate output before accepting it. + # Skip for event_loop nodes — their judge system is + # the sole acceptance mechanism (see WP-8). Empty + # strings and other flexible outputs are legitimate + # for LLM-driven nodes that already passed the judge. + if ( + result.output + and node_spec.output_keys + and node_spec.node_type != "event_loop" + ): validation = self.validator.validate_all( output=result.output, expected_keys=node_spec.output_keys, @@ -799,11 +811,43 @@ class GraphExecutor: ) if node_spec.node_type == "event_loop": - # Event loop nodes must be pre-registered (like function nodes) - raise RuntimeError( - f"EventLoopNode '{node_spec.id}' not found in registry. " - "Register it with executor.register_node() before execution." + # Auto-create EventLoopNode with sensible defaults. + # Custom configs can still be pre-registered via node_registry. + from framework.graph.event_loop_node import EventLoopNode, LoopConfig + + # Create a FileConversationStore if a storage path is available + conv_store = None + if self._storage_path: + from framework.storage.conversation_store import FileConversationStore + + store_path = self._storage_path / "conversations" / node_spec.id + conv_store = FileConversationStore(base_path=store_path) + + # Auto-configure spillover directory for large tool results. + # When a tool result exceeds max_tool_result_chars, the full + # content is written to spillover_dir and the agent gets a + # truncated preview with instructions to use load_data(). + spillover = None + if self._storage_path: + spillover = str(self._storage_path / "data") + + node = EventLoopNode( + event_bus=self._event_bus, + judge=None, # implicit judge: accept when output_keys are filled + config=LoopConfig( + max_iterations=100 if node_spec.client_facing else 50, + max_tool_calls_per_turn=10, + stall_detection_threshold=3, + max_history_tokens=32000, + max_tool_result_chars=3_000, + spillover_dir=spillover, + ), + tool_executor=self.tool_executor, + conversation_store=conv_store, ) + # Cache so inject_event() is reachable for client-facing input + self.node_registry[node_spec.id] = node + return node # Should never reach here due to validation above raise RuntimeError(f"Unhandled node type: {node_spec.node_type}") @@ -832,9 +876,12 @@ class GraphExecutor: source_node_name=current_node_spec.name if current_node_spec else current_node_id, target_node_name=target_node_spec.name if target_node_spec else edge.target, ): - # Validate and clean output before mapping inputs + # Validate and clean output before mapping inputs. + # Use full memory state (not just result.output) because + # target input_keys may come from earlier nodes in the + # graph, not only from the immediate source node. if self.cleansing_config.enabled and target_node_spec: - output_to_validate = result.output + output_to_validate = memory.read_all() validation = self.output_cleaner.validate_output( output=output_to_validate, @@ -1030,10 +1077,13 @@ class GraphExecutor: branch.status = "running" try: - # Validate and clean output before mapping inputs (same as _follow_edges) + # Validate and clean output before mapping inputs (same as _follow_edges). + # Use full memory state since target input_keys may come + # from earlier nodes, not just the immediate source. if self.cleansing_config.enabled and node_spec: + mem_snapshot = memory.read_all() validation = self.output_cleaner.validate_output( - output=source_result.output, + output=mem_snapshot, source_node_id=source_node_spec.id if source_node_spec else "unknown", target_node_spec=node_spec, ) @@ -1044,7 +1094,7 @@ class GraphExecutor: f"{branch.node_id}: {validation.errors}" ) cleaned_output = self.output_cleaner.clean_output( - output=source_result.output, + output=mem_snapshot, source_node_id=source_node_spec.id if source_node_spec else "unknown", target_node_spec=node_spec, validation_errors=validation.errors, diff --git a/core/framework/graph/output_cleaner.py b/core/framework/graph/output_cleaner.py index 2b299f8d..060a8a3a 100644 --- a/core/framework/graph/output_cleaner.py +++ b/core/framework/graph/output_cleaner.py @@ -144,8 +144,11 @@ class OutputCleaner: errors = [] warnings = [] - # Check 1: Required input keys present + # Check 1: Required input keys present (skip nullable keys) + nullable = set(getattr(target_node_spec, "nullable_output_keys", None) or []) for key in target_node_spec.input_keys: + if key in nullable: + continue if key not in output: errors.append(f"Missing required key: '{key}'") continue diff --git a/core/framework/llm/litellm.py b/core/framework/llm/litellm.py index 2148eedb..4404b04e 100644 --- a/core/framework/llm/litellm.py +++ b/core/framework/llm/litellm.py @@ -572,17 +572,21 @@ class LiteLLMProvider(LLMProvider): # and we skip the retry path — nothing was yielded in vain.) has_content = accumulated_text or tool_calls_acc if not has_content and attempt < RATE_LIMIT_MAX_RETRIES: - # If the conversation ends with an assistant message, - # an empty stream is expected (nothing new to say). - # Don't retry — just flush whatever we have. + # If the conversation ends with an assistant or tool + # message, an empty stream is expected — the LLM has + # nothing new to say. Don't burn retries on this; + # let the caller (EventLoopNode) decide what to do. + # Typical case: client_facing node where the LLM set + # all outputs via set_output tool calls, and the tool + # results are the last messages. last_role = next( (m["role"] for m in reversed(full_messages) if m.get("role") != "system"), None, ) - if last_role == "assistant": + if last_role in ("assistant", "tool"): logger.debug( - "[stream] Empty response after assistant message — " - "expected, not retrying." + "[stream] Empty response after %s message — expected, not retrying.", + last_role, ) for event in tail_events: yield event diff --git a/core/framework/runner/tool_registry.py b/core/framework/runner/tool_registry.py index e82374d8..b1315a61 100644 --- a/core/framework/runner/tool_registry.py +++ b/core/framework/runner/tool_registry.py @@ -280,7 +280,16 @@ class ToolRegistry: return base_dir = config_path.parent - for server_config in config.get("servers", []): + + # Support both formats: + # {"servers": [{"name": "x", ...}]} (list format) + # {"server-name": {"transport": ...}, ...} (dict format) + server_list = config.get("servers", []) + if not server_list and "servers" not in config: + # Treat top-level keys as server names + server_list = [{"name": name, **cfg} for name, cfg in config.items()] + + for server_config in server_list: cwd = server_config.get("cwd") if cwd and not Path(cwd).is_absolute(): server_config["cwd"] = str((base_dir / cwd).resolve()) diff --git a/core/framework/runtime/agent_runtime.py b/core/framework/runtime/agent_runtime.py index 90e446c8..ead6048d 100644 --- a/core/framework/runtime/agent_runtime.py +++ b/core/framework/runtime/agent_runtime.py @@ -296,6 +296,25 @@ class AgentRuntime: raise ValueError(f"Entry point '{entry_point_id}' not found") return await stream.wait_for_completion(exec_id, timeout) + async def inject_input(self, node_id: str, content: str) -> bool: + """Inject user input into a running client-facing node. + + Routes input to the EventLoopNode identified by ``node_id`` + across all active streams. Used by the TUI ChatRepl to deliver + user responses during client-facing node execution. + + Args: + node_id: The node currently waiting for input + content: The user's input text + + Returns: + True if input was delivered, False if no matching node found + """ + for stream in self._streams.values(): + if await stream.inject_input(node_id, content): + return True + return False + async def get_goal_progress(self) -> dict[str, Any]: """ Evaluate goal progress across all streams. diff --git a/core/framework/runtime/execution_stream.py b/core/framework/runtime/execution_stream.py index 3c10e98c..14d1b763 100644 --- a/core/framework/runtime/execution_stream.py +++ b/core/framework/runtime/execution_stream.py @@ -153,6 +153,7 @@ class ExecutionStream: # Execution tracking self._active_executions: dict[str, ExecutionContext] = {} self._execution_tasks: dict[str, asyncio.Task] = {} + self._active_executors: dict[str, GraphExecutor] = {} self._execution_results: OrderedDict[str, ExecutionResult] = OrderedDict() self._execution_result_times: dict[str, float] = {} self._completion_events: dict[str, asyncio.Event] = {} @@ -237,6 +238,21 @@ class ExecutionStream: ) ) + async def inject_input(self, node_id: str, content: str) -> bool: + """Inject user input into a running client-facing EventLoopNode. + + Searches active executors for a node matching ``node_id`` and calls + its ``inject_event()`` method to unblock ``_await_user_input()``. + + Returns True if input was delivered, False otherwise. + """ + for executor in self._active_executors.values(): + node = executor.node_registry.get(node_id) + if node is not None and hasattr(node, "inject_event"): + await node.inject_event(content) + return True + return False + async def execute( self, input_data: dict[str, Any], @@ -314,7 +330,10 @@ class ExecutionStream: # Create runtime adapter for this execution runtime_adapter = StreamRuntimeAdapter(self._runtime, execution_id) - # Create executor for this execution + # Create executor for this execution. + # Scope storage by execution_id so each execution gets + # fresh conversations and spillover directories. + exec_storage = self._storage.base_path / "sessions" / execution_id executor = GraphExecutor( runtime=runtime_adapter, llm=self._llm, @@ -322,7 +341,10 @@ class ExecutionStream: tool_executor=self._tool_executor, event_bus=self._event_bus, stream_id=self.stream_id, + storage_path=exec_storage, ) + # Track executor so inject_input() can reach EventLoopNode instances + self._active_executors[execution_id] = executor # Create modified graph with entry point # We need to override the entry_node to use our entry point @@ -336,6 +358,9 @@ class ExecutionStream: session_state=ctx.session_state, ) + # Clean up executor reference + self._active_executors.pop(execution_id, None) + # Store result with retention self._record_execution_result(execution_id, result) diff --git a/core/framework/tui/app.py b/core/framework/tui/app.py index 071cdab1..16975399 100644 --- a/core/framework/tui/app.py +++ b/core/framework/tui/app.py @@ -372,7 +372,9 @@ class AdenTUI(App): elif et == EventType.EXECUTION_FAILED: self.chat_repl.handle_execution_failed(event.data.get("error", "Unknown error")) elif et == EventType.CLIENT_INPUT_REQUESTED: - self.chat_repl.handle_input_requested(event.data.get("prompt", "")) + self.chat_repl.handle_input_requested( + event.node_id or event.data.get("node_id", ""), + ) # --- Graph view events --- if et in ( diff --git a/core/framework/tui/widgets/chat_repl.py b/core/framework/tui/widgets/chat_repl.py index cfc1646b..973b7a7e 100644 --- a/core/framework/tui/widgets/chat_repl.py +++ b/core/framework/tui/widgets/chat_repl.py @@ -6,6 +6,12 @@ Streaming display approach: (Label.update() replaces text in-place, unlike RichLog which is append-only). - On EXECUTION_COMPLETED, the final output is written to RichLog as permanent history. - Tool events are written directly to RichLog as discrete status lines. + +Client-facing input: +- When a client_facing=True EventLoopNode emits CLIENT_INPUT_REQUESTED, the + ChatRepl transitions to "waiting for input" state: input is re-enabled and + subsequent submissions are routed to runtime.inject_input() instead of + starting a new execution. """ import asyncio @@ -66,6 +72,8 @@ class ChatRepl(Vertical): self.runtime = runtime self._current_exec_id: str | None = None self._streaming_snapshot: str = "" + self._waiting_for_input: bool = False + self._input_node_id: str | None = None # Dedicated event loop for agent execution. # Keeps blocking runtime code (LLM calls, MCP tools) off @@ -97,11 +105,38 @@ class ChatRepl(Vertical): history.write("[bold cyan]Chat REPL Ready[/bold cyan] — Type your input below\n") async def on_input_submitted(self, message: Input.Submitted) -> None: - """Handle input submission — fire-and-forget via trigger().""" + """Handle input submission — either start new execution or inject input.""" user_input = message.value.strip() if not user_input: return + # Client-facing input: route to the waiting node + if self._waiting_for_input and self._input_node_id: + self._write_history(f"[bold green]You:[/bold green] {user_input}") + message.input.value = "" + + # Disable input while agent processes the response + chat_input = self.query_one("#chat-input", Input) + chat_input.disabled = True + chat_input.placeholder = "Enter input for agent..." + self._waiting_for_input = False + + indicator = self.query_one("#processing-indicator", Label) + indicator.update("Thinking...") + + node_id = self._input_node_id + self._input_node_id = None + + try: + future = asyncio.run_coroutine_threadsafe( + self.runtime.inject_input(node_id, user_input), + self._agent_loop, + ) + await asyncio.wrap_future(future) + except Exception as e: + self._write_history(f"[bold red]Error delivering input:[/bold red] {e}") + return + # Double-submit guard: reject input while an execution is in-flight if self._current_exec_id is not None: self._write_history("[dim]Agent is still running — please wait.[/dim]") @@ -206,17 +241,23 @@ class ChatRepl(Vertical): indicator = self.query_one("#processing-indicator", Label) indicator.display = False - # Write the final output to permanent history - output_str = str(output.get("output_string", output)) - self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}") + # Write the final streaming snapshot to permanent history (if any) + if self._streaming_snapshot: + self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}") + else: + output_str = str(output.get("output_string", output)) + self._write_history(f"[bold blue]Agent:[/bold blue] {output_str}") self._write_history("") # separator self._current_exec_id = None self._streaming_snapshot = "" + self._waiting_for_input = False + self._input_node_id = None # Re-enable input chat_input = self.query_one("#chat-input", Input) chat_input.disabled = False + chat_input.placeholder = "Enter input for agent..." chat_input.focus() def handle_execution_failed(self, error: str) -> None: @@ -229,17 +270,34 @@ class ChatRepl(Vertical): self._current_exec_id = None self._streaming_snapshot = "" + self._waiting_for_input = False + self._input_node_id = None # Re-enable input chat_input = self.query_one("#chat-input", Input) chat_input.disabled = False + chat_input.placeholder = "Enter input for agent..." chat_input.focus() - def handle_input_requested(self, prompt: str) -> None: - """Handle the agent requesting input from the user.""" + def handle_input_requested(self, node_id: str) -> None: + """Handle a client-facing node requesting user input. + + Transitions to 'waiting for input' state: flushes the current + streaming snapshot to history, re-enables the input widget, + and sets a flag so the next submission routes to inject_input(). + """ + # Flush accumulated streaming text as agent output + if self._streaming_snapshot: + self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}") + self._streaming_snapshot = "" + + self._waiting_for_input = True + self._input_node_id = node_id or None + indicator = self.query_one("#processing-indicator", Label) - indicator.update(f"Input requested: {prompt}") + indicator.update("Waiting for your input...") chat_input = self.query_one("#chat-input", Input) - chat_input.placeholder = prompt or "Agent is waiting for input..." + chat_input.disabled = False + chat_input.placeholder = "Type your response..." chat_input.focus() diff --git a/tools/src/aden_tools/tools/__init__.py b/tools/src/aden_tools/tools/__init__.py index 01faebfc..67c271ba 100644 --- a/tools/src/aden_tools/tools/__init__.py +++ b/tools/src/aden_tools/tools/__init__.py @@ -26,6 +26,7 @@ from .email_tool import register_tools as register_email from .example_tool import register_tools as register_example from .file_system_toolkits.apply_diff import register_tools as register_apply_diff from .file_system_toolkits.apply_patch import register_tools as register_apply_patch +from .file_system_toolkits.data_tools import register_tools as register_data_tools from .file_system_toolkits.execute_command_tool import ( register_tools as register_execute_command, ) @@ -82,6 +83,7 @@ def register_all_tools( register_apply_patch(mcp) register_grep_search(mcp) register_execute_command(mcp) + register_data_tools(mcp) register_csv(mcp) return [ @@ -97,6 +99,9 @@ def register_all_tools( "apply_patch", "grep_search", "execute_command_tool", + "load_data", + "save_data", + "list_data_files", "csv_read", "csv_write", "csv_append", diff --git a/tools/src/aden_tools/tools/file_system_toolkits/data_tools/__init__.py b/tools/src/aden_tools/tools/file_system_toolkits/data_tools/__init__.py new file mode 100644 index 00000000..e5558697 --- /dev/null +++ b/tools/src/aden_tools/tools/file_system_toolkits/data_tools/__init__.py @@ -0,0 +1,3 @@ +from .data_tools import register_tools + +__all__ = ["register_tools"] diff --git a/tools/src/aden_tools/tools/file_system_toolkits/data_tools/data_tools.py b/tools/src/aden_tools/tools/file_system_toolkits/data_tools/data_tools.py new file mode 100644 index 00000000..38997296 --- /dev/null +++ b/tools/src/aden_tools/tools/file_system_toolkits/data_tools/data_tools.py @@ -0,0 +1,179 @@ +""" +Data Tools - Load, save, and list data files for agent pipelines. + +These tools let agents store large intermediate results in files and +retrieve them with pagination, keeping the LLM conversation context small. +Used in conjunction with the spillover system: when a tool result is too +large, the framework writes it to a file and the agent can load it back +with load_data(). +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from mcp.server.fastmcp import FastMCP + + +def register_tools(mcp: FastMCP) -> None: + """Register data management tools with the MCP server.""" + + @mcp.tool() + def save_data(filename: str, data: str, data_dir: str) -> dict: + """ + Purpose + Save data to a file for later retrieval by this or downstream nodes. + + When to use + Store large results (search results, profiles, analysis) instead + of passing them inline through set_output. + Returns a brief summary with the filename to reference later. + + Rules & Constraints + filename must be a simple name like 'results.json' — no paths or '..' + data_dir must be the absolute path to the data directory + + Args: + filename: Simple filename like 'github_users.json'. No paths or '..'. + data: The string data to write (typically JSON). + data_dir: Absolute path to the data directory. + + Returns: + Dict with success status and file metadata, or error dict + """ + if not filename or ".." in filename or "/" in filename or "\\" in filename: + return {"error": "Invalid filename. Use simple names like 'users.json'"} + if not data_dir: + return {"error": "data_dir is required"} + + try: + dir_path = Path(data_dir) + dir_path.mkdir(parents=True, exist_ok=True) + path = dir_path / filename + path.write_text(data, encoding="utf-8") + lines = data.count("\n") + 1 + return { + "success": True, + "filename": filename, + "size_bytes": len(data.encode("utf-8")), + "lines": lines, + "preview": data[:200] + ("..." if len(data) > 200 else ""), + } + except Exception as e: + return {"error": f"Failed to save data: {str(e)}"} + + @mcp.tool() + def load_data( + filename: str, + data_dir: str, + offset: int = 0, + limit: int = 50, + ) -> dict: + """ + Purpose + Load data from a previously saved file with pagination. + + When to use + Retrieve large tool results that were spilled to disk. + Read data saved by save_data or by the spillover system. + Page through large files without loading everything into context. + + Rules & Constraints + filename must match a file in data_dir + Returns a page of lines with metadata about the full file + + Args: + filename: The filename to load (as shown in spillover messages or save_data results). + data_dir: Absolute path to the data directory. + offset: 0-based line number to start reading from. Default 0. + limit: Max number of lines to return. Default 50. + + Returns: + Dict with content, pagination info, and metadata + + Examples: + load_data('users.json', '/path/to/data') # first 50 lines + load_data('users.json', '/path/to/data', offset=50, limit=50) # next 50 + load_data('users.json', '/path/to/data', limit=200) # first 200 lines + """ + if not filename or ".." in filename or "/" in filename or "\\" in filename: + return {"error": "Invalid filename"} + if not data_dir: + return {"error": "data_dir is required"} + + try: + offset = int(offset) + limit = int(limit) + path = Path(data_dir) / filename + if not path.exists(): + return {"error": f"File not found: {filename}"} + + content = path.read_text(encoding="utf-8") + size_bytes = len(content.encode("utf-8")) + + # If content is a single long line, try to pretty-print JSON so + # line-based pagination actually works. + all_lines = content.split("\n") + if len(all_lines) <= 2 and size_bytes > 500: + try: + parsed = json.loads(content) + content = json.dumps(parsed, indent=2, ensure_ascii=False) + all_lines = content.split("\n") + except (json.JSONDecodeError, TypeError, ValueError): + pass + + total = len(all_lines) + start = min(offset, total) + end = min(start + limit, total) + sliced = all_lines[start:end] + + return { + "success": True, + "filename": filename, + "content": "\n".join(sliced), + "total_lines": total, + "size_bytes": size_bytes, + "offset": start, + "lines_returned": len(sliced), + "has_more": end < total, + } + except Exception as e: + return {"error": f"Failed to load data: {str(e)}"} + + @mcp.tool() + def list_data_files(data_dir: str) -> dict: + """ + Purpose + List all data files in the data directory. + + When to use + Discover what intermediate results or spillover files are available. + Check what data was saved by previous nodes in the pipeline. + + Args: + data_dir: Absolute path to the data directory. + + Returns: + Dict with list of files and their sizes + """ + if not data_dir: + return {"error": "data_dir is required"} + + try: + dir_path = Path(data_dir) + if not dir_path.exists(): + return {"files": []} + + files = [] + for f in sorted(dir_path.iterdir()): + if f.is_file(): + files.append( + { + "filename": f.name, + "size_bytes": f.stat().st_size, + } + ) + return {"files": files} + except Exception as e: + return {"error": f"Failed to list data files: {str(e)}"} diff --git a/uv.lock b/uv.lock index 08c469a6..a4a726a0 100644 --- a/uv.lock +++ b/uv.lock @@ -769,6 +769,11 @@ dependencies = [ { name = "tools" }, ] +[package.optional-dependencies] +tui = [ + { name = "textual" }, +] + [package.dev-dependencies] dev = [ { name = "ruff" }, @@ -786,8 +791,10 @@ requires-dist = [ { name = "pytest", specifier = ">=8.0" }, { name = "pytest-asyncio", specifier = ">=0.23" }, { name = "pytest-xdist", specifier = ">=3.0" }, + { name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" }, { name = "tools", editable = "tools" }, ] +provides-extras = ["tui"] [package.metadata.requires-dev] dev = [ @@ -1310,6 +1317,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" }, ] +[[package]] +name = "linkify-it-py" +version = "2.0.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "uc-micro-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" }, +] + [[package]] name = "litellm" version = "1.81.7" @@ -1408,6 +1427,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, ] +[package.optional-dependencies] +linkify = [ + { name = "linkify-it-py" }, +] + [[package]] name = "markupsafe" version = "3.0.3" @@ -1507,6 +1531,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fd/d9/eaa1f80170d2b7c5ba23f3b59f766f3a0bb41155fbc32a69adfa1adaaef9/mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca", size = 233615, upload-time = "2026-01-24T19:40:30.652Z" }, ] +[[package]] +name = "mdit-py-plugins" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, +] + [[package]] name = "mdurl" version = "0.1.2" @@ -2957,6 +2993,23 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/0d/13d1d239a25cbfb19e740db83143e95c772a1fe10202dda4b76792b114dd/starlette-0.52.1-py3-none-any.whl", hash = "sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74", size = 74272, upload-time = "2026-01-18T13:34:09.188Z" }, ] +[[package]] +name = "textual" +version = "7.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py", extra = ["linkify"] }, + { name = "mdit-py-plugins" }, + { name = "platformdirs" }, + { name = "pygments" }, + { name = "rich" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" }, +] + [[package]] name = "tiktoken" version = "0.12.0" @@ -3215,6 +3268,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" }, ] +[[package]] +name = "uc-micro-py" +version = "1.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" }, +] + [[package]] name = "urllib3" version = "2.6.3"