merge: PR #4636 feature/memory-inheritance into feat/inbox-management

Brings in append_data tool, continuous conversation mode, conversation
judge, phase compaction, and prompt composer from the memory-inheritance
feature branch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
bryan
2026-02-12 13:23:25 -08:00
27 changed files with 2611 additions and 235 deletions
+64 -1
@@ -553,6 +553,26 @@ AskUserQuestion(questions=[{
- condition_expr (Python expression, only if conditional)
- priority (positive = forward, negative = feedback/loop-back)
**DETERMINE the graph lifecycle.** Not every agent needs a terminal node:
| Pattern | `terminal_nodes` | When to Use |
|---------|-------------------|-------------|
| **Linear (finish)** | `["last-node"]` | Agent completes a task and exits (batch processing, one-shot generation) |
| **Forever-alive (loop)** | `[]` (empty) | Agent stays alive for continuous interaction (research assistant, personal assistant, monitoring) |
**Forever-alive pattern:** The deep_research_agent example uses `terminal_nodes=[]`. Every leaf node has edges that loop back to earlier nodes, creating a perpetual session. The agent only stops when the user explicitly exits. This is the preferred pattern for interactive, multi-turn agents.
**Key design rules for forever-alive graphs:**
- Every node must have at least one outgoing edge (no dead ends)
- Client-facing nodes block for user input — these are the natural "pause points"
- The user controls when to stop, not the graph
- Sessions accumulate memory across loops — plan for conversation compaction
- Use `conversation_mode="continuous"` to preserve conversation history across node transitions
- `max_iterations` should be set high (e.g., 100) since the agent is designed to run indefinitely
- The agent will NOT enter a "completed" execution state — this is intentional, not a bug
**Ask the user** which lifecycle pattern fits their agent. Default to forever-alive for interactive agents, linear for batch/one-shot tasks.
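A minimal sketch of the forever-alive wiring, based on the deep_research_agent example included in this PR (the node IDs and the `next_action` output key come from that example and are illustrative, not required names):
```python
# Sketch: loop-back edges that keep a graph alive forever.
# Mirrors the deep_research_agent example; adapt node IDs and keys to your agent.
from framework.graph import EdgeSpec, EdgeCondition

edges = [
    # report -> research: user wants deeper research on the current topic
    EdgeSpec(
        id="report-to-research",
        source="report",
        target="research",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(next_action).lower() == 'more_research'",
        priority=2,
    ),
    # report -> intake: the default when the user wants a new topic
    EdgeSpec(
        id="report-to-intake",
        source="report",
        target="intake",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(next_action).lower() != 'more_research'",
        priority=1,
    ),
]

entry_points = {"start": "intake"}
terminal_nodes = []  # empty: no terminal node, the user decides when to stop
```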
**RENDER the complete graph as ASCII art.** Make it large and clear — the user needs to see and understand the full workflow at a glance.
**IMPORTANT: Make the ASCII art BIG and READABLE.** Use a box-and-arrow style with generous spacing. Do NOT make it tiny or compressed. Example format:
@@ -912,6 +932,46 @@ result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
---
## REFERENCE: Graph Lifecycle & Conversation Memory
### Terminal vs Forever-Alive Graphs
Agents have two lifecycle patterns:
**Linear (terminal) graphs** have `terminal_nodes=["last-node"]`. Execution ends when the terminal node completes. The session enters a "completed" state. Use for batch processing, one-shot generation, and fire-and-forget tasks.
**Forever-alive graphs** have `terminal_nodes=[]` (empty). Every node has at least one outgoing edge — the graph loops indefinitely. The session **never enters a "completed" state** — this is intentional. The agent stays alive until the user explicitly exits. Use for interactive assistants, research tools, and any agent where the user drives the conversation.
The deep_research_agent example demonstrates this: `report` loops back to either `research` (dig deeper) or `intake` (new topic). The agent is a persistent, interactive assistant.
### Continuous Conversation Mode
When `conversation_mode="continuous"` is set on the GraphSpec, the framework preserves a **single conversation thread** across all node transitions:
**What the framework does automatically:**
- **Inherits conversation**: Same message history carries forward to the next node
- **Composes layered system prompts**: Identity (agent-level) + Narrative (auto-generated state summary) + Focus (per-node instructions)
- **Inserts transition markers**: At each node boundary, a "State of the World" message showing completed phases, current memory, and available data files
- **Accumulates tools**: Once a tool becomes available, it stays available in subsequent nodes
- **Compacts opportunistically**: At phase transitions, old tool results are pruned to stay within token budget
**What this means for agent builders:**
- Nodes don't need to re-explain context — the conversation carries it forward
- Output keys from earlier nodes are available in memory for edge conditions and later nodes
- For forever-alive agents, conversation memory persists across the entire session lifetime
- Plan for compaction: very long sessions will have older tool results summarized automatically
**When to use continuous mode:**
- Interactive agents with client-facing nodes (always)
- Multi-phase workflows where context matters across phases
- Forever-alive agents that loop indefinitely
**When NOT to use continuous mode:**
- Embarrassingly parallel fan-out nodes (each branch should be independent)
- Stateless utility agents that process items independently
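A sketch of the opt-in configuration, using the `conversation_mode`, `identity_prompt`, and `loop_config` fields added to GraphSpec in this PR (the `nodes`, `edges`, and `terminal_nodes` arguments and the exact loop_config values are placeholders, not prescriptions):
```python
# Sketch: opting a graph into continuous conversation mode.
# conversation_mode, identity_prompt, and loop_config are the GraphSpec fields
# added in this PR; nodes, edges, and terminal_nodes stand in for your own
# graph definition and their exact placement may differ in your setup.
from framework.graph.edge import GraphSpec

graph = GraphSpec(
    nodes=nodes,                     # your NodeSpec list
    edges=edges,                     # your EdgeSpec list, including loop-back edges
    terminal_nodes=[],               # forever-alive: the session never "completes"
    conversation_mode="continuous",  # one conversation threads through every node
    identity_prompt=(
        "You are a rigorous research agent. Every claim must trace back "
        "to a source you actually retrieved."
    ),
    loop_config={
        "max_iterations": 100,           # high, since the agent runs indefinitely
        "max_tool_calls_per_turn": 20,
        "max_history_tokens": 32000,
    },
)
```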
---
## REFERENCE: Framework Capabilities for Qualification
Use this reference during STEP 2 to give accurate, honest assessments.
@@ -944,7 +1004,7 @@ Use this reference during STEP 2 to give accurate, honest assessments.
| Use Case | Why It's Problematic | Alternative |
|----------|---------------------|-------------|
| Long-running daemons | Framework is request-response, not persistent | External scheduler + agent |
| Persistent background daemons (no user) | Forever-alive graphs need a user at client-facing nodes; no autonomous background polling without a user | External scheduler triggering agent runs |
| Sub-second responses | LLM latency is inherent | Traditional code, no LLM |
| Processing millions of items | Context windows and rate limits | Batch processing + sampling |
| Real-time streaming data | No built-in pub/sub or streaming input | Custom MCP server + agent |
@@ -979,3 +1039,6 @@ Use this reference during STEP 2 to give accurate, honest assessments.
11. **Adding framework gating for LLM behavior** - Fix prompts or use judges, not ad-hoc code
12. **Writing code before user approves the graph** - Always get approval on goal, nodes, and graph BEFORE writing any agent code
13. **Wrong mcp_servers.json format** - Use flat format (no `"mcpServers"` wrapper), `cwd` must be `"../../tools"`, and `command` must be `"uv"` with args `["run", "python", ...]`
14. **Assuming all agents need terminal nodes** - Interactive agents often work best with `terminal_nodes=[]` (forever-alive pattern). The agent never enters "completed" state — this is intentional. Only batch/one-shot agents need terminal nodes
15. **Creating dead-end nodes in forever-alive graphs** - Every node must have at least one outgoing edge. A node with no outgoing edges will cause execution to end unexpectedly, breaking the forever-alive loop
16. **Not using continuous conversation mode for interactive agents** - Multi-phase interactive agents should use `conversation_mode="continuous"` to preserve context across node transitions. Without it, each node starts with a blank conversation and loses all prior context
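For mistake 13, a flat `mcp_servers.json` entry might look like the sketch below (the top-level key is the server name; `research_tools` and the script filename are illustrative):
```json
{
  "research_tools": {
    "command": "uv",
    "args": ["run", "python", "research_tools_server.py"],
    "cwd": "../../tools"
  }
}
```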
@@ -1,12 +1,15 @@
"""Agent graph construction for Deep Research Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
@@ -120,13 +123,31 @@ edges = [
condition_expr="needs_more_research == False",
priority=2,
),
# report -> research (user wants deeper research on current topic)
EdgeSpec(
id="report-to-research",
source="report",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'more_research'",
priority=2,
),
# report -> intake (user wants a new topic — default when not more_research)
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() != 'more_research'",
priority=1,
),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["report"]
terminal_nodes = []
class DeepResearchAgent:
@@ -136,6 +157,12 @@ class DeepResearchAgent:
Flow: intake -> research -> review -> report
                    ^          |
                    +-- feedback loop (if user wants more)
Uses AgentRuntime for proper session management:
- Session-scoped storage (sessions/{session_id}/)
- Checkpointing for resume capability
- Runtime logging
- Data folder for save_data/load_data
"""
def __init__(self, config=None):
@@ -147,10 +174,10 @@ class DeepResearchAgent:
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
@@ -171,16 +198,20 @@ class DeepResearchAgent:
"max_tool_calls_per_turn": 20,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
identity_prompt=(
"You are a rigorous research agent. You search for information "
"from diverse, authoritative sources, analyze findings critically, "
"and produce well-cited reports. You never fabricate information — "
"every claim must trace back to a source you actually retrieved."
),
)
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
@@ -199,47 +230,63 @@ class DeepResearchAgent:
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
)
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
checkpoint_config=checkpoint_config,
)
return self._executor
async def start(self, mock_mode=False) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._executor is None:
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
@@ -250,7 +297,7 @@ class DeepResearchAgent:
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
@@ -10,8 +10,13 @@ intake_node = NodeSpec(
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
@@ -38,10 +43,14 @@ research_node = NodeSpec(
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=3,
max_node_visits=0,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
success_criteria=(
"Findings reference at least 3 distinct sources with URLs. "
"Key claims are substantiated by fetched content, not generated."
),
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
@@ -56,18 +65,19 @@ Work in phases:
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time to manage context
- Work in batches of 3-4 tool calls at a time - never more than 10 per turn
- After each batch, assess whether you have enough material
- Prefer quality over quantity - 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
When done, use set_output:
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
tools=["web_search", "web_scrape", "load_data", "save_data", "append_data", "list_data_files"],
)
# Node 3: Review (client-facing)
@@ -78,9 +88,13 @@ review_node = NodeSpec(
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=3,
max_node_visits=0,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
success_criteria=(
"The user has been presented with findings and has explicitly indicated "
"whether they want more research or are ready for the report."
),
system_prompt="""\
Present the research findings to the user clearly and concisely.
@@ -109,49 +123,66 @@ report_node = NodeSpec(
description="Write a cited HTML report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status"],
output_keys=["delivery_status", "next_action"],
success_criteria=(
"An HTML report has been saved, the file link has been presented to the user, "
"and the user has indicated what they want to do next."
),
system_prompt="""\
Write a comprehensive research report as an HTML file and present it to the user.
Write a research report as an HTML file and present it to the user.
**STEP 1 - Write the HTML report (tool calls, NO text to user yet):**
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
1. Compose a complete, self-contained HTML document with embedded CSS styling.
Use a clean, readable design: max-width container, pleasant typography,
numbered citation links, a table of contents, and a references section.
**STEP 1 - Write and save the HTML report (tool calls, NO text to user yet):**
Report structure inside the HTML:
- Title & date
- Executive Summary (2-3 paragraphs)
- Table of Contents
- Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications, areas of debate)
- Conclusion (key takeaways, confidence assessment)
- References (numbered list with clickable URLs)
Build a clean HTML document. Keep the HTML concise - aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective - present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
2. Save the HTML file:
save_data(filename="report.html", data=<your_html>)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective - present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
3. Get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
If save_data fails, simplify and shorten the HTML, then retry.
**STEP 2 - Present the link to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions.
of what the report covers. Ask if they have questions or want to continue.
**STEP 3 - After the user responds:**
- Answer follow-up questions from the research material
- When the user is satisfied: set_output("delivery_status", "completed")
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
- Dig deeper into the current topic?
- Then call set_output:
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
""",
tools=["save_data", "serve_file_to_user", "load_data", "list_data_files"],
tools=[
"save_data", "append_data", "edit_data",
"serve_file_to_user", "load_data", "list_data_files",
],
)
__all__ = [
+23 -1
@@ -26,6 +26,17 @@ Use `/hive-debugger` when:
This skill works alongside agents running in TUI mode and provides supervisor-level insights into execution behavior.
### Forever-Alive Agent Awareness
Some agents use `terminal_nodes=[]` (the "forever-alive" pattern), meaning they loop indefinitely and never enter a "completed" execution state. For these agents:
- Sessions with status "in_progress" or "paused" are **normal**, not failures
- High step counts, long durations, and many node visits are expected behavior
- The agent stops only when the user explicitly exits — there is no graph-driven completion
- Debug focus should be on **quality of individual node visits and iterations**, not whether the session reached a terminal state
- Conversation memory accumulates across loops — watch for context overflow and stale data issues
**How to identify forever-alive agents:** Check `agent.py` or `agent.json` for `terminal_nodes=[]` (empty list). If empty, the agent is forever-alive.
---
## Prerequisites
@@ -142,6 +153,7 @@ Store the selected mode for the session.
- Check `attention_summary.categories` for issue types
- Note the `run_id` of problematic sessions
- Check `status` field: "degraded", "failure", "in_progress"
- **For forever-alive agents:** Sessions with status "in_progress" or "paused" are normal — these agents never reach "completed". Only flag sessions with `needs_attention: true` or actual error indicators (tool failures, retry loops, missing outputs). High step counts alone do not indicate a problem.
3. **Attention flag triggers to understand:**
From runtime_logger.py, runs are flagged when:
@@ -199,13 +211,20 @@ Which run would you like to investigate?
| **Tool Errors** | `tool_error_count > 0`, `attention_reasons` contains "tool_failures" | Tool calls failed (API errors, timeouts, auth issues) |
| **Retry Loops** | `retry_count > 3`, `verdict_counts.RETRY > 5` | Judge repeatedly rejecting outputs |
| **Guard Failures** | `guard_reject_count > 0` | Output validation failed (wrong types, missing keys) |
| **Stalled Execution** | `total_steps > 20`, `verdict_counts.CONTINUE > 10` | EventLoopNode not making progress |
| **Stalled Execution** | `total_steps > 20`, `verdict_counts.CONTINUE > 10` | EventLoopNode not making progress. **Caveat:** Forever-alive agents may legitimately have high step counts — check if agent is blocked at a client-facing node (normal) vs genuinely stuck in a loop |
| **High Latency** | `latency_ms > 60000`, `avg_step_latency > 5000` | Slow tool calls or LLM responses |
| **Client-Facing Issues** | `client_input_requested` but no `user_input_received` | Premature set_output before user input |
| **Edge Routing Errors** | `exit_status == "no_valid_edge"`, `attention_reasons` contains "routing_issue" | No edges match current state |
| **Memory/Context Issues** | `tokens_used > 100000`, `context_overflow_count > 0` | Conversation history too long |
| **Constraint Violations** | Compare output against goal constraints | Agent violated goal-level rules |
**Forever-Alive Agent Caveat:** If the agent uses `terminal_nodes=[]`, sessions will never reach "completed" status. This is by design. When debugging these agents, focus on:
- Whether individual node visits succeed (not whether the graph "finishes")
- Quality of each loop iteration — are outputs improving or degrading across loops?
- Whether client-facing nodes are correctly blocking for user input
- Memory accumulation issues: stale data from previous loops, context overflow across many iterations
- Conversation compaction behavior: is the conversation growing unbounded?
3. **Analyze each flagged node:**
- Node ID and name
- Exit status
@@ -1015,6 +1034,9 @@ Your agent should now work correctly!"
3. **Don't ignore edge conditions** - Missing edges cause routing failures
4. **Don't overlook judge configuration** - Mismatched expectations cause retry loops
5. **Don't forget nullable_output_keys** - Optional inputs need explicit marking
6. **Don't diagnose "in_progress" as a failure for forever-alive agents** - Agents with `terminal_nodes=[]` are designed to never enter "completed" state. This is intentional. Focus on quality of individual node visits, not session completion status
7. **Don't ignore conversation memory issues in long-running sessions** - In continuous conversation mode, history grows across node transitions and loop iterations. Watch for context overflow (tokens_used > 100K), stale data from previous loops affecting edge conditions, and compaction failures that cause the LLM to lose important context
8. **Don't confuse "waiting for user" with "stalled"** - Client-facing nodes in forever-alive agents block for user input by design. A session paused at a client-facing node is working correctly, not stalled
---
+71 -5
@@ -27,6 +27,9 @@ class Message:
tool_use_id: str | None = None
tool_calls: list[dict[str, Any]] | None = None
is_error: bool = False
# Phase-aware compaction metadata (continuous mode)
phase_id: str | None = None
is_transition_marker: bool = False
def to_llm_dict(self) -> dict[str, Any]:
"""Convert to OpenAI-format message dict."""
@@ -60,6 +63,10 @@ class Message:
d["tool_calls"] = self.tool_calls
if self.is_error:
d["is_error"] = self.is_error
if self.phase_id is not None:
d["phase_id"] = self.phase_id
if self.is_transition_marker:
d["is_transition_marker"] = self.is_transition_marker
return d
@classmethod
@@ -72,6 +79,8 @@ class Message:
tool_use_id=data.get("tool_use_id"),
tool_calls=data.get("tool_calls"),
is_error=data.get("is_error", False),
phase_id=data.get("phase_id"),
is_transition_marker=data.get("is_transition_marker", False),
)
@@ -188,6 +197,7 @@ class NodeConversation:
self._next_seq: int = 0
self._meta_persisted: bool = False
self._last_api_input_tokens: int | None = None
self._current_phase: str | None = None
# --- Properties --------------------------------------------------------
@@ -195,6 +205,33 @@ class NodeConversation:
def system_prompt(self) -> str:
return self._system_prompt
def update_system_prompt(self, new_prompt: str) -> None:
"""Update the system prompt.
Used in continuous conversation mode at phase transitions to swap
Layer 3 (focus) while preserving the conversation history.
"""
self._system_prompt = new_prompt
def set_current_phase(self, phase_id: str) -> None:
"""Set the current phase ID. Subsequent messages will be stamped with it."""
self._current_phase = phase_id
async def switch_store(self, new_store: ConversationStore) -> None:
"""Switch to a new persistence store at a phase transition.
Subsequent messages are written to *new_store*. Meta (system
prompt, config) is re-persisted on the next write so the new
store's ``meta.json`` reflects the updated prompt.
"""
self._store = new_store
self._meta_persisted = False
await new_store.write_cursor({"next_seq": self._next_seq})
@property
def current_phase(self) -> str | None:
return self._current_phase
@property
def messages(self) -> list[Message]:
"""Return a defensive copy of the message list."""
@@ -216,8 +253,19 @@ class NodeConversation:
# --- Add messages ------------------------------------------------------
async def add_user_message(self, content: str) -> Message:
msg = Message(seq=self._next_seq, role="user", content=content)
async def add_user_message(
self,
content: str,
*,
is_transition_marker: bool = False,
) -> Message:
msg = Message(
seq=self._next_seq,
role="user",
content=content,
phase_id=self._current_phase,
is_transition_marker=is_transition_marker,
)
self._messages.append(msg)
self._next_seq += 1
await self._persist(msg)
@@ -233,6 +281,7 @@ class NodeConversation:
role="assistant",
content=content,
tool_calls=tool_calls,
phase_id=self._current_phase,
)
self._messages.append(msg)
self._next_seq += 1
@@ -251,6 +300,7 @@ class NodeConversation:
content=content,
tool_use_id=tool_use_id,
is_error=is_error,
phase_id=self._current_phase,
)
self._messages.append(msg)
self._next_seq += 1
@@ -380,6 +430,11 @@ class NodeConversation:
spillover filename reference (if any). Message structure (role,
seq, tool_use_id) stays valid for the LLM API.
Phase-aware behavior (continuous mode): when messages have ``phase_id``
metadata, all messages in the current phase are protected regardless of
token budget. Transition markers are never pruned. Older phases' tool
results are pruned more aggressively.
Error tool results are never pruned - they prevent re-calling
failing tools.
@@ -388,13 +443,18 @@ class NodeConversation:
if not self._messages:
return 0
# Phase 1: Walk backward, classify tool results as protected vs pruneable
# Walk backward, classify tool results as protected vs pruneable
protected_tokens = 0
pruneable: list[int] = [] # indices into self._messages
pruneable_tokens = 0
for i in range(len(self._messages) - 1, -1, -1):
msg = self._messages[i]
# Transition markers are never pruned (any role)
if msg.is_transition_marker:
continue
if msg.role != "tool":
continue
if msg.is_error:
@@ -402,6 +462,10 @@ class NodeConversation:
if msg.content.startswith("[Pruned tool result"):
continue # already pruned
# Phase-aware: protect current phase messages
if self._current_phase and msg.phase_id == self._current_phase:
continue
est = len(msg.content) // 4
if protected_tokens < protect_tokens:
protected_tokens += est
@@ -409,11 +473,11 @@ class NodeConversation:
pruneable.append(i)
pruneable_tokens += est
# Phase 2: Only prune if enough to be worthwhile
# Only prune if enough to be worthwhile
if pruneable_tokens < min_prune_tokens:
return 0
# Phase 3: Replace content with compact placeholder
# Replace content with compact placeholder
count = 0
for i in pruneable:
msg = self._messages[i]
@@ -436,6 +500,8 @@ class NodeConversation:
tool_use_id=msg.tool_use_id,
tool_calls=msg.tool_calls,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
count += 1
+177
@@ -0,0 +1,177 @@
"""Level 2 Conversation-Aware Judge.
When a node has `success_criteria` set, the implicit judge upgrades:
after Level 0 passes (all output keys set), a fast LLM call evaluates
whether the conversation actually meets the criteria.
This prevents nodes from "checking boxes" (setting output keys) without
doing quality work. The LLM reads the recent conversation and assesses
whether the phase's goal was genuinely accomplished.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from framework.graph.conversation import NodeConversation
from framework.llm.provider import LLMProvider
logger = logging.getLogger(__name__)
@dataclass
class PhaseVerdict:
"""Result of Level 2 conversation-aware evaluation."""
action: str # "ACCEPT" or "RETRY"
confidence: float = 0.8
feedback: str = ""
async def evaluate_phase_completion(
llm: LLMProvider,
conversation: NodeConversation,
phase_name: str,
phase_description: str,
success_criteria: str,
accumulator_state: dict[str, Any],
max_history_tokens: int = 8_196,
) -> PhaseVerdict:
"""Level 2 judge: read the conversation and evaluate quality.
Only called after Level 0 passes (all output keys set).
Args:
llm: LLM provider for evaluation
conversation: The current conversation to evaluate
phase_name: Name of the current phase/node
phase_description: Description of the phase
success_criteria: Natural-language criteria for phase completion
accumulator_state: Current output key values
max_history_tokens: Main conversation token budget (judge gets 20%)
Returns:
PhaseVerdict with action and optional feedback
"""
# Build a compact view of the recent conversation
recent_messages = _extract_recent_context(conversation, max_messages=10)
outputs_summary = _format_outputs(accumulator_state)
system_prompt = (
"You are a quality judge evaluating whether a phase of work is complete. "
"Be concise. Evaluate based on the success criteria, not on style."
)
user_prompt = f"""Evaluate this phase:
PHASE: {phase_name}
DESCRIPTION: {phase_description}
SUCCESS CRITERIA:
{success_criteria}
OUTPUTS SET:
{outputs_summary}
RECENT CONVERSATION:
{recent_messages}
Has this phase accomplished its goal based on the success criteria?
Respond in exactly this format:
ACTION: ACCEPT or RETRY
CONFIDENCE: 0.X
FEEDBACK: (reason if RETRY, empty if ACCEPT)"""
try:
response = llm.complete(
messages=[{"role": "user", "content": user_prompt}],
system=system_prompt,
max_tokens=max(1024, max_history_tokens // 5),
max_retries=1,
)
if not response.content or not response.content.strip():
logger.debug("Level 2 judge: empty response, accepting by default")
return PhaseVerdict(action="ACCEPT", confidence=0.5, feedback="")
return _parse_verdict(response.content)
except Exception as e:
logger.warning(f"Level 2 judge failed, accepting by default: {e}")
# On failure, don't block — Level 0 already passed
return PhaseVerdict(action="ACCEPT", confidence=0.5, feedback="")
def _extract_recent_context(conversation: NodeConversation, max_messages: int = 10) -> str:
"""Extract recent conversation messages for evaluation."""
messages = conversation.messages
recent = messages[-max_messages:] if len(messages) > max_messages else messages
parts = []
for msg in recent:
role = msg.role.upper()
content = msg.content or ""
# Truncate long tool results
if msg.role == "tool" and len(content) > 200:
content = content[:200] + "..."
if content.strip():
parts.append(f"[{role}]: {content.strip()}")
return "\n".join(parts) if parts else "(no messages)"
def _format_outputs(accumulator_state: dict[str, Any]) -> str:
"""Format output key values for evaluation.
Lists and dicts get structural formatting so the judge can assess
quantity and structure, not just a truncated stringification.
"""
if not accumulator_state:
return "(none)"
parts = []
for key, value in accumulator_state.items():
if isinstance(value, list):
# Show count + brief per-item preview so the judge can
# verify quantity without the full serialization.
items_preview = []
for i, item in enumerate(value[:8]):
item_str = str(item)
if len(item_str) > 150:
item_str = item_str[:150] + "..."
items_preview.append(f" [{i}]: {item_str}")
val_str = f"list ({len(value)} items):\n" + "\n".join(items_preview)
if len(value) > 8:
val_str += f"\n ... and {len(value) - 8} more"
elif isinstance(value, dict):
val_str = str(value)
if len(val_str) > 400:
val_str = val_str[:400] + "..."
else:
val_str = str(value)
if len(val_str) > 300:
val_str = val_str[:300] + "..."
parts.append(f" {key}: {val_str}")
return "\n".join(parts)
def _parse_verdict(response: str) -> PhaseVerdict:
"""Parse LLM response into PhaseVerdict."""
action = "ACCEPT"
confidence = 0.8
feedback = ""
for line in response.strip().split("\n"):
line = line.strip()
if line.startswith("ACTION:"):
action_str = line.split(":", 1)[1].strip().upper()
if action_str in ("ACCEPT", "RETRY"):
action = action_str
elif line.startswith("CONFIDENCE:"):
try:
confidence = float(line.split(":", 1)[1].strip())
except ValueError:
pass
elif line.startswith("FEEDBACK:"):
feedback = line.split(":", 1)[1].strip()
return PhaseVerdict(action=action, confidence=confidence, feedback=feedback)
+19
@@ -435,6 +435,25 @@ class GraphSpec(BaseModel):
description="EventLoopNode configuration (max_iterations, max_tool_calls_per_turn, etc.)",
)
# Conversation mode
conversation_mode: str = Field(
default="continuous",
description=(
"How conversations flow between event_loop nodes. "
"'continuous' (default): one conversation threads through all "
"event_loop nodes with cumulative tools and layered prompt composition. "
"'isolated': each node gets a fresh conversation."
),
)
identity_prompt: str | None = Field(
default=None,
description=(
"Agent-level identity prompt (Layer 1 of the onion model). "
"In continuous mode, this is the static identity that persists "
"unchanged across all node transitions. In isolated mode, ignored."
),
)
# Metadata
description: str = ""
created_by: str = "" # "human" or "builder_agent"
+222 -41
@@ -149,7 +149,7 @@ class EventLoopNode(NodeProtocol):
1. Try to restore from durable state (crash recovery)
2. If no prior state, init from NodeSpec.system_prompt + input_keys
3. Loop: drain injection queue -> stream LLM -> execute tools
-> if client_facing + ask_user called: block for user input
-> if client_facing: block for user input (see below)
-> judge evaluates (acceptance criteria)
(each add_* and set_output writes through to store immediately)
4. Publish events to EventBus at each stage
@@ -157,11 +157,17 @@ class EventLoopNode(NodeProtocol):
6. Terminate when judge returns ACCEPT, shutdown signaled, or max iterations
7. Build output dict from OutputAccumulator
Client-facing blocking: When ``client_facing=True``, a synthetic
``ask_user`` tool is injected. The node blocks for user input ONLY
when the LLM explicitly calls ``ask_user()``. Text-only turns
without ``ask_user`` flow through without blocking, allowing the LLM
to stream progress updates and summaries freely.
Client-facing blocking (``client_facing=True``):
- **Text-only turns** (no real tool calls, no set_output)
automatically block for user input. If the LLM is talking to the
user (not calling tools or setting outputs), it should wait for
the user's response before the judge runs.
- **Work turns** (tool calls or set_output) flow through without
blocking - the LLM is making progress, not asking the user.
- A synthetic ``ask_user`` tool is also injected for explicit
blocking when the LLM wants to be deliberate about requesting
input (e.g. mid-tool-call).
Always returns NodeResult with retryable=False semantics. The executor
must NOT retry event loop nodes -- retry is handled internally by the
@@ -234,23 +240,44 @@ class EventLoopNode(NodeProtocol):
return NodeResult(success=False, error=error_msg)
# 2. Restore or create new conversation + accumulator
conversation, accumulator, start_iteration = await self._restore(ctx)
if conversation is None:
system_prompt = ctx.node_spec.system_prompt or ""
# Track whether we're in continuous mode (conversation threaded across nodes)
_is_continuous = getattr(ctx, "continuous_mode", False)
conversation = NodeConversation(
system_prompt=system_prompt,
max_history_tokens=self._config.max_history_tokens,
output_keys=ctx.node_spec.output_keys or None,
store=self._conversation_store,
)
if _is_continuous and ctx.inherited_conversation is not None:
# Continuous mode with inherited conversation from prior node.
# This takes priority over store restoration — when the graph loops
# back to a previously-visited node, the inherited conversation
# carries forward the full thread rather than restoring stale state.
# System prompt already updated by executor. Transition marker
# already inserted by executor. Fresh accumulator for this phase.
# Phase already set by executor via set_current_phase().
conversation = ctx.inherited_conversation
conversation._output_keys = ctx.node_spec.output_keys or None
accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0
else:
# Try crash-recovery restore from store, then fall back to fresh.
conversation, accumulator, start_iteration = await self._restore(ctx)
if conversation is None:
# Fresh conversation: either isolated mode or first node in continuous mode.
system_prompt = ctx.node_spec.system_prompt or ""
# Add initial user message from input data
initial_message = self._build_initial_message(ctx)
if initial_message:
await conversation.add_user_message(initial_message)
conversation = NodeConversation(
system_prompt=system_prompt,
max_history_tokens=self._config.max_history_tokens,
output_keys=ctx.node_spec.output_keys or None,
store=self._conversation_store,
)
# Stamp phase for first node in continuous mode
if _is_continuous:
conversation.set_current_phase(ctx.node_id)
accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0
# Add initial user message from input data
initial_message = self._build_initial_message(ctx)
if initial_message:
await conversation.add_user_message(initial_message)
# 3. Build tool list: node tools + synthetic set_output + ask_user tools
tools = list(ctx.available_tools)
@@ -305,6 +332,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6b. Drain injection queue
@@ -429,6 +457,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6f. Stall detection
@@ -479,6 +508,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6g. Write cursor checkpoint
@@ -486,12 +516,27 @@ class EventLoopNode(NodeProtocol):
# 6h. Client-facing input blocking
#
# Block ONLY when the LLM explicitly calls ask_user().
# Text-only turns and set_output-only turns flow through
# without blocking, allowing progress updates and summaries
# to stream freely. After user input arrives, fall through
# to judge evaluation (6i) — the judge handles acceptance.
if ctx.node_spec.client_facing and user_input_requested:
# Two triggers:
# (a) Explicit ask_user() — always blocks, then falls through
# to judge evaluation (6i).
# (b) Auto-block — a text-only turn (no real tools, no
# set_output) from a client-facing node is addressed to the
# user. Block for their response, then *skip* judge so the
# next LLM turn can process the reply without confusing
# "missing outputs" feedback.
#
# Turns that include tool calls or set_output are *work*, not
# conversation — they flow through without blocking.
_cf_block = False
_cf_auto = False
if ctx.node_spec.client_facing:
if user_input_requested:
_cf_block = True
elif assistant_text and not real_tool_results and not outputs_set:
_cf_block = True
_cf_auto = True
if _cf_block:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
latency_ms = int((time.time() - start_time) * 1000)
@@ -531,9 +576,15 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
logger.info("[%s] iter=%d: blocking for user input...", node_id, iteration)
logger.info(
"[%s] iter=%d: blocking for user input (auto=%s)...",
node_id,
iteration,
_cf_auto,
)
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
@@ -575,11 +626,34 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
user_interaction_count += 1
recent_responses.clear()
# Fall through to judge evaluation (6i)
if _cf_auto:
# Auto-block: skip judge — let the LLM process the
# user's response on the next turn without confusing
# "missing outputs" feedback injected between the
# assistant's question and the user's answer.
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback="Auto-blocked for user input (pre-interaction)",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# Explicit ask_user: fall through to judge evaluation (6i)
# 6i. Judge evaluation
should_judge = (
@@ -702,6 +776,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
elif verdict.action == "ESCALATE":
@@ -746,6 +821,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
elif verdict.action == "RETRY":
@@ -795,6 +871,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
async def inject_event(self, content: str) -> None:
@@ -819,8 +896,11 @@ class EventLoopNode(NodeProtocol):
async def _await_user_input(self, ctx: NodeContext) -> bool:
"""Block until user input arrives or shutdown is signaled.
Called when a client_facing node explicitly calls ask_user() - an
intentional conversational turn boundary.
Called in two situations:
- The LLM explicitly calls ask_user().
- Auto-block: any text-only turn (no real tools, no set_output)
from a client-facing node ensures the user sees and responds
before the judge runs.
Returns True if input arrived, False if shutdown was signaled.
"""
@@ -893,6 +973,21 @@ class EventLoopNode(NodeProtocol):
await self._compact_tiered(ctx, conversation, accumulator)
messages = conversation.to_llm_messages()
# Defensive guard: ensure messages don't end with an assistant
# message. The Anthropic API rejects "assistant message prefill"
# (conversations must end with a user or tool message). This can
# happen after compaction trims messages leaving an assistant tail,
# or when a conversation is inherited without a transition marker
# (e.g. parallel-branch execution).
if messages and messages[-1].get("role") == "assistant":
logger.info(
"[%s] Messages end with assistant — injecting continuation prompt",
node_id,
)
await conversation.add_user_message("[Continue working on your current task.]")
messages = conversation.to_llm_messages()
accumulated_text = ""
tool_calls: list[ToolCallEvent] = []
@@ -1027,7 +1122,25 @@ class EventLoopNode(NodeProtocol):
)
else:
# --- Real tool execution ---
result = await self._execute_tool(tc)
# Guard: detect truncated tool arguments (_raw fallback
# from litellm when json.loads fails on max_tokens hit).
if "_raw" in tc.tool_input:
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Tool call to '{tc.tool_name}' failed: your arguments "
"were truncated (hit output token limit). "
"Simplify or shorten your arguments and try again."
),
is_error=True,
)
logger.warning(
"[%s] Blocked truncated _raw tool call: %s",
node_id,
tc.tool_name,
)
else:
result = await self._execute_tool(tc)
result = self._truncate_tool_result(result, tc.tool_name)
tool_entry = {
"tool_use_id": tc.tool_use_id,
@@ -1307,6 +1420,26 @@ class EventLoopNode(NodeProtocol):
f"Use set_output to set at least one of: {output_keys}"
),
)
# Level 2: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_history_tokens=self._config.max_history_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
return JudgeVerdict(action="ACCEPT")
else:
return JudgeVerdict(
@@ -1332,26 +1465,43 @@ class EventLoopNode(NodeProtocol):
Used in compaction summaries to prevent the LLM from re-calling
tools it already called. Extracts:
- Tool call counts (e.g. "github_list_pull_requests (6x)")
- Tool call details: name, count, and *inputs* for key tools
(search queries, scrape URLs, loaded filenames)
- Files saved via save_data
- Outputs set via set_output
- Errors encountered
"""
tool_counts: dict[str, int] = {}
# Per-tool: list of input summaries (one per call)
tool_calls_detail: dict[str, list[str]] = {}
files_saved: list[str] = []
outputs_set: list[str] = []
errors: list[str] = []
# Tool-specific input extractors: return a short summary string
def _summarize_input(name: str, args: dict) -> str:
if name == "web_search":
return args.get("query", "")
if name == "web_scrape":
return args.get("url", "")
if name == "load_data":
return args.get("filename", "")
if name == "save_data":
return args.get("filename", "")
return ""
for msg in conversation.messages:
if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
name = func.get("name", "unknown")
tool_counts[name] = tool_counts.get(name, 0) + 1
try:
args = json.loads(func.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
args = {}
summary = _summarize_input(name, args)
tool_calls_detail.setdefault(name, []).append(summary)
if name == "save_data" and args.get("filename"):
files_saved.append(args["filename"])
if name == "set_output" and args.get("key"):
@@ -1362,9 +1512,18 @@ class EventLoopNode(NodeProtocol):
errors.append(preview)
parts: list[str] = []
if tool_counts:
lines = [f" {n} ({c}x)" for n, c in tool_counts.items()]
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines[:max_entries]))
if tool_calls_detail:
lines: list[str] = []
for name, inputs in list(tool_calls_detail.items())[:max_entries]:
count = len(inputs)
# Include input details for tools where inputs matter
non_empty = [s for s in inputs if s]
if non_empty:
detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
else:
lines.append(f" {name} ({count}x)")
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
if files_saved:
unique = list(dict.fromkeys(files_saved))
parts.append("FILES SAVED: " + ", ".join(unique))
@@ -1453,6 +1612,12 @@ class EventLoopNode(NodeProtocol):
if limit <= 0 or result.is_error or len(result.content) <= limit:
return result
# load_data is the designated mechanism for reading spilled files.
# The LLM controls chunk size via offset/limit — re-spilling its
# result would create a circular loop.
if tool_name == "load_data":
return result
# Determine a preview size — leave room for the metadata wrapper
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
@@ -1698,12 +1863,28 @@ class EventLoopNode(NodeProtocol):
if spec.tools:
parts.append(f"AVAILABLE TOOLS: {', '.join(spec.tools)}")
# 5. Spillover files hint
# 5. Spillover files — list actual files so the LLM can load
# them immediately instead of having to call list_data_files first.
if self._config.spillover_dir:
parts.append(
"NOTE: Large tool results were saved to files. "
"Use load_data(filename='<filename>') to read them."
)
try:
from pathlib import Path
data_dir = Path(self._config.spillover_dir)
if data_dir.is_dir():
files = sorted(f.name for f in data_dir.iterdir() if f.is_file())
if files:
file_list = "\n".join(f" - {f}" for f in files[:30])
parts.append("DATA FILES (use load_data to read):\n" + file_list)
else:
parts.append(
"NOTE: Large tool results may have been saved to files. "
"Use list_data_files() to check."
)
except Exception:
parts.append(
"NOTE: Large tool results were saved to files. "
"Use load_data(filename='<filename>') to read them."
)
# 6. Tool call history (prevent re-calling tools)
if conversation is not None:
+151 -7
@@ -186,6 +186,52 @@ class GraphExecutor:
# Pause/resume control
self._pause_requested = asyncio.Event()
def _write_progress(
self,
current_node: str,
path: list[str],
memory: Any,
node_visit_counts: dict[str, int],
) -> None:
"""Update state.json with live progress at node transitions.
Reads the existing state.json (written by ExecutionStream at session
start) and patches the progress fields in-place. This keeps
state.json as the single source of truth - readers always see
current progress, not stale initial values.
The write is synchronous and best-effort: never blocks execution.
"""
if not self._storage_path:
return
try:
import json as _json
from datetime import datetime
state_path = self._storage_path / "state.json"
if state_path.exists():
state_data = _json.loads(state_path.read_text(encoding="utf-8"))
else:
state_data = {}
# Patch progress fields
progress = state_data.setdefault("progress", {})
progress["current_node"] = current_node
progress["path"] = list(path)
progress["node_visit_counts"] = dict(node_visit_counts)
progress["steps_executed"] = len(path)
# Update timestamp
timestamps = state_data.setdefault("timestamps", {})
timestamps["updated_at"] = datetime.now().isoformat()
# Memory keys (lightweight — just keys, not values)
state_data["memory_keys"] = list(memory.read_all().keys())
state_path.write_text(_json.dumps(state_data, indent=2), encoding="utf-8")
except Exception:
pass # Best-effort — never block execution
def _validate_tools(self, graph: GraphSpec) -> list[str]:
"""
Validate that all tools declared by nodes are available.
@@ -257,6 +303,12 @@ class GraphExecutor:
# Initialize execution state
memory = SharedMemory()
# Continuous conversation mode state
is_continuous = getattr(graph, "conversation_mode", "isolated") == "continuous"
continuous_conversation = None # NodeConversation threaded across nodes
cumulative_tools: list = [] # Tools accumulate, never removed
cumulative_tool_names: set[str] = set()
# Initialize checkpoint store if checkpointing is enabled
checkpoint_store: CheckpointStore | None = None
if checkpoint_config and checkpoint_config.enabled and self._storage_path:
@@ -273,13 +325,20 @@ class GraphExecutor:
f"{type(memory_data).__name__}, expected dict"
)
else:
# Restore memory from previous session
# Restore memory from previous session.
# Skip validation — this data was already validated when
# originally written, and research text triggers false
# positives on the code-indicator heuristic.
for key, value in memory_data.items():
memory.write(key, value)
memory.write(key, value, validate=False)
self.logger.info(f"📥 Restored session state with {len(memory_data)} memory keys")
# Write new input data to memory (each key individually)
if input_data:
# Write new input data to memory (each key individually).
# Skip when resuming from a paused session — restored memory already
# contains all state including the original input, and re-writing
# input_data would overwrite intermediate results with stale values.
_is_resuming = bool(session_state and session_state.get("paused_at"))
if input_data and not _is_resuming:
for key, value in input_data.items():
memory.write(key, value)
@@ -530,6 +589,13 @@ class GraphExecutor:
self.logger.info(f" Inputs: {node_spec.input_keys}")
self.logger.info(f" Outputs: {node_spec.output_keys}")
# Continuous mode: accumulate tools from this node
if is_continuous and node_spec.tools:
for t in self.tools:
if t.name in node_spec.tools and t.name not in cumulative_tool_names:
cumulative_tools.append(t)
cumulative_tool_names.add(t.name)
# Build context for node
ctx = self._build_context(
node_spec=node_spec,
@@ -537,6 +603,9 @@ class GraphExecutor:
goal=goal,
input_data=input_data or {},
max_tokens=graph.max_tokens,
continuous_mode=is_continuous,
inherited_conversation=continuous_conversation if is_continuous else None,
override_tools=cumulative_tools if is_continuous else None,
)
# Log actual input data being read
@@ -763,6 +832,7 @@ class GraphExecutor:
"memory": saved_memory,
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
"resume_from": current_node_id,
}
return ExecutionResult(
@@ -843,6 +913,7 @@ class GraphExecutor:
# Router explicitly set next node
self.logger.info(f" → Router directing to: {result.next_node}")
current_node_id = result.next_node
self._write_progress(current_node_id, path, memory, node_visit_counts)
else:
# Get all traversable edges for fan-out detection
traversable_edges = self._get_all_traversable_edges(
@@ -886,6 +957,7 @@ class GraphExecutor:
if fan_in_node:
self.logger.info(f" ⑃ Fan-in: converging at {fan_in_node}")
current_node_id = fan_in_node
self._write_progress(current_node_id, path, memory, node_visit_counts)
else:
# No convergence point - branches are terminal
self.logger.info(" → Parallel branches completed (no convergence)")
@@ -940,6 +1012,68 @@ class GraphExecutor:
current_node_id = next_node
# Write progress snapshot at node transition
self._write_progress(current_node_id, path, memory, node_visit_counts)
# Continuous mode: thread conversation forward with transition marker
if is_continuous and result.conversation is not None:
continuous_conversation = result.conversation
# Look up the next node spec for the transition marker
next_spec = graph.get_node(current_node_id)
if next_spec and next_spec.node_type == "event_loop":
from framework.graph.prompt_composer import (
build_narrative,
build_transition_marker,
compose_system_prompt,
)
# Build Layer 2 (narrative) from current state
narrative = build_narrative(memory, path, graph)
# Compose new system prompt (Layer 1 + 2 + 3)
new_system = compose_system_prompt(
identity_prompt=getattr(graph, "identity_prompt", None),
focus_prompt=next_spec.system_prompt,
narrative=narrative,
)
continuous_conversation.update_system_prompt(new_system)
# Switch conversation store to the next node's directory
# so the transition marker and all subsequent messages are
# persisted there instead of the first node's directory.
if self._storage_path:
from framework.storage.conversation_store import (
FileConversationStore,
)
next_store_path = self._storage_path / "conversations" / next_spec.id
next_store = FileConversationStore(base_path=next_store_path)
await continuous_conversation.switch_store(next_store)
# Insert transition marker into conversation
data_dir = str(self._storage_path / "data") if self._storage_path else None
marker = build_transition_marker(
previous_node=node_spec,
next_node=next_spec,
memory=memory,
cumulative_tool_names=sorted(cumulative_tool_names),
data_dir=data_dir,
)
await continuous_conversation.add_user_message(
marker,
is_transition_marker=True,
)
# Set current phase for phase-aware compaction
continuous_conversation.set_current_phase(next_spec.id)
# Opportunistic compaction at transition
if continuous_conversation.usage_ratio() > 0.5:
await continuous_conversation.prune_old_tool_results(
protect_tokens=2000,
)
# Update input_data for next node
input_data = result.output
@@ -1082,6 +1216,7 @@ class GraphExecutor:
"memory": saved_memory,
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
"resume_from": current_node_id,
}
# Mark latest checkpoint for resume on failure
@@ -1134,12 +1269,19 @@ class GraphExecutor:
goal: Goal,
input_data: dict[str, Any],
max_tokens: int = 4096,
continuous_mode: bool = False,
inherited_conversation: Any = None,
override_tools: list | None = None,
) -> NodeContext:
"""Build execution context for a node."""
# Filter tools to those available to this node
available_tools = []
if node_spec.tools:
available_tools = [t for t in self.tools if t.name in node_spec.tools]
if override_tools is not None:
# Continuous mode: use cumulative tool set
available_tools = list(override_tools)
else:
available_tools = []
if node_spec.tools:
available_tools = [t for t in self.tools if t.name in node_spec.tools]
# Create scoped memory view
scoped_memory = memory.with_permissions(
@@ -1160,6 +1302,8 @@ class GraphExecutor:
max_tokens=max_tokens,
runtime_logger=self.runtime_logger,
pause_event=self._pause_requested, # Pass pause event for granular control
continuous_mode=continuous_mode,
inherited_conversation=inherited_conversation,
)
# Valid node types - no ambiguous "llm" type allowed
+17
View File
@@ -238,6 +238,16 @@ class NodeSpec(BaseModel):
description="If True, this node streams output to the end user and can request input.",
)
# Phase completion criteria for conversation-aware judge (Level 2)
success_criteria: str | None = Field(
default=None,
description=(
"Natural-language criteria for phase completion. When set, the "
"implicit judge upgrades to Level 2: after output keys are satisfied, "
"a fast LLM evaluates whether the conversation meets these criteria."
),
)
model_config = {"extra": "allow", "arbitrary_types_allowed": True}
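A node opts into the Level 2 judge simply by declaring `success_criteria`; a minimal sketch of such a spec, with illustrative field values (construction mirrors the tests further down):

from framework.graph.node import NodeSpec

research = NodeSpec(
    id="research",
    name="Research",
    description="Find and analyze sources",
    node_type="event_loop",
    output_keys=["findings"],
    # Once every output key is set, a fast LLM pass checks the conversation
    # against these criteria and can send the phase back for another turn.
    success_criteria="Findings cite at least 3 distinct sources with URLs.",
)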
@@ -483,6 +493,10 @@ class NodeContext:
# Pause control (optional) - asyncio.Event for pause requests
pause_event: Any = None # asyncio.Event | None
# Continuous conversation mode
continuous_mode: bool = False # True when graph has conversation_mode="continuous"
inherited_conversation: Any = None # NodeConversation | None (from prior node)
@dataclass
class NodeResult:
@@ -511,6 +525,9 @@ class NodeResult:
# Pydantic validation errors (if any)
validation_errors: list[str] = field(default_factory=list)
# Continuous conversation mode: return conversation for threading to next node
conversation: Any = None # NodeConversation | None
def to_summary(self, node_spec: Any = None) -> str:
"""
Generate a human-readable summary of this node's execution and output.
+185
View File
@@ -0,0 +1,185 @@
"""Prompt composition for continuous agent mode.
Composes the three-layer system prompt (onion model) and generates
transition markers inserted into the conversation at phase boundaries.
Layer 1: Identity (static, defined at agent level, never changes):
"You are a thorough research agent. You prefer clarity over jargon..."
Layer 2: Narrative (auto-generated from conversation/memory state):
"We've finished scoping the project. The user wants to focus on..."
Layer 3: Focus (per-node system_prompt, reframed as focus directive):
"Your current attention: synthesize findings into a report..."
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
from framework.graph.node import NodeSpec, SharedMemory
logger = logging.getLogger(__name__)
def compose_system_prompt(
identity_prompt: str | None,
focus_prompt: str | None,
narrative: str | None = None,
) -> str:
"""Compose the three-layer system prompt.
Args:
identity_prompt: Layer 1 static agent identity (from GraphSpec).
focus_prompt: Layer 3 per-node focus directive (from NodeSpec.system_prompt).
narrative: Layer 2 auto-generated from conversation state.
Returns:
Composed system prompt containing whichever layers are present.
"""
parts: list[str] = []
# Layer 1: Identity (always first, anchors the personality)
if identity_prompt:
parts.append(identity_prompt)
# Layer 2: Narrative (what's happened so far)
if narrative:
parts.append(f"\n--- Context (what has happened so far) ---\n{narrative}")
# Layer 3: Focus (current phase directive)
if focus_prompt:
parts.append(f"\n--- Current Focus ---\n{focus_prompt}")
return "\n".join(parts) if parts else ""
def build_narrative(
memory: SharedMemory,
execution_path: list[str],
graph: GraphSpec,
) -> str:
"""Build Layer 2 (narrative) from structured state.
Deterministic: no LLM call. Reads SharedMemory and execution path
to describe what has happened so far. Cheap and fast.
Args:
memory: Current shared memory state.
execution_path: List of node IDs visited so far.
graph: Graph spec (for node names/descriptions).
Returns:
Narrative string describing the session state.
"""
parts: list[str] = []
# Describe execution path
if execution_path:
phase_descriptions: list[str] = []
for node_id in execution_path:
node_spec = graph.get_node(node_id)
if node_spec:
phase_descriptions.append(f"- {node_spec.name}: {node_spec.description}")
else:
phase_descriptions.append(f"- {node_id}")
parts.append("Phases completed:\n" + "\n".join(phase_descriptions))
# Describe key memory values (skip very long values)
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
for key, value in all_memory.items():
if value is None:
continue
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
memory_lines.append(f"- {key}: {val_str}")
if memory_lines:
parts.append("Current state:\n" + "\n".join(memory_lines))
return "\n\n".join(parts) if parts else ""
def build_transition_marker(
previous_node: NodeSpec,
next_node: NodeSpec,
memory: SharedMemory,
cumulative_tool_names: list[str],
data_dir: Path | str | None = None,
) -> str:
"""Build a 'State of the World' transition marker.
Inserted into the conversation as a user message at phase boundaries.
Gives the LLM full situational awareness: what happened, what's stored,
what tools are available, and what to focus on next.
Args:
previous_node: NodeSpec of the phase just completed.
next_node: NodeSpec of the phase about to start.
memory: Current shared memory state.
cumulative_tool_names: All tools available (cumulative set).
data_dir: Path to spillover data directory.
Returns:
Transition marker message text.
"""
sections: list[str] = []
# Header
sections.append(f"--- PHASE TRANSITION: {previous_node.name}{next_node.name} ---")
# What just completed
sections.append(f"\nCompleted: {previous_node.name}")
sections.append(f" {previous_node.description}")
# Outputs in memory
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
for key, value in all_memory.items():
if value is None:
continue
val_str = str(value)
if len(val_str) > 300:
val_str = val_str[:300] + "..."
memory_lines.append(f" {key}: {val_str}")
if memory_lines:
sections.append("\nOutputs available:\n" + "\n".join(memory_lines))
# Files in data directory
if data_dir:
data_path = Path(data_dir)
if data_path.exists():
files = sorted(data_path.iterdir())
if files:
file_lines = [
f" {f.name} ({f.stat().st_size:,} bytes)" for f in files if f.is_file()
]
if file_lines:
sections.append(
"\nData files (use load_data to access):\n" + "\n".join(file_lines)
)
# Available tools
if cumulative_tool_names:
sections.append("\nAvailable tools: " + ", ".join(sorted(cumulative_tool_names)))
# Next phase
sections.append(f"\nNow entering: {next_node.name}")
sections.append(f" {next_node.description}")
# Reflection prompt (engineered metacognition)
sections.append(
"\nBefore proceeding, briefly reflect: what went well in the "
"previous phase? Are there any gaps or surprises worth noting?"
)
sections.append("\n--- END TRANSITION ---")
return "\n".join(sections)
+2
View File
@@ -70,6 +70,7 @@ class AnthropicProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""Generate a completion from Claude (via LiteLLM)."""
return self._provider.complete(
@@ -79,6 +80,7 @@ class AnthropicProvider(LLMProvider):
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
max_retries=max_retries,
)
def complete_with_tools(
+13 -9
View File
@@ -150,10 +150,13 @@ class LiteLLMProvider(LLMProvider):
"LiteLLM is not installed. Please install it with: uv pip install litellm"
)
def _completion_with_rate_limit_retry(self, **kwargs: Any) -> Any:
def _completion_with_rate_limit_retry(
self, max_retries: int | None = None, **kwargs: Any
) -> Any:
"""Call litellm.completion with retry on 429 rate limit errors and empty responses."""
model = kwargs.get("model", self.model)
for attempt in range(RATE_LIMIT_MAX_RETRIES + 1):
retries = max_retries if max_retries is not None else RATE_LIMIT_MAX_RETRIES
for attempt in range(retries + 1):
try:
response = litellm.completion(**kwargs) # type: ignore[union-attr]
@@ -194,9 +197,9 @@ class LiteLLMProvider(LLMProvider):
f"Full request dumped to: {dump_path}"
)
if attempt == RATE_LIMIT_MAX_RETRIES:
if attempt == retries:
logger.error(
f"[retry] GAVE UP on {model} after {RATE_LIMIT_MAX_RETRIES + 1} "
f"[retry] GAVE UP on {model} after {retries + 1} "
f"attempts — empty response "
f"(finish_reason={finish_reason}, "
f"choices={len(response.choices) if response.choices else 0})"
@@ -209,7 +212,7 @@ class LiteLLMProvider(LLMProvider):
f"choices={len(response.choices) if response.choices else 0}) — "
f"likely rate limited or quota exceeded. "
f"Retrying in {wait}s "
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
f"(attempt {attempt + 1}/{retries})"
)
time.sleep(wait)
continue
@@ -225,9 +228,9 @@ class LiteLLMProvider(LLMProvider):
error_type="rate_limit",
attempt=attempt,
)
if attempt == RATE_LIMIT_MAX_RETRIES:
if attempt == retries:
logger.error(
f"[retry] GAVE UP on {model} after {RATE_LIMIT_MAX_RETRIES + 1} "
f"[retry] GAVE UP on {model} after {retries + 1} "
f"attempts — rate limit error: {e!s}. "
f"~{token_count} tokens ({token_method}). "
f"Full request dumped to: {dump_path}"
@@ -239,7 +242,7 @@ class LiteLLMProvider(LLMProvider):
f"~{token_count} tokens ({token_method}). "
f"Full request dumped to: {dump_path}. "
f"Retrying in {wait}s "
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
f"(attempt {attempt + 1}/{retries})"
)
time.sleep(wait)
# unreachable, but satisfies type checker
@@ -253,6 +256,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""Generate a completion using LiteLLM."""
# Prepare messages with system prompt
@@ -293,7 +297,7 @@ class LiteLLMProvider(LLMProvider):
kwargs["response_format"] = response_format
# Make the call
response = self._completion_with_rate_limit_retry(**kwargs)
response = self._completion_with_rate_limit_retry(max_retries=max_retries, **kwargs)
# Extract content
content = response.choices[0].message.content or ""
+1
View File
@@ -120,6 +120,7 @@ class MockLLMProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""
Generate a mock completion without calling a real LLM.
+3
View File
@@ -65,6 +65,7 @@ class LLMProvider(ABC):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""
Generate a completion from the LLM.
@@ -79,6 +80,8 @@ class LLMProvider(ABC):
- {"type": "json_schema", "json_schema": {"name": "...", "schema": {...}}}
for strict JSON schema enforcement
json_mode: If True, request structured JSON output from the LLM
max_retries: Override retry count for rate-limit/empty-response retries.
None uses the provider default.
Returns:
LLMResponse with content and metadata
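Callers that would rather fail fast than sit through the default retry/backoff loop can cap the budget per call. A minimal sketch, assuming `llm` is any configured provider instance:

# max_retries=0 means a single attempt: on a rate-limit error or an empty
# response the provider logs the failure instead of sleeping and retrying.
response = llm.complete(
    messages=[{"role": "user", "content": "Summarize the findings so far."}],
    system="You are a concise summarizer.",
    max_tokens=256,
    max_retries=0,
)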
+2
View File
@@ -362,6 +362,7 @@ def _load_resume_state(
except (json.JSONDecodeError, OSError):
return None
return {
"resume_session_id": session_id,
"memory": cp_data.get("shared_memory", {}),
"paused_at": cp_data.get("next_node") or cp_data.get("current_node"),
"execution_path": cp_data.get("execution_path", []),
@@ -379,6 +380,7 @@ def _load_resume_state(
progress = state_data.get("progress", {})
paused_at = progress.get("paused_at") or progress.get("resume_from")
return {
"resume_session_id": session_id,
"memory": state_data.get("memory", {}),
"paused_at": paused_at,
"execution_path": progress.get("path", []),
+23 -4
View File
@@ -293,8 +293,13 @@ class ExecutionStream:
if not self._running:
raise RuntimeError(f"ExecutionStream '{self.stream_id}' is not running")
# Generate execution ID using unified session format
if self._session_store:
# When resuming, reuse the original session ID so the execution
# continues in the same session directory instead of creating a new one.
resume_session_id = session_state.get("resume_session_id") if session_state else None
if resume_session_id:
execution_id = resume_session_id
elif self._session_store:
execution_id = self._session_store.generate_session_id()
else:
# Fallback to old format if SessionStore not available (shouldn't happen)
@@ -597,10 +602,22 @@ class ExecutionStream:
entry_point=self.entry_spec.id,
)
else:
# Create initial state
from framework.schemas.session_state import SessionTimestamps
# Create initial state — when resuming, preserve the previous
# execution's progress so crashes don't lose track of state.
from framework.schemas.session_state import (
SessionProgress,
SessionTimestamps,
)
now = datetime.now().isoformat()
ss = ctx.session_state or {}
progress = SessionProgress(
current_node=ss.get("paused_at") or ss.get("resume_from"),
paused_at=ss.get("paused_at"),
resume_from=ss.get("paused_at") or ss.get("resume_from"),
path=ss.get("execution_path", []),
node_visit_counts=ss.get("node_visit_counts", {}),
)
state = SessionState(
session_id=execution_id,
stream_id=self.stream_id,
@@ -613,6 +630,8 @@ class ExecutionStream:
started_at=ctx.started_at.isoformat(),
updated_at=now,
),
progress=progress,
memory=ss.get("memory", {}),
input_data=ctx.input_data,
)
+8 -2
View File
@@ -156,8 +156,14 @@ class SessionState(BaseModel):
@computed_field
@property
def is_resumable(self) -> bool:
"""Can this session be resumed?"""
return self.status == SessionStatus.PAUSED and self.progress.resume_from is not None
"""Can this session be resumed?
A session is resumable when it stopped mid-execution (paused or
failed) and we know which node to resume from.
"""
if self.status not in (SessionStatus.PAUSED, SessionStatus.FAILED):
return False
return self.progress.resume_from is not None or self.progress.paused_at is not None
@computed_field
@property
+9 -2
View File
@@ -451,6 +451,7 @@ class ChatRepl(Vertical):
if paused_at:
# Has paused_at - resume from there
resume_session_state = {
"resume_session_id": session_id,
"paused_at": paused_at,
"memory": state.get("memory", {}),
"execution_path": progress.get("path", []),
@@ -458,8 +459,13 @@ class ChatRepl(Vertical):
}
resume_info = f"From node: [cyan]{paused_at}[/cyan]"
else:
# No paused_at - just retry with same input
resume_session_state = {}
# No paused_at - retry with same input but reuse session directory
resume_session_state = {
"resume_session_id": session_id,
"memory": state.get("memory", {}),
"execution_path": progress.get("path", []),
"node_visit_counts": progress.get("node_visit_counts", {}),
}
resume_info = "Retrying with same input"
# Display resume info
@@ -563,6 +569,7 @@ class ChatRepl(Vertical):
# Create session_state for checkpoint recovery
recover_session_state = {
"resume_session_id": session_id,
"resume_from_checkpoint": checkpoint_id,
}
+538
View File
@@ -0,0 +1,538 @@
"""Tests for the Continuous Agent architecture (conversation threading + cumulative tools).
Validates:
- conversation_mode="isolated" preserves existing behavior
- conversation_mode="continuous" threads one conversation across nodes
- Transition markers are inserted at phase boundaries
- System prompt updates at each transition (layered prompt composition)
- Tools accumulate across nodes in continuous mode
- prompt_composer functions work correctly
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import MagicMock
import pytest
from framework.graph.conversation import NodeConversation
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal
from framework.graph.node import NodeResult, NodeSpec, SharedMemory
from framework.graph.prompt_composer import (
build_narrative,
build_transition_marker,
compose_system_prompt,
)
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import FinishEvent, TextDeltaEvent, ToolCallEvent
from framework.runtime.core import Runtime
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
"""Mock LLM that yields pre-programmed StreamEvent sequences."""
def __init__(self, scenarios: list[list] | None = None):
self.scenarios = scenarios or []
self._call_index = 0
self.stream_calls: list[dict] = []
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if not self.scenarios:
return
events = self.scenarios[self._call_index % len(self.scenarios)]
self._call_index += 1
for event in events:
yield event
def complete(self, messages, system="", **kwargs) -> LLMResponse:
return LLMResponse(content="Summary.", model="mock", stop_reason="stop")
def complete_with_tools(self, messages, system, tools, tool_executor, **kwargs) -> LLMResponse:
return LLMResponse(content="", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _set_output_scenario(key: str, value: str) -> list:
"""LLM calls set_output then finishes."""
return [
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_then_set_output(text: str, key: str, value: str) -> list:
"""LLM produces text, then calls set_output, then finishes (2 turns needed)."""
return [
TextDeltaEvent(content=text, snapshot=text),
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_finish(text: str) -> list:
"""LLM produces text and stops (triggers judge)."""
return [
TextDeltaEvent(content=text, snapshot=text),
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
]
def _make_runtime():
rt = MagicMock(spec=Runtime)
rt.start_run = MagicMock(return_value="run_1")
rt.end_run = MagicMock()
rt.report_problem = MagicMock()
rt.decide = MagicMock(return_value="dec_1")
rt.record_outcome = MagicMock()
rt.set_node = MagicMock()
return rt
def _make_goal():
return Goal(id="g1", name="test", description="test goal")
def _make_tool(name: str) -> Tool:
return Tool(
name=name,
description=f"Tool {name}",
parameters={"type": "object", "properties": {}},
)
# ===========================================================================
# prompt_composer unit tests
# ===========================================================================
class TestComposeSystemPrompt:
def test_all_layers(self):
result = compose_system_prompt(
identity_prompt="I am a research agent.",
focus_prompt="Focus on writing the report.",
narrative="We found 5 sources on topic X.",
)
assert "I am a research agent." in result
assert "Focus on writing the report." in result
assert "We found 5 sources on topic X." in result
# Identity comes first
assert result.index("I am a research agent.") < result.index("Focus on writing")
def test_identity_only(self):
result = compose_system_prompt(identity_prompt="I am an agent.", focus_prompt=None)
assert result == "I am an agent."
def test_focus_only(self):
result = compose_system_prompt(identity_prompt=None, focus_prompt="Do the thing.")
assert "Current Focus" in result
assert "Do the thing." in result
def test_empty(self):
result = compose_system_prompt(identity_prompt=None, focus_prompt=None)
assert result == ""
class TestBuildNarrative:
def test_with_execution_path(self):
memory = SharedMemory()
memory.write("findings", "some findings")
node_a = NodeSpec(
id="a", name="Research", description="Research the topic", node_type="event_loop"
)
node_b = NodeSpec(id="b", name="Report", description="Write report", node_type="event_loop")
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[],
)
result = build_narrative(memory, ["a"], graph)
assert "Research" in result
assert "findings" in result
def test_empty_state(self):
memory = SharedMemory()
graph = GraphSpec(id="g1", goal_id="g1", entry_node="a", nodes=[], edges=[])
result = build_narrative(memory, [], graph)
assert result == ""
class TestBuildTransitionMarker:
def test_basic_marker(self):
prev = NodeSpec(
id="research", name="Research", description="Find sources", node_type="event_loop"
)
next_n = NodeSpec(
id="report", name="Report", description="Write report", node_type="event_loop"
)
memory = SharedMemory()
memory.write("findings", "important stuff")
marker = build_transition_marker(
previous_node=prev,
next_node=next_n,
memory=memory,
cumulative_tool_names=["web_search", "save_data"],
)
assert "PHASE TRANSITION" in marker
assert "Research" in marker
assert "Report" in marker
assert "findings" in marker
assert "web_search" in marker
assert "reflect" in marker.lower()
# ===========================================================================
# NodeConversation.update_system_prompt
# ===========================================================================
class TestUpdateSystemPrompt:
def test_update(self):
conv = NodeConversation(system_prompt="original")
assert conv.system_prompt == "original"
conv.update_system_prompt("updated")
assert conv.system_prompt == "updated"
# ===========================================================================
# Conversation threading through executor
# ===========================================================================
class TestContinuousConversation:
"""Test that conversation_mode='continuous' threads a single conversation."""
@pytest.mark.asyncio
async def test_isolated_mode_no_conversation_in_result(self):
"""In isolated mode, NodeResult.conversation should be None."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_set_output_scenario("result", "done"),
_text_finish("accepted"),
]
)
spec = NodeSpec(
id="n1",
name="Node1",
description="test",
node_type="event_loop",
output_keys=["result"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
conversation_mode="isolated",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
@pytest.mark.asyncio
async def test_continuous_threads_conversation(self):
"""In continuous mode, second node sees messages from first node."""
runtime = _make_runtime()
# Node A: set_output("brief", "the brief"), then finish (accept)
# Node B: set_output("report", "the report"), then finish (accept)
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("I'll research this.", "brief", "the brief"),
_text_finish(""), # triggers accept for node A (all keys set)
_text_then_set_output("Here's the report.", "report", "the report"),
_text_finish(""), # triggers accept for node B
]
)
node_a = NodeSpec(
id="a",
name="Intake",
description="Gather requirements",
node_type="event_loop",
output_keys=["brief"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Write report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
identity_prompt="You are a thorough research agent.",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
assert result.path == ["a", "b"]
# Verify the LLM saw the identity prompt in system messages
# The second node's system prompt should contain the identity
if len(llm.stream_calls) >= 3:
system_at_node_b = llm.stream_calls[2]["system"]
assert "thorough research agent" in system_at_node_b
@pytest.mark.asyncio
async def test_continuous_transition_marker_present(self):
"""Transition marker should appear in messages when switching nodes."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Research done.", "brief", "the brief"),
_text_finish(""),
_text_then_set_output("Report done.", "report", "the report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["brief"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Write report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# When node B's first LLM call happens, its messages should contain
# the transition marker from the executor
if len(llm.stream_calls) >= 3:
node_b_messages = llm.stream_calls[2]["messages"]
all_content = " ".join(
m.get("content", "") for m in node_b_messages if isinstance(m.get("content"), str)
)
assert "PHASE TRANSITION" in all_content
# ===========================================================================
# Cumulative tools
# ===========================================================================
class TestCumulativeTools:
"""Test that tools accumulate in continuous mode."""
@pytest.mark.asyncio
async def test_isolated_mode_tools_scoped(self):
"""In isolated mode, each node only gets its own declared tools."""
runtime = _make_runtime()
tool_a = _make_tool("web_search")
tool_b = _make_tool("save_data")
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Done.", "brief", "brief"),
_text_finish(""),
_text_then_set_output("Done.", "report", "report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Research",
node_type="event_loop",
output_keys=["brief"],
tools=["web_search"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
tools=["save_data"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="isolated",
)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=[tool_a, tool_b],
)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# In isolated mode, node B should NOT have web_search
if len(llm.stream_calls) >= 3:
node_b_tools = llm.stream_calls[2].get("tools") or []
tool_names = [t.name for t in node_b_tools]
assert "save_data" in tool_names or "set_output" in tool_names
# web_search should NOT be present (only set_output + save_data)
real_tools = [n for n in tool_names if n != "set_output"]
assert "web_search" not in real_tools
@pytest.mark.asyncio
async def test_continuous_mode_tools_accumulate(self):
"""In continuous mode, node B should have both web_search and save_data."""
runtime = _make_runtime()
tool_a = _make_tool("web_search")
tool_b = _make_tool("save_data")
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Done.", "brief", "brief"),
_text_finish(""),
_text_then_set_output("Done.", "report", "report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Research",
node_type="event_loop",
output_keys=["brief"],
tools=["web_search"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
tools=["save_data"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=[tool_a, tool_b],
)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# In continuous mode, node B should have BOTH tools
if len(llm.stream_calls) >= 3:
node_b_tools = llm.stream_calls[2].get("tools") or []
tool_names = [t.name for t in node_b_tools]
real_tools = [n for n in tool_names if n != "set_output"]
assert "web_search" in real_tools
assert "save_data" in real_tools
# ===========================================================================
# Schema field defaults
# ===========================================================================
class TestSchemaDefaults:
def test_graphspec_defaults(self):
"""New fields should have safe defaults."""
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[],
edges=[],
)
assert graph.conversation_mode == "continuous"
assert graph.identity_prompt is None
def test_nodespec_defaults(self):
"""NodeSpec.success_criteria should default to None."""
spec = NodeSpec(
id="n1",
name="test",
description="test",
node_type="event_loop",
)
assert spec.success_criteria is None
def test_noderesult_defaults(self):
"""NodeResult.conversation should default to None."""
result = NodeResult(success=True)
assert result.conversation is None
+380
View File
@@ -0,0 +1,380 @@
"""Tests for Level 2 conversation-aware judge.
Validates:
- No success_criteria → Level 0 only (existing behavior)
- success_criteria set, good conversation → Level 2 ACCEPT
- success_criteria set, poor conversation → Level 2 RETRY with feedback
- Custom explicit judge takes priority over Level 2
- Level 2 fires only when Level 0 passes (all keys set)
- _parse_verdict correctly parses LLM responses
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import MagicMock
import pytest
from framework.graph.conversation import NodeConversation
from framework.graph.conversation_judge import (
_parse_verdict,
evaluate_phase_completion,
)
from framework.graph.edge import GraphSpec
from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal
from framework.graph.node import NodeSpec
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import FinishEvent, TextDeltaEvent, ToolCallEvent
from framework.runtime.core import Runtime
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
"""Mock LLM that yields pre-programmed StreamEvent sequences."""
def __init__(self, scenarios: list[list] | None = None, complete_response: str = ""):
self.scenarios = scenarios or []
self._call_index = 0
self.stream_calls: list[dict] = []
self.complete_response = complete_response
self.complete_calls: list[dict] = []
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if not self.scenarios:
return
events = self.scenarios[self._call_index % len(self.scenarios)]
self._call_index += 1
for event in events:
yield event
def complete(self, messages, system="", **kwargs) -> LLMResponse:
self.complete_calls.append({"messages": messages, "system": system})
return LLMResponse(content=self.complete_response, model="mock", stop_reason="stop")
def complete_with_tools(self, messages, system, tools, tool_executor, **kwargs) -> LLMResponse:
return LLMResponse(content="", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _set_output_scenario(key: str, value: str) -> list:
return [
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_then_set_output(text: str, key: str, value: str) -> list:
return [
TextDeltaEvent(content=text, snapshot=text),
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_finish(text: str) -> list:
return [
TextDeltaEvent(content=text, snapshot=text),
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
]
def _make_runtime():
rt = MagicMock(spec=Runtime)
rt.start_run = MagicMock(return_value="run_1")
rt.end_run = MagicMock()
rt.report_problem = MagicMock()
rt.decide = MagicMock(return_value="dec_1")
rt.record_outcome = MagicMock()
rt.set_node = MagicMock()
return rt
def _make_goal():
return Goal(id="g1", name="test", description="test goal")
# ===========================================================================
# Unit tests for _parse_verdict
# ===========================================================================
class TestParseVerdict:
def test_accept(self):
v = _parse_verdict("ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:")
assert v.action == "ACCEPT"
assert v.confidence == 0.9
assert v.feedback == ""
def test_retry_with_feedback(self):
v = _parse_verdict("ACTION: RETRY\nCONFIDENCE: 0.6\nFEEDBACK: Research is too shallow.")
assert v.action == "RETRY"
assert v.confidence == 0.6
assert "shallow" in v.feedback
def test_defaults_on_garbage(self):
v = _parse_verdict("some random text\nno structured output")
assert v.action == "ACCEPT" # default
assert v.confidence == 0.8 # default
def test_invalid_action_defaults_to_accept(self):
v = _parse_verdict("ACTION: ESCALATE\nCONFIDENCE: 0.5")
assert v.action == "ACCEPT" # ESCALATE not valid for Level 2
# ===========================================================================
# Unit tests for evaluate_phase_completion
# ===========================================================================
class TestEvaluatePhaseCompletion:
@pytest.mark.asyncio
async def test_accept_on_good_response(self):
"""LLM says ACCEPT → verdict is ACCEPT."""
llm = MockStreamingLLM(complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.95\nFEEDBACK:")
conv = NodeConversation(system_prompt="test")
await conv.add_user_message("Do research on topic X")
await conv.add_assistant_message("I found 5 high-quality sources on X.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Research",
phase_description="Research the topic",
success_criteria="Find at least 3 credible sources",
accumulator_state={"findings": "5 sources found"},
)
assert verdict.action == "ACCEPT"
assert verdict.confidence == 0.95
@pytest.mark.asyncio
async def test_retry_on_poor_response(self):
"""LLM says RETRY → verdict is RETRY with feedback."""
llm = MockStreamingLLM(
complete_response=(
"ACTION: RETRY\nCONFIDENCE: 0.4\nFEEDBACK: Only found 1 source, need 3."
)
)
conv = NodeConversation(system_prompt="test")
await conv.add_user_message("Do research")
await conv.add_assistant_message("I found 1 source.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Research",
phase_description="Research the topic",
success_criteria="Find at least 3 credible sources",
accumulator_state={"findings": "1 source"},
)
assert verdict.action == "RETRY"
assert "1 source" in verdict.feedback
@pytest.mark.asyncio
async def test_llm_failure_defaults_to_accept(self):
"""When LLM fails, Level 2 should not block (Level 0 already passed)."""
llm = MockStreamingLLM()
# Make complete() raise an exception
llm.complete = MagicMock(side_effect=RuntimeError("LLM unavailable"))
conv = NodeConversation(system_prompt="test")
await conv.add_assistant_message("Done.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Test",
phase_description="Test phase",
success_criteria="Do the thing",
accumulator_state={"result": "done"},
)
assert verdict.action == "ACCEPT"
assert verdict.confidence == 0.5
# ===========================================================================
# Integration: Level 2 in EventLoopNode implicit judge
# ===========================================================================
class TestLevel2InImplicitJudge:
@pytest.mark.asyncio
async def test_no_success_criteria_level0_only(self):
"""Without success_criteria, Level 0 accepts normally (existing behavior)."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_set_output_scenario("result", "done"),
_text_finish("accepted"),
]
)
spec = NodeSpec(
id="n1",
name="Node1",
description="test",
node_type="event_loop",
output_keys=["result"],
# No success_criteria!
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# LLM.complete should NOT have been called for Level 2
assert len(llm.complete_calls) == 0
@pytest.mark.asyncio
async def test_success_criteria_accept(self):
"""With success_criteria and good work, Level 2 accepts."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("I did thorough research.", "result", "done"),
_text_finish(""), # triggers judge
],
complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide thorough research with multiple sources.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# LLM.complete should have been called for Level 2
assert len(llm.complete_calls) >= 1
@pytest.mark.asyncio
async def test_success_criteria_retry_then_accept(self):
"""Level 2 rejects first attempt, LLM tries again, Level 2 accepts."""
runtime = _make_runtime()
# Track complete calls to alternate responses
complete_responses = [
"ACTION: RETRY\nCONFIDENCE: 0.4\nFEEDBACK: Need more detail.",
"ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
]
call_count = [0]
class SequentialLLM(MockStreamingLLM):
def complete(self, messages, system="", **kwargs):
idx = call_count[0]
call_count[0] += 1
resp = complete_responses[idx % len(complete_responses)]
return LLMResponse(content=resp, model="mock", stop_reason="stop")
llm = SequentialLLM(
scenarios=[
# Turn 1: set output, then stop → Level 2 RETRY
_text_then_set_output("Brief research.", "result", "brief"),
_text_finish(""), # triggers judge → Level 2 RETRY
# Turn 2: after retry feedback, set output again, stop → Level 2 ACCEPT
_text_then_set_output("Much more detailed research.", "result", "detailed"),
_text_finish(""), # triggers judge → Level 2 ACCEPT
]
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide thorough research with multiple sources.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# Should have had 2 complete calls (first RETRY, second ACCEPT)
assert call_count[0] >= 2
@pytest.mark.asyncio
async def test_level2_only_fires_when_level0_passes(self):
"""Level 2 should NOT fire when output keys are missing."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
# Turn 1: just text, no set_output → Level 0 RETRY (missing keys)
_text_finish("I did some thinking."),
# Turn 2: set output → Level 0 ACCEPT, Level 2 check
_text_then_set_output("Now I have output.", "result", "done"),
_text_finish(""), # triggers judge
],
complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide results.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# Level 2 should only fire once (when Level 0 passes)
assert len(llm.complete_calls) == 1
+280
View File
@@ -0,0 +1,280 @@
"""Tests for phase-aware compaction in continuous conversation mode.
Validates:
- Phase tags persist through storage roundtrip
- Transition markers survive compaction
- Current phase messages protected during compaction
- Older phase tool results pruned first
- Phase metadata fields have safe defaults
"""
from __future__ import annotations
import pytest
from framework.graph.conversation import Message, NodeConversation
class TestPhaseMetadata:
"""Phase metadata on Message dataclass."""
def test_defaults(self):
msg = Message(seq=0, role="user", content="hello")
assert msg.phase_id is None
assert msg.is_transition_marker is False
def test_set_phase(self):
msg = Message(seq=0, role="user", content="hello", phase_id="research")
assert msg.phase_id == "research"
def test_transition_marker(self):
msg = Message(
seq=0,
role="user",
content="PHASE TRANSITION",
is_transition_marker=True,
phase_id="report",
)
assert msg.is_transition_marker is True
assert msg.phase_id == "report"
def test_storage_roundtrip(self):
"""Phase metadata should survive to_storage_dict → from_storage_dict."""
msg = Message(
seq=5,
role="user",
content="transition",
phase_id="review",
is_transition_marker=True,
)
d = msg.to_storage_dict()
assert d["phase_id"] == "review"
assert d["is_transition_marker"] is True
restored = Message.from_storage_dict(d)
assert restored.phase_id == "review"
assert restored.is_transition_marker is True
def test_storage_roundtrip_no_phase(self):
"""Messages without phase metadata should roundtrip cleanly."""
msg = Message(seq=0, role="assistant", content="hello")
d = msg.to_storage_dict()
assert "phase_id" not in d
assert "is_transition_marker" not in d
restored = Message.from_storage_dict(d)
assert restored.phase_id is None
assert restored.is_transition_marker is False
def test_to_llm_dict_no_metadata(self):
"""Phase metadata should NOT appear in LLM-facing dicts."""
msg = Message(
seq=0,
role="user",
content="hello",
phase_id="research",
is_transition_marker=True,
)
d = msg.to_llm_dict()
assert "phase_id" not in d
assert "is_transition_marker" not in d
assert d == {"role": "user", "content": "hello"}
class TestPhaseStamping:
"""Messages are stamped with current phase."""
@pytest.mark.asyncio
async def test_messages_stamped_with_phase(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg1 = await conv.add_user_message("search for X")
msg2 = await conv.add_assistant_message("Found it.")
assert msg1.phase_id == "research"
assert msg2.phase_id == "research"
@pytest.mark.asyncio
async def test_phase_changes_stamp(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg1 = await conv.add_user_message("research msg")
conv.set_current_phase("report")
msg2 = await conv.add_user_message("report msg")
assert msg1.phase_id == "research"
assert msg2.phase_id == "report"
@pytest.mark.asyncio
async def test_no_phase_no_stamp(self):
conv = NodeConversation(system_prompt="test")
msg = await conv.add_user_message("no phase")
assert msg.phase_id is None
@pytest.mark.asyncio
async def test_transition_marker_flag(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("report")
msg = await conv.add_user_message(
"PHASE TRANSITION: Research → Report",
is_transition_marker=True,
)
assert msg.is_transition_marker is True
assert msg.phase_id == "report"
@pytest.mark.asyncio
async def test_tool_result_stamped(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg = await conv.add_tool_result("call_1", "tool output here")
assert msg.phase_id == "research"
class TestPhaseAwareCompaction:
"""prune_old_tool_results protects current phase and transition markers."""
@pytest.mark.asyncio
async def test_transition_marker_survives_compaction(self):
"""Transition markers should never be pruned."""
conv = NodeConversation(system_prompt="test")
# Old phase with a big tool result
conv.set_current_phase("research")
await conv.add_assistant_message(
"calling tool",
tool_calls=[
{
"id": "call_1",
"type": "function",
"function": {"name": "search", "arguments": "{}"},
}
],
)
await conv.add_tool_result("call_1", "x" * 20000) # big tool result
# Transition marker
await conv.add_user_message(
"PHASE TRANSITION: Research → Report",
is_transition_marker=True,
)
# New phase
conv.set_current_phase("report")
await conv.add_assistant_message(
"calling another tool",
tool_calls=[
{
"id": "call_2",
"type": "function",
"function": {"name": "save", "arguments": "{}"},
}
],
)
await conv.add_tool_result("call_2", "y" * 200)
pruned = await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
assert pruned >= 1
# Transition marker should still be intact
marker_msgs = [m for m in conv.messages if m.is_transition_marker]
assert len(marker_msgs) == 1
assert "PHASE TRANSITION" in marker_msgs[0].content
@pytest.mark.asyncio
async def test_current_phase_protected(self):
"""Tool results in the current phase should not be pruned."""
conv = NodeConversation(system_prompt="test")
# Old phase
conv.set_current_phase("research")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "old_data " * 5000)
# Current phase
conv.set_current_phase("report")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "current_data " * 5000)
await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
# Old phase's tool result should be pruned
msgs = conv.messages
old_tool = [m for m in msgs if m.role == "tool" and m.phase_id == "research"]
assert len(old_tool) == 1
assert old_tool[0].content.startswith("[Pruned tool result")
# Current phase's tool result should be intact
current_tool = [m for m in msgs if m.role == "tool" and m.phase_id == "report"]
assert len(current_tool) == 1
assert "current_data" in current_tool[0].content
@pytest.mark.asyncio
async def test_no_phase_metadata_works_normally(self):
"""Without phase metadata, compaction works as before (no regression)."""
conv = NodeConversation(system_prompt="test")
# No phase set — messages have phase_id=None
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "data " * 5000) # ~6250 tokens
await conv.add_assistant_message(
"another tool call",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "more " * 100) # ~125 tokens
# protect_tokens=100: c2 (~125 tokens) fills the budget,
# c1 (~6250 tokens) becomes pruneable
pruned = await conv.prune_old_tool_results(protect_tokens=100, min_prune_tokens=100)
assert pruned >= 1
@pytest.mark.asyncio
async def test_pruned_message_preserves_phase_metadata(self):
"""Pruned messages should keep their phase_id."""
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "data " * 5000)
# Switch to new phase so research messages become pruneable
conv.set_current_phase("report")
await conv.add_assistant_message(
"recent",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "x" * 200)
await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
pruned_msg = [m for m in conv.messages if m.content.startswith("[Pruned")][0]
assert pruned_msg.phase_id == "research"
@@ -17,6 +17,8 @@
"terminal_nodes": [
"report"
],
"conversation_mode": "continuous",
"identity_prompt": "You are a rigorous research agent. You search for information from diverse, authoritative sources, analyze findings critically, and produce well-cited reports. You never fabricate information \u2014 every claim must trace back to a source you actually retrieved.",
"nodes": [
{
"id": "intake",
@@ -32,7 +34,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a research intake specialist. The user wants to research a topic.\nHave a brief conversation to clarify what they need.\n\n**STEP 1 \u2014 Read and respond (text only, NO tool calls):**\n1. Read the topic provided\n2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)\n3. If it's already clear, confirm your understanding and ask the user to confirm\n\nKeep it short. Don't over-ask.\n\nAfter your message, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user confirms, call set_output:**\n- set_output(\"research_brief\", \"A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.\")",
"success_criteria": "The research brief is specific and actionable: it states the topic, the key questions to answer, the desired scope, and depth.",
"system_prompt": "You are a research intake specialist. The user wants to research a topic.\nHave a brief conversation to clarify what they need.\n\n**STEP 1 \u2014 Read and respond (text only, NO tool calls):**\n1. Read the topic provided\n2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)\n3. If it's already clear, confirm your understanding and ask the user to confirm\n\nKeep it short. Don't over-ask.\n\n**STEP 2 \u2014 After the user confirms, call set_output:**\n- set_output(\"research_brief\", \"A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.\")",
"tools": [],
"model": null,
"function": null,
@@ -63,7 +66,8 @@
],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a research agent. Given a research brief, find and analyze sources.\n\nIf feedback is provided, this is a follow-up round \u2014 focus on the gaps identified.\n\nWork in phases:\n1. **Search**: Use web_search with 3-5 diverse queries covering different angles.\n Prioritize authoritative sources (.edu, .gov, established publications).\n2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).\n Skip URLs that fail. Extract the substantive content.\n3. **Analyze**: Review what you've collected. Identify key findings, themes,\n and any contradictions between sources.\n\nImportant:\n- Work in batches of 3-4 tool calls at a time to manage context\n- After each batch, assess whether you have enough material\n- Prefer quality over quantity \u2014 5 good sources beat 15 thin ones\n- Track which URL each finding comes from (you'll need citations later)\n\nWhen done, use set_output:\n- set_output(\"findings\", \"Structured summary: key findings with source URLs for each claim. Include themes, contradictions, and confidence levels.\")\n- set_output(\"sources\", [{\"url\": \"...\", \"title\": \"...\", \"summary\": \"...\"}])\n- set_output(\"gaps\", \"What aspects of the research brief are NOT well-covered yet, if any.\")",
"success_criteria": "Findings reference at least 3 distinct sources with URLs. Key claims are substantiated by fetched content, not generated.",
"system_prompt": "You are a research agent. Given a research brief, find and analyze sources.\n\nIf feedback is provided, this is a follow-up round \u2014 focus on the gaps identified.\n\nWork in phases:\n1. **Search**: Use web_search with 3-5 diverse queries covering different angles.\n Prioritize authoritative sources (.edu, .gov, established publications).\n2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).\n Skip URLs that fail. Extract the substantive content.\n3. **Analyze**: Review what you've collected. Identify key findings, themes,\n and any contradictions between sources.\n\nImportant:\n- Work in batches of 3-4 tool calls at a time \u2014 never more than 10 per turn\n- After each batch, assess whether you have enough material\n- Prefer quality over quantity \u2014 5 good sources beat 15 thin ones\n- Track which URL each finding comes from (you'll need citations later)\n- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)\n\nWhen done, use set_output (one key at a time, separate turns):\n- set_output(\"findings\", \"Structured summary: key findings with source URLs for each claim. Include themes, contradictions, and confidence levels.\")\n- set_output(\"sources\", [{\"url\": \"...\", \"title\": \"...\", \"summary\": \"...\"}])\n- set_output(\"gaps\", \"What aspects of the research brief are NOT well-covered yet, if any.\")",
"tools": [
"web_search",
"web_scrape",
@@ -99,7 +103,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Present the research findings to the user clearly and concisely.\n\n**STEP 1 \u2014 Present (your first message, text only, NO tool calls):**\n1. **Summary** (2-3 sentences of what was found)\n2. **Key Findings** (bulleted, with confidence levels)\n3. **Sources Used** (count and quality assessment)\n4. **Gaps** (what's still unclear or under-covered)\n\nEnd by asking: Are they satisfied, or do they want deeper research? Should we proceed to writing the final report?\n\nAfter your presentation, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n- set_output(\"needs_more_research\", \"true\") \u2014 if they want more\n- set_output(\"needs_more_research\", \"false\") \u2014 if they're satisfied\n- set_output(\"feedback\", \"What the user wants explored further, or empty string\")",
"success_criteria": "The user has been presented with findings and has explicitly indicated whether they want more research or are ready for the report.",
"system_prompt": "Present the research findings to the user clearly and concisely.\n\n**STEP 1 \u2014 Present (your first message, text only, NO tool calls):**\n1. **Summary** (2-3 sentences of what was found)\n2. **Key Findings** (bulleted, with confidence levels)\n3. **Sources Used** (count and quality assessment)\n4. **Gaps** (what's still unclear or under-covered)\n\nEnd by asking: Are they satisfied, or do they want deeper research? Should we proceed to writing the final report?\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n- set_output(\"needs_more_research\", \"true\") \u2014 if they want more\n- set_output(\"needs_more_research\", \"false\") \u2014 if they're satisfied\n- set_output(\"feedback\", \"What the user wants explored further, or empty string\")",
"tools": [],
"model": null,
"function": null,
@@ -127,7 +132,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Write a comprehensive research report as an HTML file and present it to the user.\n\n**STEP 1 \u2014 Write the HTML report (tool calls, NO text to user yet):**\n\n1. Compose a complete, self-contained HTML document with embedded CSS styling.\n Use a clean, readable design: max-width container, pleasant typography,\n numbered citation links, a table of contents, and a references section.\n\n Report structure inside the HTML:\n - Title & date\n - Executive Summary (2-3 paragraphs)\n - Table of Contents\n - Findings (organized by theme, with [n] citation links)\n - Analysis (synthesis, implications, areas of debate)\n - Conclusion (key takeaways, confidence assessment)\n - References (numbered list with clickable URLs)\n\n Requirements:\n - Every factual claim must cite its source with [n] notation\n - Be objective \u2014 present multiple viewpoints where sources disagree\n - Distinguish well-supported conclusions from speculation\n - Answer the original research questions from the brief\n\n2. Save the HTML file:\n save_data(filename=\"report.html\", data=<your_html>)\n\n3. Get the clickable link:\n serve_file_to_user(filename=\"report.html\", label=\"Research Report\")\n\n**STEP 2 \u2014 Present the link to the user (text only, NO tool calls):**\n\nTell the user the report is ready and include the file:// URI from\nserve_file_to_user so they can click it to open. Give a brief summary\nof what the report covers. Ask if they have questions.\n\nAfter presenting the link, call ask_user() to wait for the user's response.\n\n**STEP 3 \u2014 After the user responds:**\n- Answer follow-up questions from the research material\n- Call ask_user() again if they might have more questions\n- When the user is satisfied: set_output(\"delivery_status\", \"completed\")",
"success_criteria": "An HTML report has been saved, the file link has been presented to the user, and the user has acknowledged receipt.",
"system_prompt": "Write a research report as an HTML file and present it to the user.\n\nIMPORTANT: save_data requires TWO separate arguments: filename and data.\nCall it like: save_data(filename=\"report.html\", data=\"<html>...</html>\")\nDo NOT use _raw, do NOT nest arguments inside a JSON string.\n\n**STEP 1 \u2014 Write and save the HTML report (tool calls, NO text to user yet):**\n\nBuild a clean HTML document. Keep the HTML concise \u2014 aim for clarity over length.\nUse minimal embedded CSS (a few lines of style, not a full framework).\n\nReport structure:\n- Title & date\n- Executive Summary (2-3 paragraphs)\n- Key Findings (organized by theme, with [n] citation links)\n- Analysis (synthesis, implications)\n- Conclusion (key takeaways)\n- References (numbered list with clickable URLs)\n\nRequirements:\n- Every factual claim must cite its source with [n] notation\n- Be objective \u2014 present multiple viewpoints where sources disagree\n- Answer the original research questions from the brief\n\nSave the HTML:\n save_data(filename=\"report.html\", data=\"<html>...</html>\")\n\nThen get the clickable link:\n serve_file_to_user(filename=\"report.html\", label=\"Research Report\")\n\nIf save_data fails, simplify and shorten the HTML, then retry.\n\n**STEP 2 \u2014 Present the link to the user (text only, NO tool calls):**\n\nTell the user the report is ready and include the file:// URI from\nserve_file_to_user so they can click it to open. Give a brief summary\nof what the report covers. Ask if they have questions.\n\n**STEP 3 \u2014 After the user responds:**\n- Answer follow-up questions from the research material\n- When the user is satisfied: set_output(\"delivery_status\", \"completed\")",
"tools": [
"save_data",
"serve_file_to_user",
+90 -52
View File
@@ -1,12 +1,15 @@
"""Agent graph construction for Deep Research Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
@@ -102,22 +105,40 @@ edges = [
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# review -> research (feedback loop, checked first)
# review -> research (feedback loop)
EdgeSpec(
id="review-to-research-feedback",
source="review",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(needs_more_research).lower() == 'true'",
priority=2,
condition_expr="needs_more_research == True",
priority=1,
),
# review -> report (complementary condition — proceed to report when no more research needed)
# review -> report (user satisfied)
EdgeSpec(
id="review-to-report",
source="review",
target="report",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(needs_more_research).lower() != 'true'",
condition_expr="needs_more_research == False",
priority=2,
),
# report -> research (user wants deeper research on current topic)
EdgeSpec(
id="report-to-research",
source="report",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'more_research'",
priority=2,
),
# report -> intake (user wants a new topic — default when not more_research)
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() != 'more_research'",
priority=1,
),
]
@@ -126,7 +147,7 @@ edges = [
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["report"]
terminal_nodes = []
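# Forever-alive lifecycle: with terminal_nodes empty, the session never enters
# a "completed" state. The report node loops back to research or intake, and
# the user decides when to stop.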
class DeepResearchAgent:
@@ -136,6 +157,12 @@ class DeepResearchAgent:
Flow: intake -> research -> review -> report
                    ^          |
                    +----------+  feedback loop (if user wants more)
Uses AgentRuntime for proper session management:
- Session-scoped storage (sessions/{session_id}/)
- Checkpointing for resume capability
- Runtime logging
- Data folder for save_data/load_data
"""
def __init__(self, config=None):
@@ -147,10 +174,10 @@ class DeepResearchAgent:
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
@@ -177,84 +204,95 @@ class DeepResearchAgent:
"""Set up the executor with all components."""
from pathlib import Path
from framework.credentials.validation import (
ensure_credential_key_env,
validate_agent_credentials,
)
ensure_credential_key_env()
validate_agent_credentials(self.nodes)
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
)
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
checkpoint_config=checkpoint_config,
)
return self._executor
async def start(self) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
self._setup()
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._executor is None:
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
async def run(self, context: dict, session_state=None) -> ExecutionResult:
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start()
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
@@ -10,8 +10,13 @@ intake_node = NodeSpec(
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
@@ -23,8 +28,6 @@ Have a brief conversation to clarify what they need.
Keep it short. Don't over-ask.
After your message, call ask_user() to wait for the user's response.
**STEP 2 - After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
@@ -40,10 +43,14 @@ research_node = NodeSpec(
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=3,
max_node_visits=0,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
success_criteria=(
"Findings reference at least 3 distinct sources with URLs. "
"Key claims are substantiated by fetched content, not generated."
),
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
@@ -58,18 +65,19 @@ Work in phases:
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time to manage context
- Work in batches of 3-4 tool calls at a time, never more than 10 per turn
- After each batch, assess whether you have enough material
- Prefer quality over quantity: 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
When done, use set_output:
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
tools=["web_search", "web_scrape", "load_data", "save_data", "append_data", "list_data_files"],
)
# Node 3: Review (client-facing)
@@ -80,9 +88,13 @@ review_node = NodeSpec(
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=3,
max_node_visits=0,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
success_criteria=(
"The user has been presented with findings and has explicitly indicated "
"whether they want more research or are ready for the report."
),
system_prompt="""\
Present the research findings to the user clearly and concisely.
@@ -95,8 +107,6 @@ Present the research findings to the user clearly and concisely.
End by asking: Are they satisfied, or do they want deeper research? \
Should we proceed to writing the final report?
After your presentation, call ask_user() to wait for the user's response.
**STEP 2 - After the user responds, call set_output:**
- set_output("needs_more_research", "true") if they want more
- set_output("needs_more_research", "false") if they're satisfied
@@ -113,52 +123,66 @@ report_node = NodeSpec(
description="Write a cited HTML report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status"],
output_keys=["delivery_status", "next_action"],
success_criteria=(
"An HTML report has been saved, the file link has been presented to the user, "
"and the user has indicated what they want to do next."
),
system_prompt="""\
Write a comprehensive research report as an HTML file and present it to the user.
Write a research report as an HTML file and present it to the user.
**STEP 1 - Write the HTML report (tool calls, NO text to user yet):**
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
1. Compose a complete, self-contained HTML document with embedded CSS styling.
Use a clean, readable design: max-width container, pleasant typography,
numbered citation links, a table of contents, and a references section.
**STEP 1 - Write and save the HTML report (tool calls, NO text to user yet):**
Report structure inside the HTML:
- Title & date
- Executive Summary (2-3 paragraphs)
- Table of Contents
- Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications, areas of debate)
- Conclusion (key takeaways, confidence assessment)
- References (numbered list with clickable URLs)
Build a clean HTML document. Keep the HTML concise; aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
2. Save the HTML file:
save_data(filename="report.html", data=<your_html>)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
3. Get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
If save_data fails, simplify and shorten the HTML, then retry.
**STEP 2 - Present the link to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions.
After presenting the link, call ask_user() to wait for the user's response.
of what the report covers. Ask if they have questions or want to continue.
**STEP 3 - After the user responds:**
- Answer follow-up questions from the research material
- Call ask_user() again if they might have more questions
- When the user is satisfied: set_output("delivery_status", "completed")
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
- Dig deeper into the current topic?
- Then call set_output:
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
""",
tools=["save_data", "serve_file_to_user", "load_data", "list_data_files"],
tools=[
"save_data", "append_data", "edit_data",
"serve_file_to_user", "load_data", "list_data_files",
],
)
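# Routing note: the next_action output set above drives the report edges in
# graph.py; "more_research" loops back to research, any other value (e.g.
# "new_topic") returns to intake for a fresh topic.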
__all__ = [
+2
View File
@@ -112,6 +112,8 @@ def register_all_tools(
"execute_command_tool",
"load_data",
"save_data",
"append_data",
"edit_data",
"list_data_files",
"serve_file_to_user",
"csv_read",
@@ -237,3 +237,115 @@ def register_tools(mcp: FastMCP) -> None:
return {"files": files}
except Exception as e:
return {"error": f"Failed to list data files: {str(e)}"}
@mcp.tool()
def append_data(filename: str, data: str, data_dir: str) -> dict:
"""
Purpose
Append data to the end of an existing file, or create it if it
doesn't exist yet.
When to use
Build large files incrementally instead of writing everything in
one save_data call. For example, write an HTML skeleton first,
then append each section separately to stay within token limits.
Rules & Constraints
filename must be a simple name like 'report.html'; no paths or '..'
Args:
filename: Simple filename to append to. No paths or '..'.
data: The string data to append.
data_dir: Absolute path to the data directory.
Returns:
Dict with success status, new total size, and bytes appended
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename. Use simple names like 'report.html'"}
if not data_dir:
return {"error": "data_dir is required"}
try:
dir_path = Path(data_dir)
dir_path.mkdir(parents=True, exist_ok=True)
path = dir_path / filename
with open(path, "a", encoding="utf-8") as f:
f.write(data)
appended_bytes = len(data.encode("utf-8"))
total_bytes = path.stat().st_size
return {
"success": True,
"filename": filename,
"size_bytes": total_bytes,
"appended_bytes": appended_bytes,
}
except Exception as e:
return {"error": f"Failed to append data: {str(e)}"}
@mcp.tool()
def edit_data(filename: str, old_text: str, new_text: str, data_dir: str) -> dict:
"""
Purpose
Find and replace a specific text segment in an existing file.
Works like a surgical diff only the matched portion changes.
When to use
Update a section of a previously saved file without rewriting
the entire content. For example, replace a placeholder in an
HTML report or fix a specific paragraph.
Rules & Constraints
old_text must appear exactly once in the file. If it appears
zero times or more than once, the edit is rejected with an
error message.
Args:
filename: The file to edit. Must exist in data_dir.
old_text: The exact text to find (must match exactly once).
new_text: The replacement text.
data_dir: Absolute path to the data directory.
Returns:
Dict with success status and updated file size
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename. Use simple names like 'report.html'"}
if not data_dir:
return {"error": "data_dir is required"}
try:
path = Path(data_dir) / filename
if not path.exists():
return {"error": f"File not found: {filename}"}
content = path.read_text(encoding="utf-8")
count = content.count(old_text)
if count == 0:
return {
"error": (
"old_text not found in the file. "
"Make sure you're matching the exact text, "
"including whitespace and newlines."
)
}
if count > 1:
return {
"error": (
f"old_text found {count} times — it must be unique. "
"Include more surrounding context to match exactly once."
)
}
updated = content.replace(old_text, new_text, 1)
path.write_text(updated, encoding="utf-8")
return {
"success": True,
"filename": filename,
"size_bytes": len(updated.encode("utf-8")),
"replacements": 1,
}
except Exception as e:
return {"error": f"Failed to edit data: {str(e)}"}