Compare commits

...

49 Commits

Author SHA1 Message Date
RichardTang-Aden 988922304f Merge pull request #4451 from Ttian18/fix/tina-tui-copy-newline-4423
fix(tui): add multiline input and cross-platform clipboard support
2026-02-12 20:27:32 -08:00
RichardTang-Aden e26468aa19 Merge pull request #4587 from Antiarin/feat/codex-integration
feat(codex): add project setup, quickstart bootstrap, and docs for Codex agent support
2026-02-12 17:55:47 -08:00
Richard Tang fe14992696 docs: update documentation 2026-02-12 17:54:08 -08:00
Richard Tang d0775b95c6 fix: remove unused agents.md and quickstart clause 2026-02-12 17:48:31 -08:00
Richard Tang 96121b5757 fix: correct codex setups 2026-02-12 17:35:15 -08:00
Timothy @aden 11c003c48d Merge pull request #4636 from TimothyZhang7/feature/memory-inheritance
Feature: Conversation Memory & Continuous Agent Session
2026-02-12 13:52:19 -08:00
Timothy fbe72c58ae chore: fix ci tests 2026-02-12 13:49:34 -08:00
Timothy 83d7f56728 chore: update agent skills for new design philosophy 2026-02-12 12:10:37 -08:00
Timothy 76deba2a6a feat: consistent memory system 2026-02-12 11:41:22 -08:00
Antiarin d9d048b9e3 docs: update quickstart script for symlink handling and add Codex CLI documentation 2026-02-12 23:34:17 +05:30
Antiarin f9d5c7c751 fix(codex): patch prompt injection, checkout version, and style in codex-issue-triage workflow 2026-02-12 20:25:19 +05:30
Antiarin 8958fb2d88 feat(codex): add project setup, quickstart bootstrap, and docs for Codex agent support 2026-02-12 18:32:58 +05:30
Zhang 3c51f2ac36 fix(tui): address code review feedback on TextArea migration
- Subclass TextArea as ChatTextArea to intercept Enter key before
  the base class swallows it (fixes submission not triggering)
- Remove event.shift access that raises AttributeError on Key events
- Make action_show_sessions directly call _submit_input instead of
  just placing text in the widget
2026-02-11 23:45:30 -08:00
RichardTang-Aden 170a0918f7 Merge pull request #4538 from jaathavan18/docs/issue-4493-remove-changelog-references-v2
docs: remove references to nonexistent CHANGELOG.md
2026-02-11 19:15:00 -08:00
jaathavan18 e3da3b619c docs: remove references to nonexistent CHANGELOG.md (#4493) 2026-02-11 21:48:28 -05:00
RichardTang-Aden 6e32513b79 Merge pull request #4535 from RichardTang-Aden/main
docs: add the autonomous agent guide
2026-02-11 17:11:02 -08:00
Richard Tang 520e1963ee docs: add the autonomous agent guide 2026-02-11 17:10:21 -08:00
RichardTang-Aden 843b9b55e2 Merge pull request #2862 from himanshu748/feat/antigravity-ide-2571
feat: Antigravity IDE support for MCP servers and skills
2026-02-11 16:59:15 -08:00
Richard Tang ccd305ff96 fix: remove incorrect rules folder 2026-02-11 16:58:02 -08:00
Richard Tang 3bd0d1e48c feat: add Antigravity workflows for all hive skills 2026-02-11 16:56:15 -08:00
Richard Tang d9bfa8e675 docs: rename .antigravity to .agent for Antigravity IDE compatibility 2026-02-11 16:42:26 -08:00
Richard Tang 27746147e2 fix: fix the antigravity config folder 2026-02-11 16:41:21 -08:00
Richard Tang 3a0b642980 fix: update the antigravity config 2026-02-11 16:39:28 -08:00
Richard Tang 8c0241f087 fix: use --directory instead of cwd for Antigravity MCP compatibility 2026-02-11 16:36:12 -08:00
Richard Tang 958d016174 fix: use uv run for MCP servers, script reads from template 2026-02-11 16:03:32 -08:00
Richard Tang 913d318ada docs: emphasize restarting Antigravity after MCP setup, fix config path and skill names 2026-02-11 15:57:11 -08:00
Richard Tang 8212920cb7 fix: correct .antigravity/skills symlinks to point to hive-* directories 2026-02-11 15:52:27 -08:00
Richard Tang 6414be7bd4 fix: change the wrong antigravity mcp path 2026-02-11 15:47:54 -08:00
himanshu748 ac62a82d08 docs: clarify Antigravity setup for everyone
- Define repo root at top; lead with quick start (3 steps)
- Add 'What you get' and prerequisites in one place
- Full setup step-by-step; troubleshooting: problem → fix
- Manual MCP config as single section; verification optional
- Plain language, scannable structure, no duplicate sections
2026-02-11 15:45:56 -08:00
himanshu748 a670548a57 Antigravity: one-command setup script and clearer docs
- Add scripts/setup-antigravity-mcp.sh to auto-detect repo root and write
  ~/.gemini/mcp.json with absolute paths (no manual path editing)
- Lead docs with Quick start (3 steps) and note ./ vs / for the script
- README: point to one-command setup; clarify script runs from repo folder
2026-02-11 15:45:56 -08:00
himanshu748 c4a7463f9d docs(antigravity): global config, absolute cwd, cwd warning note
- Add step to run core/setup_mcp.sh first
- Document that IDE often loads ~/.claude/mcp.json, not project config
- Add Option A: copy to ~/.claude/mcp.json with absolute cwd paths
- Note that cwd schema warning in IDE is a false positive
- Renumber setup steps (1–5)
2026-02-11 15:45:30 -08:00
himanshu748 edf0ac5270 docs: add how-to-verify section for Antigravity setup 2026-02-11 15:45:30 -08:00
himanshu748 8ff6b76f37 feat: Antigravity IDE support for MCP servers and skills (#2571)
- Add .antigravity/mcp_config.json with agent-builder and tools MCP servers
- Add .antigravity/skills/ with symlinks to .claude/skills/ (5 skills)
- Add docs/antigravity-setup.md with setup and troubleshooting
- Update README.md with Antigravity IDE support section
- Update DEVELOPER.md and docs/contributing-lint-setup.md with Antigravity refs

Mirrors Cursor integration for consistent multi-IDE support.
2026-02-11 15:45:30 -08:00
Bryan @ Aden c9f9eb365c Merge pull request #4441 from saurabh007007/feature/docs/spelling
docs: fix typo in docs for the directory in environment setup
2026-02-11 23:37:44 +00:00
Bryan @ Aden 9a2a11055f Merge pull request #4520 from jaathavan18/docs/issue-4495-remove-config-yaml-reference
docs: remove reference to nonexistent config.yaml
2026-02-11 22:36:16 +00:00
jaathavan18 4aef73c1d7 docs: remove reference to nonexistent config.yaml (#4495) 2026-02-11 15:07:04 -05:00
RichardTang-Aden b71b4b0fc2 Merge pull request #4500 from RichardTang-Aden/main
fix: remove unused tools from root .mcp.json
2026-02-11 10:23:45 -08:00
Richard Tang 1bd2510c52 fix: remove unused tools from root .mcp.json 2026-02-11 10:20:20 -08:00
RichardTang-Aden 28b81092f9 Merge pull request #4498 from jaathavan18/docs/issue-4494-fix-env-example-reference
docs: fix .env.example references in tools README
2026-02-11 10:11:53 -08:00
RichardTang-Aden 4b9a3abba6 micro-fix: python lint error (#4499)
* micro-fix: python lint error

* micro-fix: python lint format
2026-02-11 10:09:17 -08:00
Richard Tang 0c76b6dcb1 micro-fix: python lint format 2026-02-11 10:08:40 -08:00
Richard Tang 090a85b41b micro-fix: python lint error 2026-02-11 10:04:25 -08:00
jaathavan18 992d573573 docs: fix .env.example references in tools README (#4494) 2026-02-11 12:58:51 -05:00
e-cesar9 9e768e660b micro-fix: move inline imports to module level in edge.py (#4480)
Fixes #4445

Moved repeated inline imports (logging, json, re) to module-level:
- Eliminates import overhead on every method call
- Follows PEP 8 conventions
- Added module-level logger instance
- re is used at line 259 (re.search)

Changes:
- 4 lines added (imports + logger)
- 13 lines removed (inline imports)
- No functional changes
2026-02-11 09:55:14 -08:00
RichardTang-Aden 26b9ed362e Merge pull request #4479 from e-cesar9/microfix/4444-typo-stirct
micro-fix: fix typo STIRCT → STRICT in safe_eval.py
2026-02-11 09:48:55 -08:00
Timothy @aden 976ae75fde Merge pull request #4487 from adenhq/feature/windows-quickstart
chore(micro-fix): windows quickstart
2026-02-11 08:18:53 -08:00
e-cesar9 d63dd021ab micro-fix: fix typo STIRCT → STRICT in safe_eval.py
Fixes #4444
2026-02-11 14:40:25 +01:00
Zhang 697ba89314 fix(tui): add multiline input and cross-platform clipboard support (#4423)
Replace single-line Input widget with TextArea in chat REPL so
Shift+Enter inserts newlines and multiline paste works correctly.
Add Windows clipboard support (clip.exe) and xsel fallback for Linux.
2026-02-11 00:01:09 -08:00
saurabh007007 373ad77008 docs: fix typo in docs 2026-02-11 11:46:26 +05:30
66 changed files with 3187 additions and 312 deletions
+9
@@ -0,0 +1,9 @@
{
"mcpServers": {
"agent-builder": {
"command": "uv",
"args": ["run", "--directory", "core", "-m", "framework.mcp.agent_builder_server"],
"disabled": false
}
}
}
+1
@@ -0,0 +1 @@
../../.claude/skills/hive
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-concepts
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-create
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-credentials
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-patterns
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-test
+5
@@ -0,0 +1,5 @@
---
description: hive-concepts
---
use hive-concepts skill
+5
@@ -0,0 +1,5 @@
---
description: hive-create
---
use hive-create skill
+5
@@ -0,0 +1,5 @@
---
description: hive-credentials
---
use hive-credentials skill
+5
@@ -0,0 +1,5 @@
---
description: hive-patterns
---
use hive-patterns skill
+5
@@ -0,0 +1,5 @@
---
description: hive-test
---
use hive-test skill
+5
@@ -0,0 +1,5 @@
---
description: hive
---
use hive skill
+1
@@ -0,0 +1 @@
../../.claude/skills/hive
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-concepts
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-create
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-credentials
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-patterns
+1
@@ -0,0 +1 @@
../../.claude/skills/hive-test
+64 -1
@@ -553,6 +553,26 @@ AskUserQuestion(questions=[{
- condition_expr (Python expression, only if conditional)
- priority (positive = forward, negative = feedback/loop-back)
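The condition_expr/priority interplay can be sketched in plain Python. This is an illustration only — `pick_edge`, the `eval()` call, and the hypothetical edge dicts are not the framework's actual routing mechanism:

```python
def pick_edge(edges, memory):
    # Try edges in descending priority; the first whose condition_expr
    # evaluates truthy against the node's output memory wins.
    # eval() here is purely illustrative, not the framework's mechanism.
    for edge in sorted(edges, key=lambda e: e["priority"], reverse=True):
        if eval(edge["condition_expr"], {}, dict(memory)):
            return edge["id"]
    return None

# Hypothetical edge pair: a higher-priority conditional edge with a
# lower-priority complement acting as the default.
edges = [
    {"id": "report-to-research", "priority": 2,
     "condition_expr": "str(next_action).lower() == 'more_research'"},
    {"id": "report-to-intake", "priority": 1,
     "condition_expr": "str(next_action).lower() != 'more_research'"},
]
print(pick_edge(edges, {"next_action": "new_topic"}))  # → report-to-intake
```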
**DETERMINE the graph lifecycle.** Not every agent needs a terminal node:
| Pattern | `terminal_nodes` | When to Use |
|---------|-------------------|-------------|
| **Linear (finish)** | `["last-node"]` | Agent completes a task and exits (batch processing, one-shot generation) |
| **Forever-alive (loop)** | `[]` (empty) | Agent stays alive for continuous interaction (research assistant, personal assistant, monitoring) |
**Forever-alive pattern:** The deep_research_agent example uses `terminal_nodes=[]`. Every leaf node has edges that loop back to earlier nodes, creating a perpetual session. The agent only stops when the user explicitly exits. This is the preferred pattern for interactive, multi-turn agents.
**Key design rules for forever-alive graphs:**
- Every node must have at least one outgoing edge (no dead ends)
- Client-facing nodes block for user input — these are the natural "pause points"
- The user controls when to stop, not the graph
- Sessions accumulate memory across loops — plan for conversation compaction
- Use `conversation_mode="continuous"` to preserve conversation history across node transitions
- `max_iterations` should be set high (e.g., 100) since the agent is designed to run indefinitely
- The agent will NOT enter a "completed" execution state — this is intentional, not a bug
**Ask the user** which lifecycle pattern fits their agent. Default to forever-alive for interactive agents, linear for batch/one-shot tasks.
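The "no dead ends" rule for forever-alive graphs can be checked mechanically. A minimal sketch in plain Python — the node/edge shapes are assumptions for illustration, not the framework's data model:

```python
def find_dead_ends(nodes, edges):
    """Return node ids with no outgoing edge.

    For a forever-alive graph (terminal_nodes=[]) this list must be
    empty, or execution will end unexpectedly at the dead-end node.
    """
    sources = {e["source"] for e in edges}
    return sorted(n for n in nodes if n not in sources)

# Example: 'report' loops back to both 'research' and 'intake',
# so every node has an exit and there are no dead ends.
nodes = ["intake", "research", "review", "report"]
edges = [
    {"source": "intake", "target": "research"},
    {"source": "research", "target": "review"},
    {"source": "review", "target": "report"},
    {"source": "report", "target": "research"},
    {"source": "report", "target": "intake"},
]
print(find_dead_ends(nodes, edges))  # → []
```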
**RENDER the complete graph as ASCII art.** Make it large and clear — the user needs to see and understand the full workflow at a glance.
**IMPORTANT: Make the ASCII art BIG and READABLE.** Use a box-and-arrow style with generous spacing. Do NOT make it tiny or compressed. Example format:
@@ -912,6 +932,46 @@ result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
---
## REFERENCE: Graph Lifecycle & Conversation Memory
### Terminal vs Forever-Alive Graphs
Agents have two lifecycle patterns:
**Linear (terminal) graphs** have `terminal_nodes=["last-node"]`. Execution ends when the terminal node completes. The session enters a "completed" state. Use for batch processing, one-shot generation, and fire-and-forget tasks.
**Forever-alive graphs** have `terminal_nodes=[]` (empty). Every node has at least one outgoing edge — the graph loops indefinitely. The session **never enters a "completed" state** — this is intentional. The agent stays alive until the user explicitly exits. Use for interactive assistants, research tools, and any agent where the user drives the conversation.
The deep_research_agent example demonstrates this: `report` loops back to either `research` (dig deeper) or `intake` (new topic). The agent is a persistent, interactive assistant.
### Continuous Conversation Mode
When `conversation_mode="continuous"` is set on the GraphSpec, the framework preserves a **single conversation thread** across all node transitions:
**What the framework does automatically:**
- **Inherits conversation**: Same message history carries forward to the next node
- **Composes layered system prompts**: Identity (agent-level) + Narrative (auto-generated state summary) + Focus (per-node instructions)
- **Inserts transition markers**: At each node boundary, a "State of the World" message showing completed phases, current memory, and available data files
- **Accumulates tools**: Once a tool becomes available, it stays available in subsequent nodes
- **Compacts opportunistically**: At phase transitions, old tool results are pruned to stay within token budget
**What this means for agent builders:**
- Nodes don't need to re-explain context — the conversation carries it forward
- Output keys from earlier nodes are available in memory for edge conditions and later nodes
- For forever-alive agents, conversation memory persists across the entire session lifetime
- Plan for compaction: very long sessions will have older tool results summarized automatically
**When to use continuous mode:**
- Interactive agents with client-facing nodes (always)
- Multi-phase workflows where context matters across phases
- Forever-alive agents that loop indefinitely
**When NOT to use continuous mode:**
- Embarrassingly parallel fan-out nodes (each branch should be independent)
- Stateless utility agents that process items independently
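The layered system-prompt composition described above (identity + narrative + focus) can be sketched as simple string assembly. The function name and separator are assumptions; the framework's real composition logic is not shown here:

```python
def compose_system_prompt(identity, narrative, focus):
    """Illustrative sketch: stack the agent-level identity, the
    auto-generated 'State of the World' narrative, and the per-node
    focus into one system prompt, skipping empty layers."""
    return "\n\n".join(p.strip() for p in (identity, narrative, focus) if p)

prompt = compose_system_prompt(
    "You are a rigorous research agent.",
    "State of the World: intake complete; research_brief is set.",
    "Focus: search the web and compile findings for the brief.",
)
```

Each node transition would rebuild only the narrative and focus layers while the identity layer stays constant.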
---
## REFERENCE: Framework Capabilities for Qualification
Use this reference during STEP 2 to give accurate, honest assessments.
@@ -944,7 +1004,7 @@ Use this reference during STEP 2 to give accurate, honest assessments.
| Use Case | Why It's Problematic | Alternative |
|----------|---------------------|-------------|
| Long-running daemons | Framework is request-response, not persistent | External scheduler + agent |
| Persistent background daemons (no user) | Forever-alive graphs need a user at client-facing nodes; no autonomous background polling without user | External scheduler triggering agent runs |
| Sub-second responses | LLM latency is inherent | Traditional code, no LLM |
| Processing millions of items | Context windows and rate limits | Batch processing + sampling |
| Real-time streaming data | No built-in pub/sub or streaming input | Custom MCP server + agent |
@@ -979,3 +1039,6 @@ Use this reference during STEP 2 to give accurate, honest assessments.
11. **Adding framework gating for LLM behavior** - Fix prompts or use judges, not ad-hoc code
12. **Writing code before user approves the graph** - Always get approval on goal, nodes, and graph BEFORE writing any agent code
13. **Wrong mcp_servers.json format** - Use flat format (no `"mcpServers"` wrapper), `cwd` must be `"../../tools"`, and `command` must be `"uv"` with args `["run", "python", ...]`
14. **Assuming all agents need terminal nodes** - Interactive agents often work best with `terminal_nodes=[]` (forever-alive pattern). The agent never enters "completed" state — this is intentional. Only batch/one-shot agents need terminal nodes
15. **Creating dead-end nodes in forever-alive graphs** - Every node must have at least one outgoing edge. A node with no outgoing edges will cause execution to end unexpectedly, breaking the forever-alive loop
16. **Not using continuous conversation mode for interactive agents** - Multi-phase interactive agents should use `conversation_mode="continuous"` to preserve context across node transitions. Without it, each node starts with a blank conversation and loses all prior context
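Pitfall 13's flat `mcp_servers.json` format, sketched as a hypothetical entry — the `web_search` server name and script filename are placeholders, not taken from this repository:

```json
{
  "web_search": {
    "command": "uv",
    "args": ["run", "python", "web_search_server.py"],
    "cwd": "../../tools"
  }
}
```

Note there is no `"mcpServers"` wrapper object, unlike the `.antigravity/mcp_config.json` shown earlier in this diff.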
@@ -1,12 +1,15 @@
"""Agent graph construction for Deep Research Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
@@ -120,13 +123,31 @@ edges = [
condition_expr="needs_more_research == False",
priority=2,
),
# report -> research (user wants deeper research on current topic)
EdgeSpec(
id="report-to-research",
source="report",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'more_research'",
priority=2,
),
# report -> intake (user wants a new topic — default when not more_research)
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() != 'more_research'",
priority=1,
),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["report"]
terminal_nodes = []
class DeepResearchAgent:
@@ -136,6 +157,12 @@ class DeepResearchAgent:
Flow: intake -> research -> review -> report
^ |
+-- feedback loop (if user wants more)
Uses AgentRuntime for proper session management:
- Session-scoped storage (sessions/{session_id}/)
- Checkpointing for resume capability
- Runtime logging
- Data folder for save_data/load_data
"""
def __init__(self, config=None):
@@ -147,10 +174,10 @@ class DeepResearchAgent:
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
@@ -171,16 +198,20 @@ class DeepResearchAgent:
"max_tool_calls_per_turn": 20,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
identity_prompt=(
"You are a rigorous research agent. You search for information "
"from diverse, authoritative sources, analyze findings critically, "
"and produce well-cited reports. You never fabricate information — "
"every claim must trace back to a source you actually retrieved."
),
)
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
@@ -199,47 +230,63 @@ class DeepResearchAgent:
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
)
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
checkpoint_config=checkpoint_config,
)
return self._executor
async def start(self, mock_mode=False) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._executor is None:
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
@@ -250,7 +297,7 @@ class DeepResearchAgent:
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
@@ -10,8 +10,13 @@ intake_node = NodeSpec(
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
@@ -38,10 +43,14 @@ research_node = NodeSpec(
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=3,
max_node_visits=0,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
success_criteria=(
"Findings reference at least 3 distinct sources with URLs. "
"Key claims are substantiated by fetched content, not generated."
),
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
@@ -56,18 +65,19 @@ Work in phases:
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time to manage context
- Work in batches of 3-4 tool calls at a time; never more than 10 per turn
- After each batch, assess whether you have enough material
- Prefer quality over quantity: 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
When done, use set_output:
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
tools=["web_search", "web_scrape", "load_data", "save_data", "append_data", "list_data_files"],
)
# Node 3: Review (client-facing)
@@ -78,9 +88,13 @@ review_node = NodeSpec(
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=3,
max_node_visits=0,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
success_criteria=(
"The user has been presented with findings and has explicitly indicated "
"whether they want more research or are ready for the report."
),
system_prompt="""\
Present the research findings to the user clearly and concisely.
@@ -109,49 +123,66 @@ report_node = NodeSpec(
description="Write a cited HTML report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status"],
output_keys=["delivery_status", "next_action"],
success_criteria=(
"An HTML report has been saved, the file link has been presented to the user, "
"and the user has indicated what they want to do next."
),
system_prompt="""\
Write a comprehensive research report as an HTML file and present it to the user.
Write a research report as an HTML file and present it to the user.
**STEP 1: Write the HTML report (tool calls, NO text to user yet):**
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
1. Compose a complete, self-contained HTML document with embedded CSS styling.
Use a clean, readable design: max-width container, pleasant typography,
numbered citation links, a table of contents, and a references section.
**STEP 1: Write and save the HTML report (tool calls, NO text to user yet):**
Report structure inside the HTML:
- Title & date
- Executive Summary (2-3 paragraphs)
- Table of Contents
- Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications, areas of debate)
- Conclusion (key takeaways, confidence assessment)
- References (numbered list with clickable URLs)
Build a clean HTML document. Keep the HTML concise; aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
2. Save the HTML file:
save_data(filename="report.html", data=<your_html>)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
3. Get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
If save_data fails, simplify and shorten the HTML, then retry.
**STEP 2: Present the link to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions.
of what the report covers. Ask if they have questions or want to continue.
**STEP 3: After the user responds:**
- Answer follow-up questions from the research material
- When the user is satisfied: set_output("delivery_status", "completed")
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
- Dig deeper into the current topic?
- Then call set_output:
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
""",
tools=["save_data", "serve_file_to_user", "load_data", "list_data_files"],
tools=[
"save_data", "append_data", "edit_data",
"serve_file_to_user", "load_data", "list_data_files",
],
)
__all__ = [
+23 -1
@@ -26,6 +26,17 @@ Use `/hive-debugger` when:
This skill works alongside agents running in TUI mode and provides supervisor-level insights into execution behavior.
### Forever-Alive Agent Awareness
Some agents use `terminal_nodes=[]` (the "forever-alive" pattern), meaning they loop indefinitely and never enter a "completed" execution state. For these agents:
- Sessions with status "in_progress" or "paused" are **normal**, not failures
- High step counts, long durations, and many node visits are expected behavior
- The agent stops only when the user explicitly exits — there is no graph-driven completion
- Debug focus should be on **quality of individual node visits and iterations**, not whether the session reached a terminal state
- Conversation memory accumulates across loops — watch for context overflow and stale data issues
**How to identify forever-alive agents:** Check `agent.py` or `agent.json` for `terminal_nodes=[]` (empty list). If empty, the agent is forever-alive.
---
## Prerequisites
@@ -142,6 +153,7 @@ Store the selected mode for the session.
- Check `attention_summary.categories` for issue types
- Note the `run_id` of problematic sessions
- Check `status` field: "degraded", "failure", "in_progress"
- **For forever-alive agents:** Sessions with status "in_progress" or "paused" are normal — these agents never reach "completed". Only flag sessions with `needs_attention: true` or actual error indicators (tool failures, retry loops, missing outputs). High step counts alone do not indicate a problem.
3. **Attention flag triggers to understand:**
From runtime_logger.py, runs are flagged when:
@@ -199,13 +211,20 @@ Which run would you like to investigate?
| **Tool Errors** | `tool_error_count > 0`, `attention_reasons` contains "tool_failures" | Tool calls failed (API errors, timeouts, auth issues) |
| **Retry Loops** | `retry_count > 3`, `verdict_counts.RETRY > 5` | Judge repeatedly rejecting outputs |
| **Guard Failures** | `guard_reject_count > 0` | Output validation failed (wrong types, missing keys) |
| **Stalled Execution** | `total_steps > 20`, `verdict_counts.CONTINUE > 10` | EventLoopNode not making progress |
| **Stalled Execution** | `total_steps > 20`, `verdict_counts.CONTINUE > 10` | EventLoopNode not making progress. **Caveat:** Forever-alive agents may legitimately have high step counts — check if agent is blocked at a client-facing node (normal) vs genuinely stuck in a loop |
| **High Latency** | `latency_ms > 60000`, `avg_step_latency > 5000` | Slow tool calls or LLM responses |
| **Client-Facing Issues** | `client_input_requested` but no `user_input_received` | Premature set_output before user input |
| **Edge Routing Errors** | `exit_status == "no_valid_edge"`, `attention_reasons` contains "routing_issue" | No edges match current state |
| **Memory/Context Issues** | `tokens_used > 100000`, `context_overflow_count > 0` | Conversation history too long |
| **Constraint Violations** | Compare output against goal constraints | Agent violated goal-level rules |
**Forever-Alive Agent Caveat:** If the agent uses `terminal_nodes=[]`, sessions will never reach "completed" status. This is by design. When debugging these agents, focus on:
- Whether individual node visits succeed (not whether the graph "finishes")
- Quality of each loop iteration — are outputs improving or degrading across loops?
- Whether client-facing nodes are correctly blocking for user input
- Memory accumulation issues: stale data from previous loops, context overflow across many iterations
- Conversation compaction behavior: is the conversation growing unbounded?
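The triage rules above can be condensed into a small helper. This is a hypothetical sketch, not part of the framework: the function name is invented, but the field names (`needs_attention`, `status`, `tool_error_count`, `guard_reject_count`, `retry_count`, `client_input_requested`, `user_input_received`) follow the metrics table in this document.

```python
def needs_investigation(run: dict) -> bool:
    """Decide whether a run is worth investigating.

    Key rule from the caveat above: "in_progress"/"paused" status and
    high step counts are normal for forever-alive agents, so only real
    error indicators should trigger a deeper look.
    """
    if run.get("needs_attention"):
        return True
    if run.get("tool_error_count", 0) > 0 or run.get("guard_reject_count", 0) > 0:
        return True
    if run.get("retry_count", 0) > 3:
        return True
    # Blocked at a client-facing node waiting for the user: working as designed
    if run.get("client_input_requested") and not run.get("user_input_received"):
        return False
    # No error signals: only explicit degraded/failure status matters
    return run.get("status") in ("degraded", "failure")
```

The asymmetry is deliberate: error counters fire regardless of status, while status alone is only meaningful when it is an explicit failure state.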
3. **Analyze each flagged node:**
- Node ID and name
- Exit status
@@ -1015,6 +1034,9 @@ Your agent should now work correctly!"
3. **Don't ignore edge conditions** - Missing edges cause routing failures
4. **Don't overlook judge configuration** - Mismatched expectations cause retry loops
5. **Don't forget nullable_output_keys** - Optional inputs need explicit marking
6. **Don't diagnose "in_progress" as a failure for forever-alive agents** - Agents with `terminal_nodes=[]` are designed to never enter "completed" state. This is intentional. Focus on quality of individual node visits, not session completion status
7. **Don't ignore conversation memory issues in long-running sessions** - In continuous conversation mode, history grows across node transitions and loop iterations. Watch for context overflow (tokens_used > 100K), stale data from previous loops affecting edge conditions, and compaction failures that cause the LLM to lose important context
8. **Don't confuse "waiting for user" with "stalled"** - Client-facing nodes in forever-alive agents block for user input by design. A session paused at a client-facing node is working correctly, not stalled
---
+7
@@ -0,0 +1,7 @@
# Project-level Codex config for Hive.
# Keep this file minimal: MCP connectivity + skill discovery.
[mcp_servers.agent-builder]
command = "uv"
args = ["run", "--directory", "core", "-m", "framework.mcp.agent_builder_server"]
cwd = "."
-5
@@ -4,11 +4,6 @@
"command": "uv",
"args": ["run", "-m", "framework.mcp.agent_builder_server"],
"cwd": "core"
},
"tools": {
"command": "uv",
"args": ["run", "mcp_server.py", "--stdio"],
"cwd": "tools"
}
}
}
+1 -2
@@ -99,8 +99,7 @@ docs(readme): update installation instructions
2. Update documentation if needed
3. Add tests for new functionality
4. Ensure `make check` and `make test` pass
5. Update the CHANGELOG.md if applicable
6. Request review from maintainers
5. Request review from maintainers
### PR Title Format
+24 -2
@@ -22,7 +22,6 @@
<img src="https://img.shields.io/badge/MCP-102_Tools-00ADD8?style=flat-square" alt="MCP" />
</p>
<p align="center">
<img src="https://img.shields.io/badge/AI_Agents-Self--Improving-brightgreen?style=flat-square" alt="AI Agents" />
<img src="https://img.shields.io/badge/Multi--Agent-Systems-blue?style=flat-square" alt="Multi-Agent" />
@@ -82,7 +81,7 @@ Use Hive when you need:
### Prerequisites
- Python 3.11+ for agent development
- Claude Code or Cursor for utilizing agent skills
- Claude Code, Codex CLI, or Cursor for utilizing agent skills
> **Note for Windows Users:** It is strongly recommended to use **WSL (Windows Subsystem for Linux)** or **Git Bash** to run this framework. Some core automation scripts may not execute correctly in standard Command Prompt or PowerShell.
@@ -121,6 +120,18 @@ hive tui
hive run exports/your_agent_name --input '{"key": "value"}'
```
## Coding Agent Support
### Codex CLI
Hive includes native support for [OpenAI Codex CLI](https://github.com/openai/codex) (v0.101.0+).
1. **Config:** `.codex/config.toml` with `agent-builder` MCP server (tracked in git)
2. **Skills:** `.agents/skills/` symlinks to Hive skills (tracked in git)
3. **Launch:** Run `codex` in the repo root, then type `use hive`
Example:
```
codex> use hive
```
### Opencode
Hive includes native support for [Opencode](https://github.com/opencode-ai/opencode).
@@ -133,6 +144,16 @@ The agent has access to all Hive skills and can scaffold agents, add tools, and
**[📖 Complete Setup Guide](docs/environment-setup.md)** - Detailed instructions for agent development
### Antigravity IDE Support
Skills and MCP servers are also available in [Antigravity IDE](https://antigravity.google/) (Google's AI-powered IDE). **Easiest:** open a terminal in the hive repo folder and run the setup script (note the `./` prefix; the script lives inside the repo):
```bash
./scripts/setup-antigravity-mcp.sh
```
**Important:** Always restart/refresh Antigravity IDE after running the setup script—MCP servers only load on startup. After restart, **agent-builder** and **tools** MCP servers should connect. Skills are under `.agent/skills/` (symlinks to `.claude/skills/`). See [docs/antigravity-setup.md](docs/antigravity-setup.md) for manual setup and troubleshooting.
## Features
- **[Goal-Driven Development](docs/key_concepts/goals_outcome.md)** - Define objectives in natural language; the coding agent generates the agent graph and connection code to achieve them
@@ -312,6 +333,7 @@ subgraph Expansion
j2["Cursor"]
j3["Opencode"]
j4["Antigravity"]
j5["Codex CLI"]
end
subgraph plat["Platform"]
k1["JavaScript/TypeScript SDK"]
+71 -5
@@ -27,6 +27,9 @@ class Message:
tool_use_id: str | None = None
tool_calls: list[dict[str, Any]] | None = None
is_error: bool = False
# Phase-aware compaction metadata (continuous mode)
phase_id: str | None = None
is_transition_marker: bool = False
def to_llm_dict(self) -> dict[str, Any]:
"""Convert to OpenAI-format message dict."""
@@ -60,6 +63,10 @@ class Message:
d["tool_calls"] = self.tool_calls
if self.is_error:
d["is_error"] = self.is_error
if self.phase_id is not None:
d["phase_id"] = self.phase_id
if self.is_transition_marker:
d["is_transition_marker"] = self.is_transition_marker
return d
@classmethod
@@ -72,6 +79,8 @@ class Message:
tool_use_id=data.get("tool_use_id"),
tool_calls=data.get("tool_calls"),
is_error=data.get("is_error", False),
phase_id=data.get("phase_id"),
is_transition_marker=data.get("is_transition_marker", False),
)
@@ -188,6 +197,7 @@ class NodeConversation:
self._next_seq: int = 0
self._meta_persisted: bool = False
self._last_api_input_tokens: int | None = None
self._current_phase: str | None = None
# --- Properties --------------------------------------------------------
@@ -195,6 +205,33 @@ class NodeConversation:
def system_prompt(self) -> str:
return self._system_prompt
def update_system_prompt(self, new_prompt: str) -> None:
"""Update the system prompt.
Used in continuous conversation mode at phase transitions to swap
Layer 3 (focus) while preserving the conversation history.
"""
self._system_prompt = new_prompt
def set_current_phase(self, phase_id: str) -> None:
"""Set the current phase ID. Subsequent messages will be stamped with it."""
self._current_phase = phase_id
async def switch_store(self, new_store: ConversationStore) -> None:
"""Switch to a new persistence store at a phase transition.
Subsequent messages are written to *new_store*. Meta (system
prompt, config) is re-persisted on the next write so the new
store's ``meta.json`` reflects the updated prompt.
"""
self._store = new_store
self._meta_persisted = False
await new_store.write_cursor({"next_seq": self._next_seq})
@property
def current_phase(self) -> str | None:
return self._current_phase
@property
def messages(self) -> list[Message]:
"""Return a defensive copy of the message list."""
@@ -216,8 +253,19 @@ class NodeConversation:
# --- Add messages ------------------------------------------------------
async def add_user_message(self, content: str) -> Message:
msg = Message(seq=self._next_seq, role="user", content=content)
async def add_user_message(
self,
content: str,
*,
is_transition_marker: bool = False,
) -> Message:
msg = Message(
seq=self._next_seq,
role="user",
content=content,
phase_id=self._current_phase,
is_transition_marker=is_transition_marker,
)
self._messages.append(msg)
self._next_seq += 1
await self._persist(msg)
@@ -233,6 +281,7 @@ class NodeConversation:
role="assistant",
content=content,
tool_calls=tool_calls,
phase_id=self._current_phase,
)
self._messages.append(msg)
self._next_seq += 1
@@ -251,6 +300,7 @@ class NodeConversation:
content=content,
tool_use_id=tool_use_id,
is_error=is_error,
phase_id=self._current_phase,
)
self._messages.append(msg)
self._next_seq += 1
@@ -380,6 +430,11 @@ class NodeConversation:
spillover filename reference (if any). Message structure (role,
seq, tool_use_id) stays valid for the LLM API.
Phase-aware behavior (continuous mode): when messages have ``phase_id``
metadata, all messages in the current phase are protected regardless of
token budget. Transition markers are never pruned. Older phases' tool
results are pruned more aggressively.
Error tool results are never pruned; they prevent re-calling
failing tools.
@@ -388,13 +443,18 @@ class NodeConversation:
if not self._messages:
return 0
# Phase 1: Walk backward, classify tool results as protected vs pruneable
# Walk backward, classify tool results as protected vs pruneable
protected_tokens = 0
pruneable: list[int] = [] # indices into self._messages
pruneable_tokens = 0
for i in range(len(self._messages) - 1, -1, -1):
msg = self._messages[i]
# Transition markers are never pruned (any role)
if msg.is_transition_marker:
continue
if msg.role != "tool":
continue
if msg.is_error:
@@ -402,6 +462,10 @@ class NodeConversation:
if msg.content.startswith("[Pruned tool result"):
continue # already pruned
# Phase-aware: protect current phase messages
if self._current_phase and msg.phase_id == self._current_phase:
continue
est = len(msg.content) // 4
if protected_tokens < protect_tokens:
protected_tokens += est
@@ -409,11 +473,11 @@ class NodeConversation:
pruneable.append(i)
pruneable_tokens += est
# Phase 2: Only prune if enough to be worthwhile
# Only prune if enough to be worthwhile
if pruneable_tokens < min_prune_tokens:
return 0
# Phase 3: Replace content with compact placeholder
# Replace content with compact placeholder
count = 0
for i in pruneable:
msg = self._messages[i]
@@ -436,6 +500,8 @@ class NodeConversation:
tool_use_id=msg.tool_use_id,
tool_calls=msg.tool_calls,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
count += 1
+177
@@ -0,0 +1,177 @@
"""Level 2 Conversation-Aware Judge.
When a node has `success_criteria` set, the implicit judge upgrades:
after Level 0 passes (all output keys set), a fast LLM call evaluates
whether the conversation actually meets the criteria.
This prevents nodes from "checking boxes" (setting output keys) without
doing quality work. The LLM reads the recent conversation and assesses
whether the phase's goal was genuinely accomplished.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from framework.graph.conversation import NodeConversation
from framework.llm.provider import LLMProvider
logger = logging.getLogger(__name__)
@dataclass
class PhaseVerdict:
"""Result of Level 2 conversation-aware evaluation."""
action: str # "ACCEPT" or "RETRY"
confidence: float = 0.8
feedback: str = ""
async def evaluate_phase_completion(
llm: LLMProvider,
conversation: NodeConversation,
phase_name: str,
phase_description: str,
success_criteria: str,
accumulator_state: dict[str, Any],
    max_history_tokens: int = 8_192,
) -> PhaseVerdict:
"""Level 2 judge: read the conversation and evaluate quality.
Only called after Level 0 passes (all output keys set).
Args:
llm: LLM provider for evaluation
conversation: The current conversation to evaluate
phase_name: Name of the current phase/node
phase_description: Description of the phase
success_criteria: Natural-language criteria for phase completion
accumulator_state: Current output key values
max_history_tokens: Main conversation token budget (judge gets 20%)
Returns:
PhaseVerdict with action and optional feedback
"""
# Build a compact view of the recent conversation
recent_messages = _extract_recent_context(conversation, max_messages=10)
outputs_summary = _format_outputs(accumulator_state)
system_prompt = (
"You are a quality judge evaluating whether a phase of work is complete. "
"Be concise. Evaluate based on the success criteria, not on style."
)
user_prompt = f"""Evaluate this phase:
PHASE: {phase_name}
DESCRIPTION: {phase_description}
SUCCESS CRITERIA:
{success_criteria}
OUTPUTS SET:
{outputs_summary}
RECENT CONVERSATION:
{recent_messages}
Has this phase accomplished its goal based on the success criteria?
Respond in exactly this format:
ACTION: ACCEPT or RETRY
CONFIDENCE: 0.X
FEEDBACK: (reason if RETRY, empty if ACCEPT)"""
try:
response = llm.complete(
messages=[{"role": "user", "content": user_prompt}],
system=system_prompt,
max_tokens=max(1024, max_history_tokens // 5),
max_retries=1,
)
if not response.content or not response.content.strip():
logger.debug("Level 2 judge: empty response, accepting by default")
return PhaseVerdict(action="ACCEPT", confidence=0.5, feedback="")
return _parse_verdict(response.content)
except Exception as e:
logger.warning(f"Level 2 judge failed, accepting by default: {e}")
# On failure, don't block — Level 0 already passed
return PhaseVerdict(action="ACCEPT", confidence=0.5, feedback="")
def _extract_recent_context(conversation: NodeConversation, max_messages: int = 10) -> str:
"""Extract recent conversation messages for evaluation."""
messages = conversation.messages
recent = messages[-max_messages:] if len(messages) > max_messages else messages
parts = []
for msg in recent:
role = msg.role.upper()
content = msg.content or ""
# Truncate long tool results
if msg.role == "tool" and len(content) > 200:
content = content[:200] + "..."
if content.strip():
parts.append(f"[{role}]: {content.strip()}")
return "\n".join(parts) if parts else "(no messages)"
def _format_outputs(accumulator_state: dict[str, Any]) -> str:
"""Format output key values for evaluation.
Lists and dicts get structural formatting so the judge can assess
quantity and structure, not just a truncated stringification.
"""
if not accumulator_state:
return "(none)"
parts = []
for key, value in accumulator_state.items():
if isinstance(value, list):
# Show count + brief per-item preview so the judge can
# verify quantity without the full serialization.
items_preview = []
for i, item in enumerate(value[:8]):
item_str = str(item)
if len(item_str) > 150:
item_str = item_str[:150] + "..."
items_preview.append(f" [{i}]: {item_str}")
val_str = f"list ({len(value)} items):\n" + "\n".join(items_preview)
if len(value) > 8:
val_str += f"\n ... and {len(value) - 8} more"
elif isinstance(value, dict):
val_str = str(value)
if len(val_str) > 400:
val_str = val_str[:400] + "..."
else:
val_str = str(value)
if len(val_str) > 300:
val_str = val_str[:300] + "..."
parts.append(f" {key}: {val_str}")
return "\n".join(parts)
def _parse_verdict(response: str) -> PhaseVerdict:
"""Parse LLM response into PhaseVerdict."""
action = "ACCEPT"
confidence = 0.8
feedback = ""
for line in response.strip().split("\n"):
line = line.strip()
if line.startswith("ACTION:"):
action_str = line.split(":", 1)[1].strip().upper()
if action_str in ("ACCEPT", "RETRY"):
action = action_str
elif line.startswith("CONFIDENCE:"):
try:
confidence = float(line.split(":", 1)[1].strip())
except ValueError:
pass
elif line.startswith("FEEDBACK:"):
feedback = line.split(":", 1)[1].strip()
return PhaseVerdict(action=action, confidence=confidence, feedback=feedback)
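The verdict format is deliberately line-oriented so parsing never needs JSON. A condensed restatement of the same parse (standalone for illustration; the framework's `_parse_verdict` above returns a `PhaseVerdict`) makes the fail-open defaults explicit:

```python
def parse_verdict(response: str) -> tuple[str, float, str]:
    # Unknown actions and malformed confidence values fall back to
    # lenient defaults, so a sloppy LLM reply degrades to ACCEPT
    # instead of crashing the event loop.
    action, confidence, feedback = "ACCEPT", 0.8, ""
    for line in response.strip().splitlines():
        line = line.strip()
        if line.startswith("ACTION:"):
            cand = line.split(":", 1)[1].strip().upper()
            if cand in ("ACCEPT", "RETRY"):
                action = cand
        elif line.startswith("CONFIDENCE:"):
            try:
                confidence = float(line.split(":", 1)[1].strip())
            except ValueError:
                pass  # keep the default rather than fail the verdict
        elif line.startswith("FEEDBACK:"):
            feedback = line.split(":", 1)[1].strip()
    return action, confidence, feedback
```

Defaulting to ACCEPT is consistent with the judge's overall posture: Level 0 already passed, so Level 2 should only block when it can articulate a reason.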
+24 -13
@@ -21,6 +21,9 @@ allowing the LLM to evaluate whether proceeding along an edge makes sense
given the current goal, context, and execution state.
"""
import json
import logging
import re
from enum import StrEnum
from typing import Any
@@ -28,6 +31,8 @@ from pydantic import BaseModel, Field, model_validator
from framework.graph.safe_eval import safe_eval
logger = logging.getLogger(__name__)
DEFAULT_MAX_TOKENS = 8192
@@ -158,9 +163,6 @@ class EdgeSpec(BaseModel):
memory: dict[str, Any],
) -> bool:
"""Evaluate a conditional expression."""
import logging
logger = logging.getLogger(__name__)
if not self.condition_expr:
return True
@@ -217,8 +219,6 @@ class EdgeSpec(BaseModel):
The LLM evaluates whether proceeding to the target node
is the best next step toward achieving the goal.
"""
import json
# Build context for LLM
prompt = f"""You are evaluating whether to proceed along an edge in an agent workflow.
@@ -254,8 +254,6 @@ Respond with ONLY a JSON object:
)
# Parse response
import re
json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
@@ -263,9 +261,6 @@ Respond with ONLY a JSON object:
reasoning = data.get("reasoning", "")
# Log the decision
import logging
logger = logging.getLogger(__name__)
logger.info(f" 🤔 LLM routing decision: {'PROCEED' if proceed else 'SKIP'}")
logger.info(f" Reason: {reasoning}")
@@ -273,9 +268,6 @@ Respond with ONLY a JSON object:
except Exception as e:
# Fallback: proceed on success
import logging
logger = logging.getLogger(__name__)
logger.warning(f" ⚠ LLM routing failed, defaulting to on_success: {e}")
return source_success
@@ -443,6 +435,25 @@ class GraphSpec(BaseModel):
description="EventLoopNode configuration (max_iterations, max_tool_calls_per_turn, etc.)",
)
# Conversation mode
conversation_mode: str = Field(
default="continuous",
description=(
"How conversations flow between event_loop nodes. "
"'continuous' (default): one conversation threads through all "
"event_loop nodes with cumulative tools and layered prompt composition. "
"'isolated': each node gets a fresh conversation."
),
)
identity_prompt: str | None = Field(
default=None,
description=(
"Agent-level identity prompt (Layer 1 of the onion model). "
"In continuous mode, this is the static identity that persists "
"unchanged across all node transitions. In isolated mode, ignored."
),
)
# Metadata
description: str = ""
created_by: str = "" # "human" or "builder_agent"
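The `identity_prompt` field is Layer 1 of the layered ("onion") prompt model described above: it stays fixed across node transitions while the per-node focus is swapped in via `update_system_prompt()`. A minimal sketch of that composition — the function name and the `## Current phase` separator are assumptions, not the framework's actual layout:

```python
def compose_system_prompt(identity: str, node_focus: str) -> str:
    """Compose a continuous-mode system prompt from two layers.

    Layer 1 (identity) persists unchanged across all node transitions;
    Layer 3 (node focus) is replaced at each phase transition while the
    conversation history is preserved.
    """
    return identity.strip() + "\n\n## Current phase\n" + node_focus.strip()
```

At a phase transition the executor would recompose with the new node's focus and pass the result to `update_system_prompt()`, leaving the message history untouched.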
+222 -41
@@ -149,7 +149,7 @@ class EventLoopNode(NodeProtocol):
1. Try to restore from durable state (crash recovery)
2. If no prior state, init from NodeSpec.system_prompt + input_keys
3. Loop: drain injection queue -> stream LLM -> execute tools
-> if client_facing + ask_user called: block for user input
-> if client_facing: block for user input (see below)
-> judge evaluates (acceptance criteria)
(each add_* and set_output writes through to store immediately)
4. Publish events to EventBus at each stage
@@ -157,11 +157,17 @@ class EventLoopNode(NodeProtocol):
6. Terminate when judge returns ACCEPT, shutdown signaled, or max iterations
7. Build output dict from OutputAccumulator
Client-facing blocking: When ``client_facing=True``, a synthetic
``ask_user`` tool is injected. The node blocks for user input ONLY
when the LLM explicitly calls ``ask_user()``. Text-only turns
without ``ask_user`` flow through without blocking, allowing the LLM
to stream progress updates and summaries freely.
Client-facing blocking (``client_facing=True``):
- **Text-only turns** (no real tool calls, no set_output)
automatically block for user input. If the LLM is talking to the
user (not calling tools or setting outputs), it should wait for
the user's response before the judge runs.
- **Work turns** (tool calls or set_output) flow through without
blocking; the LLM is making progress, not asking the user.
- A synthetic ``ask_user`` tool is also injected for explicit
blocking when the LLM wants to be deliberate about requesting
input (e.g. mid-tool-call).
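The three bullets reduce to a small per-turn decision. A hypothetical sketch (function and parameter names invented; the rule ordering mirrors the docstring):

```python
def blocking_decision(
    client_facing: bool,
    asked_user: bool,      # LLM called the synthetic ask_user tool
    has_text: bool,        # assistant produced visible text this turn
    ran_real_tools: bool,  # any non-synthetic tool calls executed
    set_outputs: bool,     # set_output was called this turn
) -> str:
    """Return "block_explicit", "block_auto", or "flow".

    Explicit ask_user always blocks; a text-only turn from a
    client-facing node auto-blocks; work turns (tool calls or
    set_output) flow through without blocking.
    """
    if not client_facing:
        return "flow"
    if asked_user:
        return "block_explicit"
    if has_text and not ran_real_tools and not set_outputs:
        return "block_auto"
    return "flow"
```

The explicit/auto distinction matters downstream: per the implementation, an explicit `ask_user` falls through to judge evaluation after input arrives, while an auto-block skips the judge so the next turn can process the user's reply first.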
Always returns NodeResult with retryable=False semantics. The executor
must NOT retry event loop nodes -- retry is handled internally by the
@@ -234,23 +240,44 @@ class EventLoopNode(NodeProtocol):
return NodeResult(success=False, error=error_msg)
# 2. Restore or create new conversation + accumulator
conversation, accumulator, start_iteration = await self._restore(ctx)
if conversation is None:
system_prompt = ctx.node_spec.system_prompt or ""
# Track whether we're in continuous mode (conversation threaded across nodes)
_is_continuous = getattr(ctx, "continuous_mode", False)
conversation = NodeConversation(
system_prompt=system_prompt,
max_history_tokens=self._config.max_history_tokens,
output_keys=ctx.node_spec.output_keys or None,
store=self._conversation_store,
)
if _is_continuous and ctx.inherited_conversation is not None:
# Continuous mode with inherited conversation from prior node.
# This takes priority over store restoration — when the graph loops
# back to a previously-visited node, the inherited conversation
# carries forward the full thread rather than restoring stale state.
# System prompt already updated by executor. Transition marker
# already inserted by executor. Fresh accumulator for this phase.
# Phase already set by executor via set_current_phase().
conversation = ctx.inherited_conversation
conversation._output_keys = ctx.node_spec.output_keys or None
accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0
else:
# Try crash-recovery restore from store, then fall back to fresh.
conversation, accumulator, start_iteration = await self._restore(ctx)
if conversation is None:
# Fresh conversation: either isolated mode or first node in continuous mode.
system_prompt = ctx.node_spec.system_prompt or ""
# Add initial user message from input data
initial_message = self._build_initial_message(ctx)
if initial_message:
await conversation.add_user_message(initial_message)
conversation = NodeConversation(
system_prompt=system_prompt,
max_history_tokens=self._config.max_history_tokens,
output_keys=ctx.node_spec.output_keys or None,
store=self._conversation_store,
)
# Stamp phase for first node in continuous mode
if _is_continuous:
conversation.set_current_phase(ctx.node_id)
accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0
# Add initial user message from input data
initial_message = self._build_initial_message(ctx)
if initial_message:
await conversation.add_user_message(initial_message)
# 3. Build tool list: node tools + synthetic set_output + ask_user tools
tools = list(ctx.available_tools)
@@ -305,6 +332,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6b. Drain injection queue
@@ -429,6 +457,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6f. Stall detection
@@ -479,6 +508,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
# 6g. Write cursor checkpoint
@@ -486,12 +516,27 @@ class EventLoopNode(NodeProtocol):
# 6h. Client-facing input blocking
#
# Block ONLY when the LLM explicitly calls ask_user().
# Text-only turns and set_output-only turns flow through
# without blocking, allowing progress updates and summaries
# to stream freely. After user input arrives, fall through
# to judge evaluation (6i) — the judge handles acceptance.
if ctx.node_spec.client_facing and user_input_requested:
# Two triggers:
# (a) Explicit ask_user() — always blocks, then falls through
# to judge evaluation (6i).
# (b) Auto-block — a text-only turn (no real tools, no
# set_output) from a client-facing node is addressed to the
# user. Block for their response, then *skip* judge so the
# next LLM turn can process the reply without confusing
# "missing outputs" feedback.
#
# Turns that include tool calls or set_output are *work*, not
# conversation — they flow through without blocking.
_cf_block = False
_cf_auto = False
if ctx.node_spec.client_facing:
if user_input_requested:
_cf_block = True
elif assistant_text and not real_tool_results and not outputs_set:
_cf_block = True
_cf_auto = True
if _cf_block:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
latency_ms = int((time.time() - start_time) * 1000)
@@ -531,9 +576,15 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
logger.info("[%s] iter=%d: blocking for user input...", node_id, iteration)
logger.info(
"[%s] iter=%d: blocking for user input (auto=%s)...",
node_id,
iteration,
_cf_auto,
)
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
@@ -575,11 +626,34 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
user_interaction_count += 1
recent_responses.clear()
# Fall through to judge evaluation (6i)
if _cf_auto:
# Auto-block: skip judge — let the LLM process the
# user's response on the next turn without confusing
# "missing outputs" feedback injected between the
# assistant's question and the user's answer.
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback="Auto-blocked for user input (pre-interaction)",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# Explicit ask_user: fall through to judge evaluation (6i)
# 6i. Judge evaluation
should_judge = (
@@ -702,6 +776,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
elif verdict.action == "ESCALATE":
@@ -746,6 +821,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
elif verdict.action == "RETRY":
@@ -795,6 +871,7 @@ class EventLoopNode(NodeProtocol):
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
async def inject_event(self, content: str) -> None:
@@ -819,8 +896,11 @@ class EventLoopNode(NodeProtocol):
async def _await_user_input(self, ctx: NodeContext) -> bool:
"""Block until user input arrives or shutdown is signaled.
Called when a client_facing node explicitly calls ask_user(),
an intentional conversational turn boundary.
Called in two situations:
- The LLM explicitly calls ask_user().
- Auto-block: any text-only turn (no real tools, no set_output)
from a client-facing node ensures the user sees and responds
before the judge runs.
Returns True if input arrived, False if shutdown was signaled.
"""
@@ -893,6 +973,21 @@ class EventLoopNode(NodeProtocol):
await self._compact_tiered(ctx, conversation, accumulator)
messages = conversation.to_llm_messages()
# Defensive guard: ensure messages don't end with an assistant
# message. The Anthropic API rejects "assistant message prefill"
# (conversations must end with a user or tool message). This can
# happen after compaction trims messages leaving an assistant tail,
# or when a conversation is inherited without a transition marker
# (e.g. parallel-branch execution).
if messages and messages[-1].get("role") == "assistant":
logger.info(
"[%s] Messages end with assistant — injecting continuation prompt",
node_id,
)
await conversation.add_user_message("[Continue working on your current task.]")
messages = conversation.to_llm_messages()
accumulated_text = ""
tool_calls: list[ToolCallEvent] = []
@@ -1027,7 +1122,25 @@ class EventLoopNode(NodeProtocol):
)
else:
# --- Real tool execution ---
result = await self._execute_tool(tc)
# Guard: detect truncated tool arguments (_raw fallback
# from litellm when json.loads fails on max_tokens hit).
if "_raw" in tc.tool_input:
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Tool call to '{tc.tool_name}' failed: your arguments "
"were truncated (hit output token limit). "
"Simplify or shorten your arguments and try again."
),
is_error=True,
)
logger.warning(
"[%s] Blocked truncated _raw tool call: %s",
node_id,
tc.tool_name,
)
else:
result = await self._execute_tool(tc)
result = self._truncate_tool_result(result, tc.tool_name)
tool_entry = {
"tool_use_id": tc.tool_use_id,
@@ -1307,6 +1420,26 @@ class EventLoopNode(NodeProtocol):
f"Use set_output to set at least one of: {output_keys}"
),
)
# Level 2: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_history_tokens=self._config.max_history_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
return JudgeVerdict(action="ACCEPT")
else:
return JudgeVerdict(
@@ -1332,26 +1465,43 @@ class EventLoopNode(NodeProtocol):
Used in compaction summaries to prevent the LLM from re-calling
tools it already called. Extracts:
- Tool call counts (e.g. "github_list_pull_requests (6x)")
- Tool call details: name, count, and *inputs* for key tools
(search queries, scrape URLs, loaded filenames)
- Files saved via save_data
- Outputs set via set_output
- Errors encountered
"""
tool_counts: dict[str, int] = {}
# Per-tool: list of input summaries (one per call)
tool_calls_detail: dict[str, list[str]] = {}
files_saved: list[str] = []
outputs_set: list[str] = []
errors: list[str] = []
# Tool-specific input extractors: return a short summary string
def _summarize_input(name: str, args: dict) -> str:
if name == "web_search":
return args.get("query", "")
if name == "web_scrape":
return args.get("url", "")
if name == "load_data":
return args.get("filename", "")
if name == "save_data":
return args.get("filename", "")
return ""
for msg in conversation.messages:
if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
name = func.get("name", "unknown")
tool_counts[name] = tool_counts.get(name, 0) + 1
try:
args = json.loads(func.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
args = {}
summary = _summarize_input(name, args)
tool_calls_detail.setdefault(name, []).append(summary)
if name == "save_data" and args.get("filename"):
files_saved.append(args["filename"])
if name == "set_output" and args.get("key"):
@@ -1362,9 +1512,18 @@ class EventLoopNode(NodeProtocol):
errors.append(preview)
parts: list[str] = []
if tool_counts:
lines = [f" {n} ({c}x)" for n, c in tool_counts.items()]
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines[:max_entries]))
if tool_calls_detail:
lines: list[str] = []
for name, inputs in list(tool_calls_detail.items())[:max_entries]:
count = len(inputs)
# Include input details for tools where inputs matter
non_empty = [s for s in inputs if s]
if non_empty:
detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
else:
lines.append(f" {name} ({count}x)")
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
if files_saved:
unique = list(dict.fromkeys(files_saved))
parts.append("FILES SAVED: " + ", ".join(unique))
@@ -1453,6 +1612,12 @@ class EventLoopNode(NodeProtocol):
if limit <= 0 or result.is_error or len(result.content) <= limit:
return result
# load_data is the designated mechanism for reading spilled files.
# The LLM controls chunk size via offset/limit — re-spilling its
# result would create a circular loop.
if tool_name == "load_data":
return result
# Determine a preview size — leave room for the metadata wrapper
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
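The preview-sizing rule above has a subtle floor. A one-function sketch of the assumed behavior (leave roughly 300 characters of headroom for the metadata wrapper, but never shrink the preview below half the limit):

```python
def preview_size(limit: int) -> int:
    # Headroom for the wrapper, with a half-limit floor for small limits
    return max(limit - 300, limit // 2)

print(preview_size(2000))  # headroom rule wins for large limits
print(preview_size(400))   # half-limit floor wins for small limits
```

The floor matters: with `limit=400`, a naive `limit - 300` would leave a 100-character preview that is too small to be useful.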
@@ -1698,12 +1863,28 @@ class EventLoopNode(NodeProtocol):
if spec.tools:
parts.append(f"AVAILABLE TOOLS: {', '.join(spec.tools)}")
# 5. Spillover files hint
# 5. Spillover files — list actual files so the LLM can load
# them immediately instead of having to call list_data_files first.
if self._config.spillover_dir:
parts.append(
"NOTE: Large tool results were saved to files. "
"Use load_data(filename='<filename>') to read them."
)
try:
from pathlib import Path
data_dir = Path(self._config.spillover_dir)
if data_dir.is_dir():
files = sorted(f.name for f in data_dir.iterdir() if f.is_file())
if files:
file_list = "\n".join(f" - {f}" for f in files[:30])
parts.append("DATA FILES (use load_data to read):\n" + file_list)
else:
parts.append(
"NOTE: Large tool results may have been saved to files. "
"Use list_data_files() to check."
)
except Exception:
parts.append(
"NOTE: Large tool results were saved to files. "
"Use load_data(filename='<filename>') to read them."
)
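The listing-with-fallback logic above can be sketched as a pure function. This is a hypothetical standalone version (`spillover_hint` is not a name from the codebase) showing the three outcomes: a concrete file list, a "maybe" hint when the directory is empty or missing, and a generic hint when listing fails entirely:

```python
from pathlib import Path

def spillover_hint(data_dir: str) -> str:
    """Build the spillover prompt section: list files, or fall back to a hint."""
    try:
        p = Path(data_dir)
        if p.is_dir():
            files = sorted(f.name for f in p.iterdir() if f.is_file())
            if files:
                file_list = "\n".join(f"  - {f}" for f in files[:30])
                return "DATA FILES (use load_data to read):\n" + file_list
        return (
            "NOTE: Large tool results may have been saved to files. "
            "Use list_data_files() to check."
        )
    except Exception:
        # Listing failed: fall back to the generic load_data hint
        return (
            "NOTE: Large tool results were saved to files. "
            "Use load_data(filename='<filename>') to read them."
        )
```

Listing the actual files saves the model one tool round-trip: it can call `load_data` immediately instead of probing with `list_data_files` first.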
# 6. Tool call history (prevent re-calling tools)
if conversation is not None:
+151 -7
@@ -186,6 +186,52 @@ class GraphExecutor:
# Pause/resume control
self._pause_requested = asyncio.Event()
def _write_progress(
self,
current_node: str,
path: list[str],
memory: Any,
node_visit_counts: dict[str, int],
) -> None:
"""Update state.json with live progress at node transitions.
Reads the existing state.json (written by ExecutionStream at session
start) and patches the progress fields in-place. This keeps
state.json as the single source of truth: readers always see
current progress, not stale initial values.
The write is synchronous and best-effort: it never blocks execution.
"""
if not self._storage_path:
return
try:
import json as _json
from datetime import datetime
state_path = self._storage_path / "state.json"
if state_path.exists():
state_data = _json.loads(state_path.read_text(encoding="utf-8"))
else:
state_data = {}
# Patch progress fields
progress = state_data.setdefault("progress", {})
progress["current_node"] = current_node
progress["path"] = list(path)
progress["node_visit_counts"] = dict(node_visit_counts)
progress["steps_executed"] = len(path)
# Update timestamp
timestamps = state_data.setdefault("timestamps", {})
timestamps["updated_at"] = datetime.now().isoformat()
# Memory keys (lightweight — just keys, not values)
state_data["memory_keys"] = list(memory.read_all().keys())
state_path.write_text(_json.dumps(state_data, indent=2), encoding="utf-8")
except Exception:
pass # Best-effort — never block execution
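The read-patch-write cycle above can be exercised in isolation. A trimmed sketch (assuming the `{"progress": ..., "timestamps": ...}` layout shown in the hunk; field set reduced for brevity):

```python
import json
from datetime import datetime
from pathlib import Path

def write_progress(state_path: Path, current_node: str, path: list[str]) -> None:
    """Patch progress fields into state.json in place, best-effort."""
    try:
        if state_path.exists():
            state = json.loads(state_path.read_text(encoding="utf-8"))
        else:
            state = {}
        progress = state.setdefault("progress", {})
        progress["current_node"] = current_node
        progress["path"] = list(path)
        progress["steps_executed"] = len(path)
        state.setdefault("timestamps", {})["updated_at"] = datetime.now().isoformat()
        state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
    except Exception:
        pass  # best-effort: a failed progress write must never block execution
```

Patching in place (rather than rewriting the whole state) preserves whatever fields ExecutionStream wrote at session start.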
def _validate_tools(self, graph: GraphSpec) -> list[str]:
"""
Validate that all tools declared by nodes are available.
@@ -257,6 +303,12 @@ class GraphExecutor:
# Initialize execution state
memory = SharedMemory()
# Continuous conversation mode state
is_continuous = getattr(graph, "conversation_mode", "isolated") == "continuous"
continuous_conversation = None # NodeConversation threaded across nodes
cumulative_tools: list = [] # Tools accumulate, never removed
cumulative_tool_names: set[str] = set()
# Initialize checkpoint store if checkpointing is enabled
checkpoint_store: CheckpointStore | None = None
if checkpoint_config and checkpoint_config.enabled and self._storage_path:
@@ -273,13 +325,20 @@ class GraphExecutor:
f"{type(memory_data).__name__}, expected dict"
)
else:
# Restore memory from previous session
# Restore memory from previous session.
# Skip validation — this data was already validated when
# originally written, and research text triggers false
# positives on the code-indicator heuristic.
for key, value in memory_data.items():
memory.write(key, value)
memory.write(key, value, validate=False)
self.logger.info(f"📥 Restored session state with {len(memory_data)} memory keys")
# Write new input data to memory (each key individually)
if input_data:
# Write new input data to memory (each key individually).
# Skip when resuming from a paused session — restored memory already
# contains all state including the original input, and re-writing
# input_data would overwrite intermediate results with stale values.
_is_resuming = bool(session_state and session_state.get("paused_at"))
if input_data and not _is_resuming:
for key, value in input_data.items():
memory.write(key, value)
@@ -530,6 +589,13 @@ class GraphExecutor:
self.logger.info(f" Inputs: {node_spec.input_keys}")
self.logger.info(f" Outputs: {node_spec.output_keys}")
# Continuous mode: accumulate tools from this node
if is_continuous and node_spec.tools:
for t in self.tools:
if t.name in node_spec.tools and t.name not in cumulative_tool_names:
cumulative_tools.append(t)
cumulative_tool_names.add(t.name)
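The cumulative-tool rule above (tools are only ever added, never removed, with duplicates skipped by name) reduces to a small set-guarded append. A sketch over plain strings instead of tool objects:

```python
def accumulate(cumulative: list[str], node_tools: list[str]) -> list[str]:
    """Append node_tools to cumulative, skipping names already present."""
    seen = set(cumulative)
    for name in node_tools:
        if name not in seen:
            cumulative.append(name)
            seen.add(name)
    return cumulative

tools: list[str] = []
tools = accumulate(tools, ["web_search"])
tools = accumulate(tools, ["web_search", "save_data"])
print(tools)
```

Using a list plus a name set keeps first-seen ordering stable, which matters when the tool list is rendered into the system prompt.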
# Build context for node
ctx = self._build_context(
node_spec=node_spec,
@@ -537,6 +603,9 @@ class GraphExecutor:
goal=goal,
input_data=input_data or {},
max_tokens=graph.max_tokens,
continuous_mode=is_continuous,
inherited_conversation=continuous_conversation if is_continuous else None,
override_tools=cumulative_tools if is_continuous else None,
)
# Log actual input data being read
@@ -763,6 +832,7 @@ class GraphExecutor:
"memory": saved_memory,
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
"resume_from": current_node_id,
}
return ExecutionResult(
@@ -843,6 +913,7 @@ class GraphExecutor:
# Router explicitly set next node
self.logger.info(f" → Router directing to: {result.next_node}")
current_node_id = result.next_node
self._write_progress(current_node_id, path, memory, node_visit_counts)
else:
# Get all traversable edges for fan-out detection
traversable_edges = self._get_all_traversable_edges(
@@ -886,6 +957,7 @@ class GraphExecutor:
if fan_in_node:
self.logger.info(f" ⑃ Fan-in: converging at {fan_in_node}")
current_node_id = fan_in_node
self._write_progress(current_node_id, path, memory, node_visit_counts)
else:
# No convergence point - branches are terminal
self.logger.info(" → Parallel branches completed (no convergence)")
@@ -940,6 +1012,68 @@ class GraphExecutor:
current_node_id = next_node
# Write progress snapshot at node transition
self._write_progress(current_node_id, path, memory, node_visit_counts)
# Continuous mode: thread conversation forward with transition marker
if is_continuous and result.conversation is not None:
continuous_conversation = result.conversation
# Look up the next node spec for the transition marker
next_spec = graph.get_node(current_node_id)
if next_spec and next_spec.node_type == "event_loop":
from framework.graph.prompt_composer import (
build_narrative,
build_transition_marker,
compose_system_prompt,
)
# Build Layer 2 (narrative) from current state
narrative = build_narrative(memory, path, graph)
# Compose new system prompt (Layer 1 + 2 + 3)
new_system = compose_system_prompt(
identity_prompt=getattr(graph, "identity_prompt", None),
focus_prompt=next_spec.system_prompt,
narrative=narrative,
)
continuous_conversation.update_system_prompt(new_system)
# Switch conversation store to the next node's directory
# so the transition marker and all subsequent messages are
# persisted there instead of the first node's directory.
if self._storage_path:
from framework.storage.conversation_store import (
FileConversationStore,
)
next_store_path = self._storage_path / "conversations" / next_spec.id
next_store = FileConversationStore(base_path=next_store_path)
await continuous_conversation.switch_store(next_store)
# Insert transition marker into conversation
data_dir = str(self._storage_path / "data") if self._storage_path else None
marker = build_transition_marker(
previous_node=node_spec,
next_node=next_spec,
memory=memory,
cumulative_tool_names=sorted(cumulative_tool_names),
data_dir=data_dir,
)
await continuous_conversation.add_user_message(
marker,
is_transition_marker=True,
)
# Set current phase for phase-aware compaction
continuous_conversation.set_current_phase(next_spec.id)
# Opportunistic compaction at transition
if continuous_conversation.usage_ratio() > 0.5:
await continuous_conversation.prune_old_tool_results(
protect_tokens=2000,
)
# Update input_data for next node
input_data = result.output
@@ -1082,6 +1216,7 @@ class GraphExecutor:
"memory": saved_memory,
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
"resume_from": current_node_id,
}
# Mark latest checkpoint for resume on failure
@@ -1134,12 +1269,19 @@ class GraphExecutor:
goal: Goal,
input_data: dict[str, Any],
max_tokens: int = 4096,
continuous_mode: bool = False,
inherited_conversation: Any = None,
override_tools: list | None = None,
) -> NodeContext:
"""Build execution context for a node."""
# Filter tools to those available to this node
available_tools = []
if node_spec.tools:
available_tools = [t for t in self.tools if t.name in node_spec.tools]
if override_tools is not None:
# Continuous mode: use cumulative tool set
available_tools = list(override_tools)
else:
available_tools = []
if node_spec.tools:
available_tools = [t for t in self.tools if t.name in node_spec.tools]
# Create scoped memory view
scoped_memory = memory.with_permissions(
@@ -1160,6 +1302,8 @@ class GraphExecutor:
max_tokens=max_tokens,
runtime_logger=self.runtime_logger,
pause_event=self._pause_requested, # Pass pause event for granular control
continuous_mode=continuous_mode,
inherited_conversation=inherited_conversation,
)
# Valid node types - no ambiguous "llm" type allowed
+17
@@ -238,6 +238,16 @@ class NodeSpec(BaseModel):
description="If True, this node streams output to the end user and can request input.",
)
# Phase completion criteria for conversation-aware judge (Level 2)
success_criteria: str | None = Field(
default=None,
description=(
"Natural-language criteria for phase completion. When set, the "
"implicit judge upgrades to Level 2: after output keys are satisfied, "
"a fast LLM evaluates whether the conversation meets these criteria."
),
)
model_config = {"extra": "allow", "arbitrary_types_allowed": True}
@@ -483,6 +493,10 @@ class NodeContext:
# Pause control (optional) - asyncio.Event for pause requests
pause_event: Any = None # asyncio.Event | None
# Continuous conversation mode
continuous_mode: bool = False # True when graph has conversation_mode="continuous"
inherited_conversation: Any = None # NodeConversation | None (from prior node)
@dataclass
class NodeResult:
@@ -511,6 +525,9 @@ class NodeResult:
# Pydantic validation errors (if any)
validation_errors: list[str] = field(default_factory=list)
# Continuous conversation mode: return conversation for threading to next node
conversation: Any = None # NodeConversation | None
def to_summary(self, node_spec: Any = None) -> str:
"""
Generate a human-readable summary of this node's execution and output.
+185
@@ -0,0 +1,185 @@
"""Prompt composition for continuous agent mode.
Composes the three-layer system prompt (onion model) and generates
transition markers inserted into the conversation at phase boundaries.
Layer 1 - Identity (static, defined at agent level, never changes):
"You are a thorough research agent. You prefer clarity over jargon..."
Layer 2 - Narrative (auto-generated from conversation/memory state):
"We've finished scoping the project. The user wants to focus on..."
Layer 3 - Focus (per-node system_prompt, reframed as focus directive):
"Your current attention: synthesize findings into a report..."
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
from framework.graph.node import NodeSpec, SharedMemory
logger = logging.getLogger(__name__)
def compose_system_prompt(
identity_prompt: str | None,
focus_prompt: str | None,
narrative: str | None = None,
) -> str:
"""Compose the three-layer system prompt.
Args:
identity_prompt: Layer 1 static agent identity (from GraphSpec).
focus_prompt: Layer 3 per-node focus directive (from NodeSpec.system_prompt).
narrative: Layer 2 auto-generated from conversation state.
Returns:
Composed system prompt with all layers present.
"""
parts: list[str] = []
# Layer 1: Identity (always first, anchors the personality)
if identity_prompt:
parts.append(identity_prompt)
# Layer 2: Narrative (what's happened so far)
if narrative:
parts.append(f"\n--- Context (what has happened so far) ---\n{narrative}")
# Layer 3: Focus (current phase directive)
if focus_prompt:
parts.append(f"\n--- Current Focus ---\n{focus_prompt}")
return "\n".join(parts) if parts else ""
def build_narrative(
memory: SharedMemory,
execution_path: list[str],
graph: GraphSpec,
) -> str:
"""Build Layer 2 (narrative) from structured state.
Deterministic: no LLM call. Reads SharedMemory and execution path
to describe what has happened so far. Cheap and fast.
Args:
memory: Current shared memory state.
execution_path: List of node IDs visited so far.
graph: Graph spec (for node names/descriptions).
Returns:
Narrative string describing the session state.
"""
parts: list[str] = []
# Describe execution path
if execution_path:
phase_descriptions: list[str] = []
for node_id in execution_path:
node_spec = graph.get_node(node_id)
if node_spec:
phase_descriptions.append(f"- {node_spec.name}: {node_spec.description}")
else:
phase_descriptions.append(f"- {node_id}")
parts.append("Phases completed:\n" + "\n".join(phase_descriptions))
# Describe key memory values (skip very long values)
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
for key, value in all_memory.items():
if value is None:
continue
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
memory_lines.append(f"- {key}: {val_str}")
if memory_lines:
parts.append("Current state:\n" + "\n".join(memory_lines))
return "\n\n".join(parts) if parts else ""
def build_transition_marker(
previous_node: NodeSpec,
next_node: NodeSpec,
memory: SharedMemory,
cumulative_tool_names: list[str],
data_dir: Path | str | None = None,
) -> str:
"""Build a 'State of the World' transition marker.
Inserted into the conversation as a user message at phase boundaries.
Gives the LLM full situational awareness: what happened, what's stored,
what tools are available, and what to focus on next.
Args:
previous_node: NodeSpec of the phase just completed.
next_node: NodeSpec of the phase about to start.
memory: Current shared memory state.
cumulative_tool_names: All tools available (cumulative set).
data_dir: Path to spillover data directory.
Returns:
Transition marker message text.
"""
sections: list[str] = []
# Header
sections.append(f"--- PHASE TRANSITION: {previous_node.name} → {next_node.name} ---")
# What just completed
sections.append(f"\nCompleted: {previous_node.name}")
sections.append(f" {previous_node.description}")
# Outputs in memory
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
for key, value in all_memory.items():
if value is None:
continue
val_str = str(value)
if len(val_str) > 300:
val_str = val_str[:300] + "..."
memory_lines.append(f" {key}: {val_str}")
if memory_lines:
sections.append("\nOutputs available:\n" + "\n".join(memory_lines))
# Files in data directory
if data_dir:
data_path = Path(data_dir)
if data_path.exists():
files = sorted(data_path.iterdir())
if files:
file_lines = [
f" {f.name} ({f.stat().st_size:,} bytes)" for f in files if f.is_file()
]
if file_lines:
sections.append(
"\nData files (use load_data to access):\n" + "\n".join(file_lines)
)
# Available tools
if cumulative_tool_names:
sections.append("\nAvailable tools: " + ", ".join(sorted(cumulative_tool_names)))
# Next phase
sections.append(f"\nNow entering: {next_node.name}")
sections.append(f" {next_node.description}")
# Reflection prompt (engineered metacognition)
sections.append(
"\nBefore proceeding, briefly reflect: what went well in the "
"previous phase? Are there any gaps or surprises worth noting?"
)
sections.append("\n--- END TRANSITION ---")
return "\n".join(sections)
+1 -1
@@ -145,7 +145,7 @@ class SafeEvalVisitor(ast.NodeVisitor):
def visit_Attribute(self, node: ast.Attribute) -> Any:
# value.attr
# STIRCT CHECK: No access to private attributes (starting with _)
# STRICT CHECK: No access to private attributes (starting with _)
if node.attr.startswith("_"):
raise ValueError(f"Access to private attribute '{node.attr}' is not allowed")
+2
@@ -70,6 +70,7 @@ class AnthropicProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""Generate a completion from Claude (via LiteLLM)."""
return self._provider.complete(
@@ -79,6 +80,7 @@ class AnthropicProvider(LLMProvider):
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
max_retries=max_retries,
)
def complete_with_tools(
+13 -9
@@ -150,10 +150,13 @@ class LiteLLMProvider(LLMProvider):
"LiteLLM is not installed. Please install it with: uv pip install litellm"
)
def _completion_with_rate_limit_retry(self, **kwargs: Any) -> Any:
def _completion_with_rate_limit_retry(
self, max_retries: int | None = None, **kwargs: Any
) -> Any:
"""Call litellm.completion with retry on 429 rate limit errors and empty responses."""
model = kwargs.get("model", self.model)
for attempt in range(RATE_LIMIT_MAX_RETRIES + 1):
retries = max_retries if max_retries is not None else RATE_LIMIT_MAX_RETRIES
for attempt in range(retries + 1):
try:
response = litellm.completion(**kwargs) # type: ignore[union-attr]
@@ -194,9 +197,9 @@ class LiteLLMProvider(LLMProvider):
f"Full request dumped to: {dump_path}"
)
if attempt == RATE_LIMIT_MAX_RETRIES:
if attempt == retries:
logger.error(
f"[retry] GAVE UP on {model} after {RATE_LIMIT_MAX_RETRIES + 1} "
f"[retry] GAVE UP on {model} after {retries + 1} "
f"attempts — empty response "
f"(finish_reason={finish_reason}, "
f"choices={len(response.choices) if response.choices else 0})"
@@ -209,7 +212,7 @@ class LiteLLMProvider(LLMProvider):
f"choices={len(response.choices) if response.choices else 0}) — "
f"likely rate limited or quota exceeded. "
f"Retrying in {wait}s "
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
f"(attempt {attempt + 1}/{retries})"
)
time.sleep(wait)
continue
@@ -225,9 +228,9 @@ class LiteLLMProvider(LLMProvider):
error_type="rate_limit",
attempt=attempt,
)
if attempt == RATE_LIMIT_MAX_RETRIES:
if attempt == retries:
logger.error(
f"[retry] GAVE UP on {model} after {RATE_LIMIT_MAX_RETRIES + 1} "
f"[retry] GAVE UP on {model} after {retries + 1} "
f"attempts — rate limit error: {e!s}. "
f"~{token_count} tokens ({token_method}). "
f"Full request dumped to: {dump_path}"
@@ -239,7 +242,7 @@ class LiteLLMProvider(LLMProvider):
f"~{token_count} tokens ({token_method}). "
f"Full request dumped to: {dump_path}. "
f"Retrying in {wait}s "
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
f"(attempt {attempt + 1}/{retries})"
)
time.sleep(wait)
# unreachable, but satisfies type checker
@@ -253,6 +256,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""Generate a completion using LiteLLM."""
# Prepare messages with system prompt
@@ -293,7 +297,7 @@ class LiteLLMProvider(LLMProvider):
kwargs["response_format"] = response_format
# Make the call
response = self._completion_with_rate_limit_retry(**kwargs)
response = self._completion_with_rate_limit_retry(max_retries=max_retries, **kwargs)
# Extract content
content = response.choices[0].message.content or ""
+1
@@ -120,6 +120,7 @@ class MockLLMProvider(LLMProvider):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""
Generate a mock completion without calling a real LLM.
+3
@@ -65,6 +65,7 @@ class LLMProvider(ABC):
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
"""
Generate a completion from the LLM.
@@ -79,6 +80,8 @@ class LLMProvider(ABC):
- {"type": "json_schema", "json_schema": {"name": "...", "schema": {...}}}
for strict JSON schema enforcement
json_mode: If True, request structured JSON output from the LLM
max_retries: Override retry count for rate-limit/empty-response retries.
None uses the provider default.
Returns:
LLMResponse with content and metadata
+2
@@ -362,6 +362,7 @@ def _load_resume_state(
except (json.JSONDecodeError, OSError):
return None
return {
"resume_session_id": session_id,
"memory": cp_data.get("shared_memory", {}),
"paused_at": cp_data.get("next_node") or cp_data.get("current_node"),
"execution_path": cp_data.get("execution_path", []),
@@ -379,6 +380,7 @@ def _load_resume_state(
progress = state_data.get("progress", {})
paused_at = progress.get("paused_at") or progress.get("resume_from")
return {
"resume_session_id": session_id,
"memory": state_data.get("memory", {}),
"paused_at": paused_at,
"execution_path": progress.get("path", []),
+3 -1
@@ -686,7 +686,9 @@ class AgentRunner:
else:
# Fall back to environment variable
# First check api_key_env_var from config (set by quickstart)
api_key_env = llm_config.get("api_key_env_var") or self._get_api_key_env_var(self.model)
api_key_env = llm_config.get("api_key_env_var") or self._get_api_key_env_var(
self.model
)
if api_key_env and os.environ.get(api_key_env):
self._llm = LiteLLMProvider(model=self.model)
else:
+23 -4
@@ -293,8 +293,13 @@ class ExecutionStream:
if not self._running:
raise RuntimeError(f"ExecutionStream '{self.stream_id}' is not running")
# Generate execution ID using unified session format
if self._session_store:
# When resuming, reuse the original session ID so the execution
# continues in the same session directory instead of creating a new one.
resume_session_id = session_state.get("resume_session_id") if session_state else None
if resume_session_id:
execution_id = resume_session_id
elif self._session_store:
execution_id = self._session_store.generate_session_id()
else:
# Fallback to old format if SessionStore not available (shouldn't happen)
@@ -597,10 +602,22 @@ class ExecutionStream:
entry_point=self.entry_spec.id,
)
else:
# Create initial state
from framework.schemas.session_state import SessionTimestamps
# Create initial state — when resuming, preserve the previous
# execution's progress so crashes don't lose track of state.
from framework.schemas.session_state import (
SessionProgress,
SessionTimestamps,
)
now = datetime.now().isoformat()
ss = ctx.session_state or {}
progress = SessionProgress(
current_node=ss.get("paused_at") or ss.get("resume_from"),
paused_at=ss.get("paused_at"),
resume_from=ss.get("paused_at") or ss.get("resume_from"),
path=ss.get("execution_path", []),
node_visit_counts=ss.get("node_visit_counts", {}),
)
state = SessionState(
session_id=execution_id,
stream_id=self.stream_id,
@@ -613,6 +630,8 @@ class ExecutionStream:
started_at=ctx.started_at.isoformat(),
updated_at=now,
),
progress=progress,
memory=ss.get("memory", {}),
input_data=ctx.input_data,
)
+8 -2
@@ -156,8 +156,14 @@ class SessionState(BaseModel):
@computed_field
@property
def is_resumable(self) -> bool:
"""Can this session be resumed?"""
return self.status == SessionStatus.PAUSED and self.progress.resume_from is not None
"""Can this session be resumed?
A session is resumable when it stopped mid-execution (paused or
failed) and we know which node to resume from.
"""
if self.status not in (SessionStatus.PAUSED, SessionStatus.FAILED):
return False
return self.progress.resume_from is not None or self.progress.paused_at is not None
@computed_field
@property
+7 -14
@@ -6,7 +6,7 @@ import time
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Footer, Input, Label
from textual.widgets import Footer, Label
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.event_bus import AgentEvent, EventType
@@ -180,13 +180,13 @@ class AdenTUI(App):
scrollbar-color: $primary;
}
Input {
ChatTextArea {
background: $surface;
border: tall $primary;
margin-top: 1;
}
Input:focus {
ChatTextArea:focus {
border: tall $accent;
}
@@ -504,8 +504,8 @@ class AdenTUI(App):
original_chat_border = chat_widget.styles.border_left
chat_widget.styles.border_left = ("none", "transparent")
# Hide all Input widget borders
input_widgets = self.query("Input")
# Hide all TextArea widget borders
input_widgets = self.query("ChatTextArea")
original_input_borders = []
for input_widget in input_widgets:
original_input_borders.append(input_widget.styles.border)
@@ -575,19 +575,12 @@ class AdenTUI(App):
timeout=5,
)
def action_show_sessions(self) -> None:
async def action_show_sessions(self) -> None:
"""Show sessions list (bound to Ctrl+R)."""
# Send /sessions command to chat input
try:
chat_repl = self.query_one(ChatRepl)
chat_input = chat_repl.query_one("#chat-input", Input)
chat_input.value = "/sessions"
# Trigger submission
self.notify(
"💡 Type /sessions in the chat to see all sessions",
severity="information",
timeout=3,
)
await chat_repl._submit_input("/sessions")
except Exception:
self.notify(
"Use /sessions command to see all sessions",
+55 -21
@@ -22,12 +22,38 @@ from typing import Any
from textual.app import ComposeResult
from textual.containers import Vertical
from textual.widgets import Input, Label
from textual.message import Message
from textual.widgets import Label, TextArea
from framework.runtime.agent_runtime import AgentRuntime
from framework.tui.widgets.selectable_rich_log import SelectableRichLog as RichLog
class ChatTextArea(TextArea):
"""TextArea that submits on Enter and inserts newlines on Shift+Enter."""
class Submitted(Message):
"""Posted when the user presses Enter."""
def __init__(self, text: str) -> None:
super().__init__()
self.text = text
async def _on_key(self, event) -> None:
if event.key == "enter":
text = self.text.strip()
self.clear()
if text:
self.post_message(self.Submitted(text))
event.stop()
event.prevent_default()
elif event.key == "shift+enter":
event.key = "enter"
await super()._on_key(event)
else:
await super()._on_key(event)
class ChatRepl(Vertical):
"""Widget for interactive chat/REPL."""
@@ -56,16 +82,17 @@ class ChatRepl(Vertical):
display: none;
}
ChatRepl > Input {
ChatRepl > ChatTextArea {
width: 100%;
height: auto;
max-height: 7;
dock: bottom;
background: $surface;
border: tall $primary;
margin-top: 1;
}
ChatRepl > Input:focus {
ChatRepl > ChatTextArea:focus {
border: tall $accent;
}
"""
@@ -108,7 +135,7 @@ class ChatRepl(Vertical):
min_width=0,
)
yield Label("Agent is processing...", id="processing-indicator")
yield Input(placeholder="Enter input for agent...", id="chat-input")
yield ChatTextArea(id="chat-input", placeholder="Enter input for agent...")
# Regex for file:// URIs that are NOT already inside Rich [link=...] markup
_FILE_URI_RE = re.compile(r"(?<!\[link=)(file://[^\s)\]>*]+)")
@@ -451,6 +478,7 @@ class ChatRepl(Vertical):
if paused_at:
# Has paused_at - resume from there
resume_session_state = {
"resume_session_id": session_id,
"paused_at": paused_at,
"memory": state.get("memory", {}),
"execution_path": progress.get("path", []),
@@ -458,8 +486,13 @@ class ChatRepl(Vertical):
}
resume_info = f"From node: [cyan]{paused_at}[/cyan]"
else:
# No paused_at - just retry with same input
resume_session_state = {}
# No paused_at - retry with same input but reuse session directory
resume_session_state = {
"resume_session_id": session_id,
"memory": state.get("memory", {}),
"execution_path": progress.get("path", []),
"node_visit_counts": progress.get("node_visit_counts", {}),
}
resume_info = "Retrying with same input"
# Display resume info
@@ -485,7 +518,7 @@ class ChatRepl(Vertical):
indicator.display = True
# Update placeholder
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.placeholder = "Commands: /pause, /sessions (agent resuming...)"
# Trigger execution with resume state
@@ -563,6 +596,7 @@ class ChatRepl(Vertical):
# Create session_state for checkpoint recovery
recover_session_state = {
"resume_session_id": session_id,
"resume_from_checkpoint": checkpoint_id,
}
@@ -572,7 +606,7 @@ class ChatRepl(Vertical):
indicator.display = True
# Update placeholder
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.placeholder = "Commands: /pause, /sessions (agent recovering...)"
# Trigger execution with checkpoint recovery
@@ -739,9 +773,12 @@ class ChatRepl(Vertical):
# Silently fail - don't block TUI startup
pass
async def on_input_submitted(self, message: Input.Submitted) -> None:
"""Handle input submission — either start new execution or inject input."""
user_input = message.value.strip()
async def on_chat_text_area_submitted(self, message: ChatTextArea.Submitted) -> None:
"""Handle chat input submission."""
await self._submit_input(message.text)
async def _submit_input(self, user_input: str) -> None:
"""Handle submitted text — either start new execution or inject input."""
if not user_input:
return
@@ -749,16 +786,14 @@ class ChatRepl(Vertical):
# Commands work during execution, during client-facing input, anytime
if user_input.startswith("/"):
await self._handle_command(user_input)
message.input.value = ""
return
# Client-facing input: route to the waiting node
if self._waiting_for_input and self._input_node_id:
self._write_history(f"[bold green]You:[/bold green] {user_input}")
message.input.value = ""
# Keep input enabled for commands (but change placeholder)
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.placeholder = "Commands: /pause, /sessions (agent processing...)"
self._waiting_for_input = False
@@ -785,9 +820,8 @@ class ChatRepl(Vertical):
indicator = self.query_one("#processing-indicator", Label)
# Append user message and clear input
# Append user message
self._write_history(f"[bold green]You:[/bold green] {user_input}")
message.input.value = ""
try:
# Get entry point
@@ -813,7 +847,7 @@ class ChatRepl(Vertical):
indicator.display = True
# Keep input enabled for commands during execution
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.placeholder = "Commands available: /pause, /sessions, /help"
# Submit execution to the dedicated agent loop so blocking
@@ -834,7 +868,7 @@ class ChatRepl(Vertical):
indicator.display = False
self._current_exec_id = None
# Re-enable input on error
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.disabled = False
self._write_history(f"[bold red]Error:[/bold red] {e}")
@@ -910,7 +944,7 @@ class ChatRepl(Vertical):
self._pending_ask_question = ""
# Re-enable input
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.disabled = False
chat_input.placeholder = "Enter input for agent..."
chat_input.focus()
@@ -930,7 +964,7 @@ class ChatRepl(Vertical):
self._input_node_id = None
# Re-enable input
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.disabled = False
chat_input.placeholder = "Enter input for agent..."
chat_input.focus()
@@ -961,7 +995,7 @@ class ChatRepl(Vertical):
indicator = self.query_one("#processing-indicator", Label)
indicator.update("Waiting for your input...")
chat_input = self.query_one("#chat-input", Input)
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.disabled = False
chat_input.placeholder = "Type your response..."
chat_input.focus()
@@ -195,12 +195,27 @@ def _copy_to_clipboard(text: str) -> None:
     try:
         if sys.platform == "darwin":
             subprocess.run(["pbcopy"], input=text.encode(), check=True, timeout=5)
-        elif sys.platform.startswith("linux"):
+        elif sys.platform == "win32":
             subprocess.run(
-                ["xclip", "-selection", "clipboard"],
-                input=text.encode(),
+                ["clip.exe"],
+                input=text.encode("utf-16le"),
                 check=True,
                 timeout=5,
             )
+        elif sys.platform.startswith("linux"):
+            try:
+                subprocess.run(
+                    ["xclip", "-selection", "clipboard"],
+                    input=text.encode(),
+                    check=True,
+                    timeout=5,
+                )
+            except (subprocess.SubprocessError, FileNotFoundError):
+                subprocess.run(
+                    ["xsel", "--clipboard", "--input"],
+                    input=text.encode(),
+                    check=True,
+                    timeout=5,
+                )
     except (subprocess.SubprocessError, FileNotFoundError):
         pass
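For readers who want to exercise this fallback chain outside the TUI, here is a minimal standalone sketch. The `copy_text` name and the explicit `platform` parameter are illustrative (the real helper is `_copy_to_clipboard` and reads `sys.platform` directly); the command lists mirror the hunk above.

```python
import subprocess
import sys

def copy_text(text: str, platform: str = sys.platform) -> bool:
    """Best-effort clipboard copy: try each platform backend in order,
    returning True as soon as one accepts the text."""
    if platform == "darwin":
        backends = [(["pbcopy"], text.encode())]
    elif platform == "win32":
        # clip.exe expects UTF-16LE input on Windows
        backends = [(["clip.exe"], text.encode("utf-16le"))]
    else:
        # X11: prefer xclip, fall back to xsel
        backends = [
            (["xclip", "-selection", "clipboard"], text.encode()),
            (["xsel", "--clipboard", "--input"], text.encode()),
        ]
    for argv, payload in backends:
        try:
            subprocess.run(argv, input=payload, check=True, timeout=5)
            return True
        except (subprocess.SubprocessError, FileNotFoundError):
            continue  # try the next backend
    return False
```

Unlike the patched helper, this sketch reports whether any backend succeeded, which makes the fallback order easy to test by stubbing `subprocess.run`.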
@@ -0,0 +1,538 @@
"""Tests for the Continuous Agent architecture (conversation threading + cumulative tools).
Validates:
- conversation_mode="isolated" preserves existing behavior
- conversation_mode="continuous" threads one conversation across nodes
- Transition markers are inserted at phase boundaries
- System prompt updates at each transition (layered prompt composition)
- Tools accumulate across nodes in continuous mode
- prompt_composer functions work correctly
"""
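The layered composition these tests exercise can be sketched as follows. This is a behavioral sketch consistent with the assertions in `TestComposeSystemPrompt`, not the framework's actual implementation; the `## Current Focus` heading and the narrative heading text are assumptions.

```python
def compose_system_prompt(identity_prompt=None, focus_prompt=None, narrative=None):
    """Compose the system prompt in layers: stable identity first,
    then the current node's focus, then a narrative of progress so far."""
    parts = []
    if identity_prompt:
        parts.append(identity_prompt)
    if focus_prompt:
        parts.append("## Current Focus\n" + focus_prompt)
    if narrative:
        parts.append("## Progress So Far\n" + narrative)
    # With a single layer present, the result is that layer verbatim
    return "\n\n".join(parts)
```

Keeping the identity layer first means the agent's persona stays pinned at the top of the prompt while the focus and narrative layers are rewritten at every phase transition.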
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import MagicMock
import pytest
from framework.graph.conversation import NodeConversation
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal
from framework.graph.node import NodeResult, NodeSpec, SharedMemory
from framework.graph.prompt_composer import (
build_narrative,
build_transition_marker,
compose_system_prompt,
)
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import FinishEvent, TextDeltaEvent, ToolCallEvent
from framework.runtime.core import Runtime
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
"""Mock LLM that yields pre-programmed StreamEvent sequences."""
def __init__(self, scenarios: list[list] | None = None):
self.scenarios = scenarios or []
self._call_index = 0
self.stream_calls: list[dict] = []
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if not self.scenarios:
return
events = self.scenarios[self._call_index % len(self.scenarios)]
self._call_index += 1
for event in events:
yield event
def complete(self, messages, system="", **kwargs) -> LLMResponse:
return LLMResponse(content="Summary.", model="mock", stop_reason="stop")
def complete_with_tools(self, messages, system, tools, tool_executor, **kwargs) -> LLMResponse:
return LLMResponse(content="", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _set_output_scenario(key: str, value: str) -> list:
"""LLM calls set_output then finishes."""
return [
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_then_set_output(text: str, key: str, value: str) -> list:
"""LLM produces text, then calls set_output, then finishes (2 turns needed)."""
return [
TextDeltaEvent(content=text, snapshot=text),
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_finish(text: str) -> list:
"""LLM produces text and stops (triggers judge)."""
return [
TextDeltaEvent(content=text, snapshot=text),
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
]
def _make_runtime():
rt = MagicMock(spec=Runtime)
rt.start_run = MagicMock(return_value="run_1")
rt.end_run = MagicMock()
rt.report_problem = MagicMock()
rt.decide = MagicMock(return_value="dec_1")
rt.record_outcome = MagicMock()
rt.set_node = MagicMock()
return rt
def _make_goal():
return Goal(id="g1", name="test", description="test goal")
def _make_tool(name: str) -> Tool:
return Tool(
name=name,
description=f"Tool {name}",
parameters={"type": "object", "properties": {}},
)
# ===========================================================================
# prompt_composer unit tests
# ===========================================================================
class TestComposeSystemPrompt:
def test_all_layers(self):
result = compose_system_prompt(
identity_prompt="I am a research agent.",
focus_prompt="Focus on writing the report.",
narrative="We found 5 sources on topic X.",
)
assert "I am a research agent." in result
assert "Focus on writing the report." in result
assert "We found 5 sources on topic X." in result
# Identity comes first
assert result.index("I am a research agent.") < result.index("Focus on writing")
def test_identity_only(self):
result = compose_system_prompt(identity_prompt="I am an agent.", focus_prompt=None)
assert result == "I am an agent."
def test_focus_only(self):
result = compose_system_prompt(identity_prompt=None, focus_prompt="Do the thing.")
assert "Current Focus" in result
assert "Do the thing." in result
def test_empty(self):
result = compose_system_prompt(identity_prompt=None, focus_prompt=None)
assert result == ""
class TestBuildNarrative:
def test_with_execution_path(self):
memory = SharedMemory()
memory.write("findings", "some findings")
node_a = NodeSpec(
id="a", name="Research", description="Research the topic", node_type="event_loop"
)
node_b = NodeSpec(id="b", name="Report", description="Write report", node_type="event_loop")
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[],
)
result = build_narrative(memory, ["a"], graph)
assert "Research" in result
assert "findings" in result
def test_empty_state(self):
memory = SharedMemory()
graph = GraphSpec(id="g1", goal_id="g1", entry_node="a", nodes=[], edges=[])
result = build_narrative(memory, [], graph)
assert result == ""
class TestBuildTransitionMarker:
def test_basic_marker(self):
prev = NodeSpec(
id="research", name="Research", description="Find sources", node_type="event_loop"
)
next_n = NodeSpec(
id="report", name="Report", description="Write report", node_type="event_loop"
)
memory = SharedMemory()
memory.write("findings", "important stuff")
marker = build_transition_marker(
previous_node=prev,
next_node=next_n,
memory=memory,
cumulative_tool_names=["web_search", "save_data"],
)
assert "PHASE TRANSITION" in marker
assert "Research" in marker
assert "Report" in marker
assert "findings" in marker
assert "web_search" in marker
assert "reflect" in marker.lower()
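A transition marker satisfying the assertions above can be sketched like this. The signature is deliberately simplified (plain strings and lists instead of the real `NodeSpec`/`SharedMemory` objects), and the exact wording is an assumption.

```python
def build_transition_marker(previous_name, next_name, memory_keys, cumulative_tool_names):
    """Render the phase-transition message injected into the conversation
    when the executor moves from one node to the next."""
    return "\n".join([
        f"=== PHASE TRANSITION: {previous_name} -> {next_name} ===",
        # Surface accumulated state so the agent need not re-derive it
        "Shared memory keys so far: " + (", ".join(memory_keys) or "(none)"),
        "Tools now available: " + ", ".join(cumulative_tool_names),
        "Reflect on the previous phase, then continue with the new focus.",
    ])
```

Because the marker is an ordinary user message, it survives in the threaded conversation and (per the compaction tests later in this diff) is explicitly protected from pruning.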
# ===========================================================================
# NodeConversation.update_system_prompt
# ===========================================================================
class TestUpdateSystemPrompt:
def test_update(self):
conv = NodeConversation(system_prompt="original")
assert conv.system_prompt == "original"
conv.update_system_prompt("updated")
assert conv.system_prompt == "updated"
# ===========================================================================
# Conversation threading through executor
# ===========================================================================
class TestContinuousConversation:
"""Test that conversation_mode='continuous' threads a single conversation."""
@pytest.mark.asyncio
async def test_isolated_mode_no_conversation_in_result(self):
"""In isolated mode, NodeResult.conversation should be None."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_set_output_scenario("result", "done"),
_text_finish("accepted"),
]
)
spec = NodeSpec(
id="n1",
name="Node1",
description="test",
node_type="event_loop",
output_keys=["result"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
conversation_mode="isolated",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
@pytest.mark.asyncio
async def test_continuous_threads_conversation(self):
"""In continuous mode, second node sees messages from first node."""
runtime = _make_runtime()
# Node A: set_output("brief", "the brief"), then finish (accept)
# Node B: set_output("report", "the report"), then finish (accept)
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("I'll research this.", "brief", "the brief"),
_text_finish(""), # triggers accept for node A (all keys set)
_text_then_set_output("Here's the report.", "report", "the report"),
_text_finish(""), # triggers accept for node B
]
)
node_a = NodeSpec(
id="a",
name="Intake",
description="Gather requirements",
node_type="event_loop",
output_keys=["brief"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Write report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
identity_prompt="You are a thorough research agent.",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
assert result.path == ["a", "b"]
# Verify the LLM saw the identity prompt in system messages
# The second node's system prompt should contain the identity
if len(llm.stream_calls) >= 3:
system_at_node_b = llm.stream_calls[2]["system"]
assert "thorough research agent" in system_at_node_b
@pytest.mark.asyncio
async def test_continuous_transition_marker_present(self):
"""Transition marker should appear in messages when switching nodes."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Research done.", "brief", "the brief"),
_text_finish(""),
_text_then_set_output("Report done.", "report", "the report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["brief"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Write report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# When node B's first LLM call happens, its messages should contain
# the transition marker from the executor
if len(llm.stream_calls) >= 3:
node_b_messages = llm.stream_calls[2]["messages"]
all_content = " ".join(
m.get("content", "") for m in node_b_messages if isinstance(m.get("content"), str)
)
assert "PHASE TRANSITION" in all_content
# ===========================================================================
# Cumulative tools
# ===========================================================================
class TestCumulativeTools:
"""Test that tools accumulate in continuous mode."""
@pytest.mark.asyncio
async def test_isolated_mode_tools_scoped(self):
"""In isolated mode, each node only gets its own declared tools."""
runtime = _make_runtime()
tool_a = _make_tool("web_search")
tool_b = _make_tool("save_data")
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Done.", "brief", "brief"),
_text_finish(""),
_text_then_set_output("Done.", "report", "report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Research",
node_type="event_loop",
output_keys=["brief"],
tools=["web_search"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
tools=["save_data"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="isolated",
)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=[tool_a, tool_b],
)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# In isolated mode, node B should NOT have web_search
if len(llm.stream_calls) >= 3:
node_b_tools = llm.stream_calls[2].get("tools") or []
tool_names = [t.name for t in node_b_tools]
assert "save_data" in tool_names or "set_output" in tool_names
# web_search should NOT be present (only set_output + save_data)
real_tools = [n for n in tool_names if n != "set_output"]
assert "web_search" not in real_tools
@pytest.mark.asyncio
async def test_continuous_mode_tools_accumulate(self):
"""In continuous mode, node B should have both web_search and save_data."""
runtime = _make_runtime()
tool_a = _make_tool("web_search")
tool_b = _make_tool("save_data")
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("Done.", "brief", "brief"),
_text_finish(""),
_text_then_set_output("Done.", "report", "report"),
_text_finish(""),
]
)
node_a = NodeSpec(
id="a",
name="Research",
description="Research",
node_type="event_loop",
output_keys=["brief"],
tools=["web_search"],
)
node_b = NodeSpec(
id="b",
name="Report",
description="Report",
node_type="event_loop",
input_keys=["brief"],
output_keys=["report"],
tools=["save_data"],
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="a",
nodes=[node_a, node_b],
edges=[EdgeSpec(id="e1", source="a", target="b", condition=EdgeCondition.ON_SUCCESS)],
terminal_nodes=["b"],
conversation_mode="continuous",
)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=[tool_a, tool_b],
)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# In continuous mode, node B should have BOTH tools
if len(llm.stream_calls) >= 3:
node_b_tools = llm.stream_calls[2].get("tools") or []
tool_names = [t.name for t in node_b_tools]
real_tools = [n for n in tool_names if n != "set_output"]
assert "web_search" in real_tools
assert "save_data" in real_tools
# ===========================================================================
# Schema field defaults
# ===========================================================================
class TestSchemaDefaults:
def test_graphspec_defaults(self):
"""New fields should have safe defaults."""
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[],
edges=[],
)
assert graph.conversation_mode == "continuous"
assert graph.identity_prompt is None
def test_nodespec_defaults(self):
"""NodeSpec.success_criteria should default to None."""
spec = NodeSpec(
id="n1",
name="test",
description="test",
node_type="event_loop",
)
assert spec.success_criteria is None
def test_noderesult_defaults(self):
"""NodeResult.conversation should default to None."""
result = NodeResult(success=True)
assert result.conversation is None
@@ -0,0 +1,380 @@
"""Tests for Level 2 conversation-aware judge.
Validates:
- No success_criteria → Level 0 only (existing behavior)
- success_criteria set, good conversation → Level 2 ACCEPT
- success_criteria set, poor conversation → Level 2 RETRY with feedback
- Custom explicit judge takes priority over Level 2
- Level 2 fires only when Level 0 passes (all keys set)
- _parse_verdict correctly parses LLM responses
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import MagicMock
import pytest
from framework.graph.conversation import NodeConversation
from framework.graph.conversation_judge import (
_parse_verdict,
evaluate_phase_completion,
)
from framework.graph.edge import GraphSpec
from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal
from framework.graph.node import NodeSpec
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import FinishEvent, TextDeltaEvent, ToolCallEvent
from framework.runtime.core import Runtime
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
"""Mock LLM that yields pre-programmed StreamEvent sequences."""
def __init__(self, scenarios: list[list] | None = None, complete_response: str = ""):
self.scenarios = scenarios or []
self._call_index = 0
self.stream_calls: list[dict] = []
self.complete_response = complete_response
self.complete_calls: list[dict] = []
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if not self.scenarios:
return
events = self.scenarios[self._call_index % len(self.scenarios)]
self._call_index += 1
for event in events:
yield event
def complete(self, messages, system="", **kwargs) -> LLMResponse:
self.complete_calls.append({"messages": messages, "system": system})
return LLMResponse(content=self.complete_response, model="mock", stop_reason="stop")
def complete_with_tools(self, messages, system, tools, tool_executor, **kwargs) -> LLMResponse:
return LLMResponse(content="", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _set_output_scenario(key: str, value: str) -> list:
return [
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_then_set_output(text: str, key: str, value: str) -> list:
return [
TextDeltaEvent(content=text, snapshot=text),
ToolCallEvent(
tool_use_id=f"call_{key}",
tool_name="set_output",
tool_input={"key": key, "value": value},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
def _text_finish(text: str) -> list:
return [
TextDeltaEvent(content=text, snapshot=text),
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
]
def _make_runtime():
rt = MagicMock(spec=Runtime)
rt.start_run = MagicMock(return_value="run_1")
rt.end_run = MagicMock()
rt.report_problem = MagicMock()
rt.decide = MagicMock(return_value="dec_1")
rt.record_outcome = MagicMock()
rt.set_node = MagicMock()
return rt
def _make_goal():
return Goal(id="g1", name="test", description="test goal")
# ===========================================================================
# Unit tests for _parse_verdict
# ===========================================================================
class TestParseVerdict:
def test_accept(self):
v = _parse_verdict("ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:")
assert v.action == "ACCEPT"
assert v.confidence == 0.9
assert v.feedback == ""
def test_retry_with_feedback(self):
v = _parse_verdict("ACTION: RETRY\nCONFIDENCE: 0.6\nFEEDBACK: Research is too shallow.")
assert v.action == "RETRY"
assert v.confidence == 0.6
assert "shallow" in v.feedback
def test_defaults_on_garbage(self):
v = _parse_verdict("some random text\nno structured output")
assert v.action == "ACCEPT" # default
assert v.confidence == 0.8 # default
def test_invalid_action_defaults_to_accept(self):
v = _parse_verdict("ACTION: ESCALATE\nCONFIDENCE: 0.5")
assert v.action == "ACCEPT" # ESCALATE not valid for Level 2
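A parser matching the verdicts above can be sketched as follows. This is a sketch consistent with the test expectations, not the library's `_parse_verdict` itself; the regex details are assumptions.

```python
import re
from dataclasses import dataclass

@dataclass
class Verdict:
    action: str = "ACCEPT"    # default: never block when parsing fails
    confidence: float = 0.8
    feedback: str = ""

def parse_verdict(text: str) -> Verdict:
    """Parse an ACTION/CONFIDENCE/FEEDBACK judge response.
    Only ACCEPT and RETRY are valid at Level 2; anything else
    (e.g. ESCALATE, or garbage) falls back to ACCEPT."""
    verdict = Verdict()
    m = re.search(r"ACTION:\s*([A-Z]+)", text)
    if m and m.group(1) in ("ACCEPT", "RETRY"):
        verdict.action = m.group(1)
    m = re.search(r"CONFIDENCE:\s*([0-9.]+)", text)
    if m:
        try:
            verdict.confidence = float(m.group(1))
        except ValueError:
            pass  # keep the default on a malformed number
    m = re.search(r"FEEDBACK:\s*(.*)", text)
    if m:
        verdict.feedback = m.group(1).strip()
    return verdict
```

Defaulting to ACCEPT is deliberate: Level 2 only runs after Level 0 has already passed, so an unparseable judge reply should not block a node that met its output contract.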
# ===========================================================================
# Unit tests for evaluate_phase_completion
# ===========================================================================
class TestEvaluatePhaseCompletion:
@pytest.mark.asyncio
async def test_accept_on_good_response(self):
"""LLM says ACCEPT → verdict is ACCEPT."""
llm = MockStreamingLLM(complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.95\nFEEDBACK:")
conv = NodeConversation(system_prompt="test")
await conv.add_user_message("Do research on topic X")
await conv.add_assistant_message("I found 5 high-quality sources on X.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Research",
phase_description="Research the topic",
success_criteria="Find at least 3 credible sources",
accumulator_state={"findings": "5 sources found"},
)
assert verdict.action == "ACCEPT"
assert verdict.confidence == 0.95
@pytest.mark.asyncio
async def test_retry_on_poor_response(self):
"""LLM says RETRY → verdict is RETRY with feedback."""
llm = MockStreamingLLM(
complete_response=(
"ACTION: RETRY\nCONFIDENCE: 0.4\nFEEDBACK: Only found 1 source, need 3."
)
)
conv = NodeConversation(system_prompt="test")
await conv.add_user_message("Do research")
await conv.add_assistant_message("I found 1 source.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Research",
phase_description="Research the topic",
success_criteria="Find at least 3 credible sources",
accumulator_state={"findings": "1 source"},
)
assert verdict.action == "RETRY"
assert "1 source" in verdict.feedback
@pytest.mark.asyncio
async def test_llm_failure_defaults_to_accept(self):
"""When LLM fails, Level 2 should not block (Level 0 already passed)."""
llm = MockStreamingLLM()
# Make complete() raise an exception
llm.complete = MagicMock(side_effect=RuntimeError("LLM unavailable"))
conv = NodeConversation(system_prompt="test")
await conv.add_assistant_message("Done.")
verdict = await evaluate_phase_completion(
llm=llm,
conversation=conv,
phase_name="Test",
phase_description="Test phase",
success_criteria="Do the thing",
accumulator_state={"result": "done"},
)
assert verdict.action == "ACCEPT"
assert verdict.confidence == 0.5
# ===========================================================================
# Integration: Level 2 in EventLoopNode implicit judge
# ===========================================================================
class TestLevel2InImplicitJudge:
@pytest.mark.asyncio
async def test_no_success_criteria_level0_only(self):
"""Without success_criteria, Level 0 accepts normally (existing behavior)."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_set_output_scenario("result", "done"),
_text_finish("accepted"),
]
)
spec = NodeSpec(
id="n1",
name="Node1",
description="test",
node_type="event_loop",
output_keys=["result"],
# No success_criteria!
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# LLM.complete should NOT have been called for Level 2
assert len(llm.complete_calls) == 0
@pytest.mark.asyncio
async def test_success_criteria_accept(self):
"""With success_criteria and good work, Level 2 accepts."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
_text_then_set_output("I did thorough research.", "result", "done"),
_text_finish(""), # triggers judge
],
complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide thorough research with multiple sources.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# LLM.complete should have been called for Level 2
assert len(llm.complete_calls) >= 1
@pytest.mark.asyncio
async def test_success_criteria_retry_then_accept(self):
"""Level 2 rejects first attempt, LLM tries again, Level 2 accepts."""
runtime = _make_runtime()
# Track complete calls to alternate responses
complete_responses = [
"ACTION: RETRY\nCONFIDENCE: 0.4\nFEEDBACK: Need more detail.",
"ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
]
call_count = [0]
class SequentialLLM(MockStreamingLLM):
def complete(self, messages, system="", **kwargs):
idx = call_count[0]
call_count[0] += 1
resp = complete_responses[idx % len(complete_responses)]
return LLMResponse(content=resp, model="mock", stop_reason="stop")
llm = SequentialLLM(
scenarios=[
# Turn 1: set output, then stop → Level 2 RETRY
_text_then_set_output("Brief research.", "result", "brief"),
_text_finish(""), # triggers judge → Level 2 RETRY
# Turn 2: after retry feedback, set output again, stop → Level 2 ACCEPT
_text_then_set_output("Much more detailed research.", "result", "detailed"),
_text_finish(""), # triggers judge → Level 2 ACCEPT
]
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide thorough research with multiple sources.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# Should have had 2 complete calls (first RETRY, second ACCEPT)
assert call_count[0] >= 2
@pytest.mark.asyncio
async def test_level2_only_fires_when_level0_passes(self):
"""Level 2 should NOT fire when output keys are missing."""
runtime = _make_runtime()
llm = MockStreamingLLM(
scenarios=[
# Turn 1: just text, no set_output → Level 0 RETRY (missing keys)
_text_finish("I did some thinking."),
# Turn 2: set output → Level 0 ACCEPT, Level 2 check
_text_then_set_output("Now I have output.", "result", "done"),
_text_finish(""), # triggers judge
],
complete_response="ACTION: ACCEPT\nCONFIDENCE: 0.9\nFEEDBACK:",
)
spec = NodeSpec(
id="n1",
name="Research",
description="Do research",
node_type="event_loop",
output_keys=["result"],
success_criteria="Provide results.",
)
graph = GraphSpec(
id="g1",
goal_id="g1",
entry_node="n1",
nodes=[spec],
edges=[],
)
executor = GraphExecutor(runtime=runtime, llm=llm)
result = await executor.execute(graph=graph, goal=_make_goal())
assert result.success
# Level 2 should only fire once (when Level 0 passes)
assert len(llm.complete_calls) == 1
@@ -929,6 +929,7 @@ async def test_context_handoff_between_nodes(runtime):
@pytest.mark.asyncio
+@pytest.mark.skip(reason="Hangs in non-interactive shells (client-facing blocks on stdin)")
async def test_client_facing_node_streams_output():
"""Client-facing node emits CLIENT_OUTPUT_DELTA events."""
recorded: list[AgentEvent] = []
@@ -425,6 +425,7 @@ class TestEventBusLifecycle:
assert EventType.NODE_LOOP_COMPLETED in received_events
@pytest.mark.asyncio
+@pytest.mark.skip(reason="Hangs in non-interactive shells (client-facing blocks on stdin)")
async def test_client_facing_uses_client_output_delta(self, runtime, memory):
"""client_facing=True should emit CLIENT_OUTPUT_DELTA instead of LLM_TEXT_DELTA."""
spec = NodeSpec(
@@ -475,6 +476,7 @@ class TestClientFacingBlocking:
)
@pytest.mark.asyncio
+@pytest.mark.skip(reason="Hangs in non-interactive shells (client-facing blocks on stdin)")
async def test_text_only_no_blocking(self, runtime, memory, client_spec):
"""client_facing + text-only (no ask_user) should NOT block."""
llm = MockStreamingLLM(
@@ -630,6 +632,7 @@ class TestClientFacingBlocking:
assert received[0].type == EventType.CLIENT_INPUT_REQUESTED
@pytest.mark.asyncio
+@pytest.mark.skip(reason="Hangs in non-interactive shells (client-facing blocks on stdin)")
async def test_ask_user_with_real_tools(self, runtime, memory):
"""ask_user alongside real tool calls still triggers blocking."""
spec = NodeSpec(
@@ -143,6 +143,7 @@ def _has_api_key(env_var: str) -> bool:
# ---------------------------------------------------------------------------
# Real API tests — text streaming
# ---------------------------------------------------------------------------
+@pytest.mark.skip(reason="Requires valid live API keys — run manually")
class TestRealAPITextStreaming:
"""Stream a simple text response from each provider and dump events."""
@@ -204,6 +205,7 @@ class TestRealAPITextStreaming:
# ---------------------------------------------------------------------------
# Real API tests — tool call streaming
# ---------------------------------------------------------------------------
+@pytest.mark.skip(reason="Requires valid live API keys — run manually")
class TestRealAPIToolCallStreaming:
"""Stream a tool call response from each provider and dump events."""
@@ -0,0 +1,280 @@
"""Tests for phase-aware compaction in continuous conversation mode.
Validates:
- Phase tags persist through storage roundtrip
- Transition markers survive compaction
- Current phase messages protected during compaction
- Older phase tool results pruned first
- Phase metadata fields have safe defaults
"""
from __future__ import annotations
import pytest
from framework.graph.conversation import Message, NodeConversation
class TestPhaseMetadata:
"""Phase metadata on Message dataclass."""
def test_defaults(self):
msg = Message(seq=0, role="user", content="hello")
assert msg.phase_id is None
assert msg.is_transition_marker is False
def test_set_phase(self):
msg = Message(seq=0, role="user", content="hello", phase_id="research")
assert msg.phase_id == "research"
def test_transition_marker(self):
msg = Message(
seq=0,
role="user",
content="PHASE TRANSITION",
is_transition_marker=True,
phase_id="report",
)
assert msg.is_transition_marker is True
assert msg.phase_id == "report"
def test_storage_roundtrip(self):
"""Phase metadata should survive to_storage_dict → from_storage_dict."""
msg = Message(
seq=5,
role="user",
content="transition",
phase_id="review",
is_transition_marker=True,
)
d = msg.to_storage_dict()
assert d["phase_id"] == "review"
assert d["is_transition_marker"] is True
restored = Message.from_storage_dict(d)
assert restored.phase_id == "review"
assert restored.is_transition_marker is True
def test_storage_roundtrip_no_phase(self):
"""Messages without phase metadata should roundtrip cleanly."""
msg = Message(seq=0, role="assistant", content="hello")
d = msg.to_storage_dict()
assert "phase_id" not in d
assert "is_transition_marker" not in d
restored = Message.from_storage_dict(d)
assert restored.phase_id is None
assert restored.is_transition_marker is False
def test_to_llm_dict_no_metadata(self):
"""Phase metadata should NOT appear in LLM-facing dicts."""
msg = Message(
seq=0,
role="user",
content="hello",
phase_id="research",
is_transition_marker=True,
)
d = msg.to_llm_dict()
assert "phase_id" not in d
assert "is_transition_marker" not in d
assert d == {"role": "user", "content": "hello"}
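The roundtrip behavior asserted above can be sketched with a minimal `Message`. This mirrors the test expectations only (omit phase fields at their defaults in storage; strip all bookkeeping from the LLM view) and is not the framework's actual dataclass.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Message:
    seq: int
    role: str
    content: str
    phase_id: Optional[str] = None
    is_transition_marker: bool = False

    def to_storage_dict(self) -> dict:
        d = {"seq": self.seq, "role": self.role, "content": self.content}
        # Omit phase metadata at defaults so pre-existing records
        # roundtrip unchanged
        if self.phase_id is not None:
            d["phase_id"] = self.phase_id
        if self.is_transition_marker:
            d["is_transition_marker"] = True
        return d

    @classmethod
    def from_storage_dict(cls, d: dict) -> "Message":
        return cls(
            seq=d["seq"], role=d["role"], content=d["content"],
            phase_id=d.get("phase_id"),
            is_transition_marker=d.get("is_transition_marker", False),
        )

    def to_llm_dict(self) -> dict:
        # The LLM-facing view carries no bookkeeping fields
        return {"role": self.role, "content": self.content}
```

Keeping the LLM view free of phase metadata means the compactor can reason about phases without the model ever seeing (or being confused by) the tags.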
class TestPhaseStamping:
"""Messages are stamped with current phase."""
@pytest.mark.asyncio
async def test_messages_stamped_with_phase(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg1 = await conv.add_user_message("search for X")
msg2 = await conv.add_assistant_message("Found it.")
assert msg1.phase_id == "research"
assert msg2.phase_id == "research"
@pytest.mark.asyncio
async def test_phase_changes_stamp(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg1 = await conv.add_user_message("research msg")
conv.set_current_phase("report")
msg2 = await conv.add_user_message("report msg")
assert msg1.phase_id == "research"
assert msg2.phase_id == "report"
@pytest.mark.asyncio
async def test_no_phase_no_stamp(self):
conv = NodeConversation(system_prompt="test")
msg = await conv.add_user_message("no phase")
assert msg.phase_id is None
@pytest.mark.asyncio
async def test_transition_marker_flag(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("report")
msg = await conv.add_user_message(
"PHASE TRANSITION: Research → Report",
is_transition_marker=True,
)
assert msg.is_transition_marker is True
assert msg.phase_id == "report"
@pytest.mark.asyncio
async def test_tool_result_stamped(self):
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
msg = await conv.add_tool_result("call_1", "tool output here")
assert msg.phase_id == "research"
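The stamping behavior under test reduces to a small piece of bookkeeping: each appended message copies the conversation's current phase at append time. A sketch (synchronous for brevity, and with a hypothetical class name; the real `NodeConversation` methods are async):

```python
class PhaseStampingSketch:
    """Minimal model of phase stamping: appended messages inherit the current phase."""

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt
        self.messages: list[dict] = []
        self._current_phase: str | None = None  # None until set_current_phase is called

    def set_current_phase(self, phase_id: str) -> None:
        self._current_phase = phase_id

    def _append(self, role: str, content: str, is_transition_marker: bool = False) -> dict:
        msg = {
            "role": role,
            "content": content,
            "phase_id": self._current_phase,  # stamped at append time, never retroactively
            "is_transition_marker": is_transition_marker,
        }
        self.messages.append(msg)
        return msg

    def add_user_message(self, content: str, is_transition_marker: bool = False) -> dict:
        return self._append("user", content, is_transition_marker)

    def add_tool_result(self, call_id: str, content: str) -> dict:
        return self._append("tool", content)
```

Because the stamp is taken at append time, switching phases never restamps earlier messages, which is exactly what `test_phase_changes_stamp` relies on.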
class TestPhaseAwareCompaction:
"""prune_old_tool_results protects current phase and transition markers."""
@pytest.mark.asyncio
async def test_transition_marker_survives_compaction(self):
"""Transition markers should never be pruned."""
conv = NodeConversation(system_prompt="test")
# Old phase with a big tool result
conv.set_current_phase("research")
await conv.add_assistant_message(
"calling tool",
tool_calls=[
{
"id": "call_1",
"type": "function",
"function": {"name": "search", "arguments": "{}"},
}
],
)
await conv.add_tool_result("call_1", "x" * 20000) # big tool result
# Transition marker
await conv.add_user_message(
"PHASE TRANSITION: Research → Report",
is_transition_marker=True,
)
# New phase
conv.set_current_phase("report")
await conv.add_assistant_message(
"calling another tool",
tool_calls=[
{
"id": "call_2",
"type": "function",
"function": {"name": "save", "arguments": "{}"},
}
],
)
await conv.add_tool_result("call_2", "y" * 200)
pruned = await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
assert pruned >= 1
# Transition marker should still be intact
marker_msgs = [m for m in conv.messages if m.is_transition_marker]
assert len(marker_msgs) == 1
assert "PHASE TRANSITION" in marker_msgs[0].content
@pytest.mark.asyncio
async def test_current_phase_protected(self):
"""Tool results in the current phase should not be pruned."""
conv = NodeConversation(system_prompt="test")
# Old phase
conv.set_current_phase("research")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "old_data " * 5000)
# Current phase
conv.set_current_phase("report")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "current_data " * 5000)
await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
# Old phase's tool result should be pruned
msgs = conv.messages
old_tool = [m for m in msgs if m.role == "tool" and m.phase_id == "research"]
assert len(old_tool) == 1
assert old_tool[0].content.startswith("[Pruned tool result")
# Current phase's tool result should be intact
current_tool = [m for m in msgs if m.role == "tool" and m.phase_id == "report"]
assert len(current_tool) == 1
assert "current_data" in current_tool[0].content
@pytest.mark.asyncio
async def test_no_phase_metadata_works_normally(self):
"""Without phase metadata, compaction works as before (no regression)."""
conv = NodeConversation(system_prompt="test")
# No phase set — messages have phase_id=None
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "data " * 5000) # ~6250 tokens
await conv.add_assistant_message(
"another tool call",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "more " * 100) # ~125 tokens
# protect_tokens=100: c2 (~125 tokens) fills the budget,
# c1 (~6250 tokens) becomes pruneable
pruned = await conv.prune_old_tool_results(protect_tokens=100, min_prune_tokens=100)
assert pruned >= 1
@pytest.mark.asyncio
async def test_pruned_message_preserves_phase_metadata(self):
"""Pruned messages should keep their phase_id."""
conv = NodeConversation(system_prompt="test")
conv.set_current_phase("research")
await conv.add_assistant_message(
"tool call",
tool_calls=[
{"id": "c1", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c1", "data " * 5000)
# Switch to new phase so research messages become pruneable
conv.set_current_phase("report")
await conv.add_assistant_message(
"recent",
tool_calls=[
{"id": "c2", "type": "function", "function": {"name": "s", "arguments": "{}"}}
],
)
await conv.add_tool_result("c2", "x" * 200)
await conv.prune_old_tool_results(protect_tokens=0, min_prune_tokens=100)
pruned_msg = [m for m in conv.messages if m.content.startswith("[Pruned")][0]
assert pruned_msg.phase_id == "research"
+214
View File
@@ -0,0 +1,214 @@
# Antigravity IDE Setup
Use the Hive agent framework (MCP servers and skills) inside [Antigravity IDE](https://antigravity.google/) (Google's AI IDE).
---
## Quick start (3 steps)
**Repo root** = the folder that contains `core/`, `tools/`, and `.agent/` (where you cloned the project).
1. **Open a terminal** and go to the hive repo root (e.g. `cd ~/hive`).
2. **Run the setup script** (use `./` so the script runs from this repo; don't use `/scripts/...`):
```bash
./scripts/setup-antigravity-mcp.sh
```
3. **Restart Antigravity IDE.** You should see **agent-builder** and **tools** as available MCP servers.
> **Important:** Always restart/refresh Antigravity IDE after running the setup script or making any changes to MCP configuration. The IDE only loads MCP servers on startup.
Done. For details, prerequisites, and troubleshooting, read on.
---
## What you get after setup
- **agent-builder**: Create and manage agents (goals, nodes, edges).
- **tools**: File operations, web search, and other agent tools.
- **Skills**: Guided docs for building and testing agents (in `.agent/skills/` or `.claude/skills/`).
---
## Prerequisites
- [Antigravity IDE](https://antigravity.google/) installed.
- **Python 3.11+** and project dependencies. If you haven't set up the repo yet, from repo root run:
```bash
./scripts/setup-python.sh
```
- **MCP server dependencies** (one-time). From repo root:
```bash
cd core && ./setup_mcp.sh
```
---
## Full setup (step by step)
### Step 1: Install MCP dependencies (one-time)
From the **repo root**:
```bash
cd core
./setup_mcp.sh
```
This installs the framework and MCP packages and checks that the server can start.
### Step 2: Register MCP servers with Antigravity
Antigravity reads MCP config from your **user config file** (`~/.gemini/antigravity/mcp_config.json`), not from the project. The easiest way is to run the setup script from the **hive repo folder**:
```bash
./scripts/setup-antigravity-mcp.sh
```
The script finds the repo root and writes `~/.gemini/antigravity/mcp_config.json` with the right paths, so you don't have to edit any paths by hand.
> **Important:** Always restart/refresh Antigravity IDE after running the setup script. MCP servers are only loaded on IDE startup.
The **agent-builder** and **tools** servers should show up after restart.
**Using Claude Code instead?** Run:
```bash
./scripts/setup-antigravity-mcp.sh --claude
```
That writes `~/.claude/mcp.json` as well.
**Prefer to do it manually?** See [Manual MCP config](#manual-mcp-config-template) below. You'll create `~/.gemini/antigravity/mcp_config.json` (or `~/.claude/mcp.json`) with absolute paths to your repo's `core` and `tools` folders.
### Step 3: Use skills
Skills are guides (workflow, building, testing) in `.agent/skills/` (they point to `.claude/skills/`). If Antigravity doesn't show a “skills” UI, open those folders in the project and use the files as reference alongside the MCP tools.
| Skill | What it's for |
|-------|----------------|
| **hive** | End-to-end workflow for building and testing agents |
| **hive-concepts** | Core ideas for goal-driven agents |
| **hive-create** | Step-by-step agent construction |
| **hive-patterns** | Patterns and best practices |
| **hive-test** | Goal-based evaluation and testing |
| **hive-credentials** | Set up and manage agent credentials |
---
## What's in the repo (`.agent/`)
```
.agent/
├── mcp_config.json # Template for MCP servers (agent-builder, tools)
└── skills/ # Symlinks to .claude/skills/
```
The **setup script** writes your **user** config (`~/.gemini/antigravity/mcp_config.json`) using paths from **this repo**. The file in `.agent/` is the template; Antigravity itself uses the file in your home directory.
---
## Troubleshooting
**MCP servers don't connect**
- Run the setup script again from the hive repo root: `./scripts/setup-antigravity-mcp.sh`, then restart Antigravity.
- Make sure Python and deps are installed: from repo root run `./scripts/setup-python.sh`.
- Check that the servers can start: from repo root run
`cd core && uv run -m framework.mcp.agent_builder_server` (Ctrl+C to stop), and in another terminal
`cd tools && uv run mcp_server.py --stdio` (Ctrl+C to stop).
If those fail, fix the errors first (e.g. install deps with `uv sync`).
**"Module not found" or import errors**
- Open the **repo root** as the project in the IDE (the folder that has `core/` and `tools/`).
- If you edited `~/.gemini/antigravity/mcp_config.json` by hand, make sure `--directory` paths are **absolute** (e.g. `/Users/you/hive/core` and `/Users/you/hive/tools`).
**Skills don't show up in the UI**
- Antigravity may not have a dedicated “skills” panel. Use the files in `.claude/skills/` or `.agent/skills/` as docs; the MCP tools (agent-builder, tools) still work.
---
## Verification prompt (optional)
Paste this into Antigravity to check that MCP and skills are set up. It doesn't use your machine's paths, so anyone can use it.
```
Check the Hive + Antigravity integration:
1. MCP: List available MCP servers/tools. Confirm that "agent-builder" and "tools" (or equivalent) are connected. If not, tell the user to run ./scripts/setup-antigravity-mcp.sh from the hive repo root, then restart Antigravity (see docs/antigravity-setup.md).
2. Skills: Confirm that the project has .agent/skills/ (or .claude/skills/) with: hive, hive-concepts, hive-create, hive-patterns, hive-test, hive-credentials.
3. Result: Reply with PASS (MCP + skills OK), PARTIAL (only skills or only MCP), or FAIL (neither), and one line on what to fix if not PASS.
```
If you get **PARTIAL** (e.g. MCP not connected), run `./scripts/setup-antigravity-mcp.sh` from the repo root and restart Antigravity.
---
## Manual MCP config template
Use this only if you don't want to run the setup script. Replace `/path/to/hive` with your actual repo root (e.g. the output of `pwd` when you're in the hive folder).
Save as `~/.gemini/antigravity/mcp_config.json` (Antigravity) or `~/.claude/mcp.json` (Claude Code), then **restart the IDE** to load the new configuration.
```json
{
"mcpServers": {
"agent-builder": {
"command": "uv",
"args": ["run", "--directory", "/path/to/hive/core", "-m", "framework.mcp.agent_builder_server"],
"disabled": false
},
"tools": {
"command": "uv",
"args": ["run", "--directory", "/path/to/hive/tools", "mcp_server.py", "--stdio"],
"disabled": false
}
}
}
```
Make sure `uv` is installed and available in your PATH. Note: Use `--directory` in args instead of `cwd` for Antigravity compatibility.
---
## Verify from the command line (optional)
From the **repo root**:
**Check that config and skills exist**
```bash
test -f .agent/mcp_config.json && echo "OK: mcp_config.json" || echo "MISSING"
for s in hive hive-concepts hive-create hive-patterns hive-test hive-credentials; do
test -L .agent/skills/$s && test -d .agent/skills/$s && echo "OK: $s" || echo "BROKEN: $s"
done
```
**Check that the config is valid JSON**
```bash
python3 -c "import json; json.load(open('.agent/mcp_config.json')); print('OK: valid JSON')"
```
**Test that MCP servers start** (two terminals)
```bash
# Terminal 1
cd core && uv run -m framework.mcp.agent_builder_server
# Terminal 2
cd tools && uv run mcp_server.py --stdio
```
If both start without errors, the config is fine.
---
## See also
- [Cursor IDE support](../README.md#cursor-ide-support): Same MCP servers and skills for Cursor
- [MCP Integration Guide](../core/MCP_INTEGRATION_GUIDE.md): How the framework MCP works
- [Environment setup](../ENVIRONMENT_SETUP.md): Repo and Python setup
+8
View File
@@ -102,6 +102,14 @@ The repository includes a `.claude/settings.json` hook that automatically runs `
The `.cursorrules` file at the repo root tells Cursor's AI the project's style rules (line length, import order, quote style, etc.) so generated code follows convention.
### Antigravity IDE
Antigravity IDE (Google's AI-powered IDE) is supported via `.agent/mcp_config.json` and `.agent/skills/` (symlinks to `.claude/skills/`). See [antigravity-setup.md](antigravity-setup.md) for setup and troubleshooting.
### Codex CLI
Codex CLI (OpenAI, v0.101.0+) is supported via `.codex/config.toml` (MCP server config) and `.agents/skills/` (symlinks to `.claude/skills/`). These files are tracked in git. Run `codex` in the repo root and type `use hive` to start. See the [Codex CLI section in the README](../README.md#codex-cli) for details.
---
## CI Pipeline
+26 -4
View File
@@ -25,7 +25,8 @@ Aden Agent Framework is a Python-based system for building goal-driven, self-imp
| **framework** | `/core` | Core runtime, graph executor, protocols | Python 3.11+ |
| **tools** | `/tools` | MCP tools for agent capabilities | Python 3.11+ |
| **exports** | `/exports` | Agent packages (user-created, gitignored) | Python 3.11+ |
| **skills** | `.claude` | Claude Code skills for building/testing | Markdown |
| **skills** | `.claude`, `.agents`, `.agent` | Shared skills for Claude/Codex/other coding agents | Markdown |
| **codex** | `.codex` | Codex CLI project configuration (MCP servers) | TOML |
### Key Principles
@@ -46,7 +47,8 @@ Ensure you have installed:
- **Python 3.11+** - [Download](https://www.python.org/downloads/) (3.12 or 3.13 recommended)
- **uv** - Python package manager ([Install](https://docs.astral.sh/uv/getting-started/installation/))
- **git** - Version control
- **Claude Code** - [Install](https://docs.anthropic.com/claude/docs/claude-code) (optional, for using building skills)
- **Claude Code** - [Install](https://docs.anthropic.com/claude/docs/claude-code) (optional)
- **Codex CLI** - [Install](https://github.com/openai/codex) (optional)
Verify installation:
@@ -116,6 +118,23 @@ Skills are also available in Cursor. To enable:
3. Restart Cursor to load the MCP servers from `.cursor/mcp.json`
4. Type `/` in Agent chat and search for skills (e.g., `/hive-create`)
### Codex CLI Support
Hive supports [OpenAI Codex CLI](https://github.com/openai/codex) (v0.101.0+).
Configuration files are tracked in git:
- `.codex/config.toml` — MCP server config (`agent-builder`)
- `.agents/skills/` — Symlinks to Hive skills
To use Codex with Hive:
1. Run `codex` in the repo root
2. Type `use hive` to start the agent workflow
Example:
```
codex> use hive
```
### Opencode Support
To enable Opencode integration:
@@ -164,6 +183,10 @@ hive/ # Repository root
│ ├── hive-concepts/ # Fundamental concepts
│ ├── hive-patterns/ # Best practices
│ └── hive-test/ # Test and validate agents
├── .codex/ # Codex CLI project config
│ └── config.toml # Codex MCP server definitions
├── .agents/ # Shared skill mountpoint
│ └── skills/ # Symlinks to Hive skills
├── core/ # CORE FRAMEWORK PACKAGE
│ ├── framework/ # Main package code
@@ -215,10 +238,10 @@ hive/ # Repository root
├── scripts/ # Utility scripts
│ └── auto-close-duplicates.ts # GitHub duplicate issue closer
├── .agent/ # Antigravity IDE: mcp_config.json + skills (symlinks)
├── quickstart.sh # Interactive setup wizard
├── README.md # Project overview
├── CONTRIBUTING.md # Contribution guidelines
├── CHANGELOG.md # Version history
├── LICENSE # Apache 2.0 License
├── docs/CODE_OF_CONDUCT.md # Community guidelines
└── SECURITY.md # Security policy
@@ -656,7 +679,6 @@ lsof -i :4000
# Kill process
kill -9 <PID>
# Or change ports in config.yaml and regenerate
```
### Environment Variables Not Loading
+39 -10
View File
@@ -131,15 +131,15 @@ hive run exports/my_agent --tui
### CLI Command Reference
| Command | Description |
|---------|-------------|
| `hive tui` | Browse agents and launch TUI dashboard |
| `hive run <path>` | Execute an agent (`--tui`, `--model`, `--mock`, `--quiet`, `--verbose`) |
| `hive shell [path]` | Interactive REPL (`--multi`, `--no-approve`) |
| `hive info <path>` | Show agent details |
| `hive validate <path>` | Validate agent structure |
| `hive list [dir]` | List available agents |
| `hive dispatch [dir]` | Multi-agent orchestration |
| Command | Description |
| ---------------------- | ----------------------------------------------------------------------- |
| `hive tui` | Browse agents and launch TUI dashboard |
| `hive run <path>` | Execute an agent (`--tui`, `--model`, `--mock`, `--quiet`, `--verbose`) |
| `hive shell [path]` | Interactive REPL (`--multi`, `--no-approve`) |
| `hive info <path>` | Show agent details |
| `hive validate <path>` | Validate agent structure |
| `hive list [dir]` | List available agents |
| `hive dispatch [dir]` | Multi-agent orchestration |
### Using Python directly (alternative)
@@ -184,10 +184,16 @@ Skills are also available in Cursor. To enable:
### 2. Build an Agent
**Claude Code:**
```
claude> /hive
```
**Codex CLI:**
```
codex> use hive
```
Follow the prompts to:
1. Define your agent's goal
@@ -521,15 +527,38 @@ export ADEN_CREDENTIALS_PATH="/custom/path"
# Agent storage location (default: /tmp)
export AGENT_STORAGE_PATH="/custom/storage"
```
## Opencode Setup
[Opencode](https://github.com/opencode-ai/opencode) is fully supported as a coding agent.
### Automatic Setup
Run the quickstart script in the root directorys:
Run the quickstart script in the root directory:
```bash
./quickstart.sh
```
## Codex Setup
[OpenAI Codex CLI](https://github.com/openai/codex) (v0.101.0+) is supported with project-level config:
- `.codex/config.toml` — MCP server configuration (`agent-builder`)
- `.agents/skills/` — Symlinks to Hive skills
These files are tracked in git and available on clone. To use Codex with Hive:
1. Run `codex` in the repo root
2. Type `use hive` to start the agent workflow
Quick verification:
```bash
test -f .codex/config.toml && echo "OK: Codex config" || echo "MISSING: .codex/config.toml"
test -d .agents/skills/hive && echo "OK: Skills" || echo "MISSING: .agents/skills/"
```
## Additional Resources
- **Framework Documentation:** [core/README.md](../core/README.md)
+11
View File
@@ -236,6 +236,17 @@ By default, Hive supports deployment through Docker.
If you want zero-ops deployment, easier integration and credential management, and built-in logging, Aden Cloud is ideal. You get secure defaults, scaling, and observability out of the box—at the cost of less low-level control and some vendor lock-in.
## Autonomous Agent Deployment
Hive is designed to support:
- Memory sustainability (which memories to keep and which to discard)
- Event source management
- Recoverability
- Repeatability
- Volume: multiple approaches to support batch operations
## Deployment Strategy
Autonomous and interactive modes look different, but the core remains the same, and your deployment strategy should be consistent across both.
+2
View File
@@ -68,6 +68,7 @@ subgraph Expansion
j2["Cursor"]
j3["Opencode"]
j4["Antigravity"]
j5["Codex CLI"]
end
subgraph plat["Platform"]
k1["JavaScript/TypeScript SDK"]
@@ -254,6 +255,7 @@ classDef done fill:#9e9e9e,color:#fff,stroke:#757575
- [ ] Cursor
- [ ] Opencode
- [ ] Antigravity
- [ ] Codex CLI (in progress)
### File System Enhancement
- [ ] Semantic Search integration
@@ -17,6 +17,8 @@
"terminal_nodes": [
"report"
],
"conversation_mode": "continuous",
"identity_prompt": "You are a rigorous research agent. You search for information from diverse, authoritative sources, analyze findings critically, and produce well-cited reports. You never fabricate information \u2014 every claim must trace back to a source you actually retrieved.",
"nodes": [
{
"id": "intake",
@@ -32,7 +34,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a research intake specialist. The user wants to research a topic.\nHave a brief conversation to clarify what they need.\n\n**STEP 1 \u2014 Read and respond (text only, NO tool calls):**\n1. Read the topic provided\n2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)\n3. If it's already clear, confirm your understanding and ask the user to confirm\n\nKeep it short. Don't over-ask.\n\nAfter your message, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user confirms, call set_output:**\n- set_output(\"research_brief\", \"A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.\")",
"success_criteria": "The research brief is specific and actionable: it states the topic, the key questions to answer, the desired scope, and depth.",
"system_prompt": "You are a research intake specialist. The user wants to research a topic.\nHave a brief conversation to clarify what they need.\n\n**STEP 1 \u2014 Read and respond (text only, NO tool calls):**\n1. Read the topic provided\n2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)\n3. If it's already clear, confirm your understanding and ask the user to confirm\n\nKeep it short. Don't over-ask.\n\n**STEP 2 \u2014 After the user confirms, call set_output:**\n- set_output(\"research_brief\", \"A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.\")",
"tools": [],
"model": null,
"function": null,
@@ -63,7 +66,8 @@
],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a research agent. Given a research brief, find and analyze sources.\n\nIf feedback is provided, this is a follow-up round \u2014 focus on the gaps identified.\n\nWork in phases:\n1. **Search**: Use web_search with 3-5 diverse queries covering different angles.\n Prioritize authoritative sources (.edu, .gov, established publications).\n2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).\n Skip URLs that fail. Extract the substantive content.\n3. **Analyze**: Review what you've collected. Identify key findings, themes,\n and any contradictions between sources.\n\nImportant:\n- Work in batches of 3-4 tool calls at a time to manage context\n- After each batch, assess whether you have enough material\n- Prefer quality over quantity \u2014 5 good sources beat 15 thin ones\n- Track which URL each finding comes from (you'll need citations later)\n\nWhen done, use set_output:\n- set_output(\"findings\", \"Structured summary: key findings with source URLs for each claim. Include themes, contradictions, and confidence levels.\")\n- set_output(\"sources\", [{\"url\": \"...\", \"title\": \"...\", \"summary\": \"...\"}])\n- set_output(\"gaps\", \"What aspects of the research brief are NOT well-covered yet, if any.\")",
"success_criteria": "Findings reference at least 3 distinct sources with URLs. Key claims are substantiated by fetched content, not generated.",
"system_prompt": "You are a research agent. Given a research brief, find and analyze sources.\n\nIf feedback is provided, this is a follow-up round \u2014 focus on the gaps identified.\n\nWork in phases:\n1. **Search**: Use web_search with 3-5 diverse queries covering different angles.\n Prioritize authoritative sources (.edu, .gov, established publications).\n2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).\n Skip URLs that fail. Extract the substantive content.\n3. **Analyze**: Review what you've collected. Identify key findings, themes,\n and any contradictions between sources.\n\nImportant:\n- Work in batches of 3-4 tool calls at a time \u2014 never more than 10 per turn\n- After each batch, assess whether you have enough material\n- Prefer quality over quantity \u2014 5 good sources beat 15 thin ones\n- Track which URL each finding comes from (you'll need citations later)\n- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)\n\nWhen done, use set_output (one key at a time, separate turns):\n- set_output(\"findings\", \"Structured summary: key findings with source URLs for each claim. Include themes, contradictions, and confidence levels.\")\n- set_output(\"sources\", [{\"url\": \"...\", \"title\": \"...\", \"summary\": \"...\"}])\n- set_output(\"gaps\", \"What aspects of the research brief are NOT well-covered yet, if any.\")",
"tools": [
"web_search",
"web_scrape",
@@ -99,7 +103,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Present the research findings to the user clearly and concisely.\n\n**STEP 1 \u2014 Present (your first message, text only, NO tool calls):**\n1. **Summary** (2-3 sentences of what was found)\n2. **Key Findings** (bulleted, with confidence levels)\n3. **Sources Used** (count and quality assessment)\n4. **Gaps** (what's still unclear or under-covered)\n\nEnd by asking: Are they satisfied, or do they want deeper research? Should we proceed to writing the final report?\n\nAfter your presentation, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n- set_output(\"needs_more_research\", \"true\") \u2014 if they want more\n- set_output(\"needs_more_research\", \"false\") \u2014 if they're satisfied\n- set_output(\"feedback\", \"What the user wants explored further, or empty string\")",
"success_criteria": "The user has been presented with findings and has explicitly indicated whether they want more research or are ready for the report.",
"system_prompt": "Present the research findings to the user clearly and concisely.\n\n**STEP 1 \u2014 Present (your first message, text only, NO tool calls):**\n1. **Summary** (2-3 sentences of what was found)\n2. **Key Findings** (bulleted, with confidence levels)\n3. **Sources Used** (count and quality assessment)\n4. **Gaps** (what's still unclear or under-covered)\n\nEnd by asking: Are they satisfied, or do they want deeper research? Should we proceed to writing the final report?\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n- set_output(\"needs_more_research\", \"true\") \u2014 if they want more\n- set_output(\"needs_more_research\", \"false\") \u2014 if they're satisfied\n- set_output(\"feedback\", \"What the user wants explored further, or empty string\")",
"tools": [],
"model": null,
"function": null,
@@ -127,7 +132,8 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Write a comprehensive research report as an HTML file and present it to the user.\n\n**STEP 1 \u2014 Write the HTML report (tool calls, NO text to user yet):**\n\n1. Compose a complete, self-contained HTML document with embedded CSS styling.\n Use a clean, readable design: max-width container, pleasant typography,\n numbered citation links, a table of contents, and a references section.\n\n Report structure inside the HTML:\n - Title & date\n - Executive Summary (2-3 paragraphs)\n - Table of Contents\n - Findings (organized by theme, with [n] citation links)\n - Analysis (synthesis, implications, areas of debate)\n - Conclusion (key takeaways, confidence assessment)\n - References (numbered list with clickable URLs)\n\n Requirements:\n - Every factual claim must cite its source with [n] notation\n - Be objective \u2014 present multiple viewpoints where sources disagree\n - Distinguish well-supported conclusions from speculation\n - Answer the original research questions from the brief\n\n2. Save the HTML file:\n save_data(filename=\"report.html\", data=<your_html>)\n\n3. Get the clickable link:\n serve_file_to_user(filename=\"report.html\", label=\"Research Report\")\n\n**STEP 2 \u2014 Present the link to the user (text only, NO tool calls):**\n\nTell the user the report is ready and include the file:// URI from\nserve_file_to_user so they can click it to open. Give a brief summary\nof what the report covers. Ask if they have questions.\n\nAfter presenting the link, call ask_user() to wait for the user's response.\n\n**STEP 3 \u2014 After the user responds:**\n- Answer follow-up questions from the research material\n- Call ask_user() again if they might have more questions\n- When the user is satisfied: set_output(\"delivery_status\", \"completed\")",
"success_criteria": "An HTML report has been saved, the file link has been presented to the user, and the user has acknowledged receipt.",
"system_prompt": "Write a research report as an HTML file and present it to the user.\n\nIMPORTANT: save_data requires TWO separate arguments: filename and data.\nCall it like: save_data(filename=\"report.html\", data=\"<html>...</html>\")\nDo NOT use _raw, do NOT nest arguments inside a JSON string.\n\n**STEP 1 \u2014 Write and save the HTML report (tool calls, NO text to user yet):**\n\nBuild a clean HTML document. Keep the HTML concise \u2014 aim for clarity over length.\nUse minimal embedded CSS (a few lines of style, not a full framework).\n\nReport structure:\n- Title & date\n- Executive Summary (2-3 paragraphs)\n- Key Findings (organized by theme, with [n] citation links)\n- Analysis (synthesis, implications)\n- Conclusion (key takeaways)\n- References (numbered list with clickable URLs)\n\nRequirements:\n- Every factual claim must cite its source with [n] notation\n- Be objective \u2014 present multiple viewpoints where sources disagree\n- Answer the original research questions from the brief\n\nSave the HTML:\n save_data(filename=\"report.html\", data=\"<html>...</html>\")\n\nThen get the clickable link:\n serve_file_to_user(filename=\"report.html\", label=\"Research Report\")\n\nIf save_data fails, simplify and shorten the HTML, then retry.\n\n**STEP 2 \u2014 Present the link to the user (text only, NO tool calls):**\n\nTell the user the report is ready and include the file:// URI from\nserve_file_to_user so they can click it to open. Give a brief summary\nof what the report covers. Ask if they have questions.\n\n**STEP 3 \u2014 After the user responds:**\n- Answer follow-up questions from the research material\n- When the user is satisfied: set_output(\"delivery_status\", \"completed\")",
"tools": [
"save_data",
"serve_file_to_user",
+101 -50
View File
@@ -1,12 +1,15 @@
"""Agent graph construction for Deep Research Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
@@ -102,22 +105,40 @@ edges = [
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# review -> research (feedback loop, checked first)
# review -> research (feedback loop)
EdgeSpec(
id="review-to-research-feedback",
source="review",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(needs_more_research).lower() == 'true'",
priority=2,
condition_expr="needs_more_research == True",
priority=1,
),
# review -> report (complementary condition — proceed to report when no more research needed)
# review -> report (user satisfied)
EdgeSpec(
id="review-to-report",
source="review",
target="report",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(needs_more_research).lower() != 'true'",
condition_expr="needs_more_research == False",
priority=2,
),
# report -> research (user wants deeper research on current topic)
EdgeSpec(
id="report-to-research",
source="report",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'more_research'",
priority=2,
),
# report -> intake (user wants a new topic — default when not more_research)
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() != 'more_research'",
priority=1,
),
]
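The `str(next_action).lower()` pattern in the new report edges makes `report-to-intake` a safe default route: the comparison holds even when `next_action` was never set. A minimal sketch of that behavior in plain Python (this is not the framework's actual expression evaluator — just the same coercion logic):

```python
# Sketch: why str(...).lower() comparisons give a robust default edge.
# `state` mimics node outputs; next_action may be missing entirely.

def route(state: dict) -> str:
    # Coerce to string so None, booleans, and mixed case all compare safely.
    next_action = str(state.get("next_action")).lower()
    if next_action == "more_research":
        return "research"  # user wants to dig deeper into the current topic
    return "intake"        # default: anything else starts a new topic

print(route({"next_action": "more_research"}))  # research
print(route({"next_action": "MORE_RESEARCH"}))  # research (case-insensitive)
print(route({}))                                # intake (unset -> default)
```

A bare `next_action == 'more_research'` would raise or misroute when the key is unset; the string coercion turns every non-matching value, including `None`, into the default path.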
@@ -126,7 +147,7 @@ edges = [
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = ["report"]
terminal_nodes = []
class DeepResearchAgent:
@@ -136,6 +157,12 @@ class DeepResearchAgent:
Flow: intake -> research -> review -> report
^ |
+-- feedback loop (if user wants more)
Uses AgentRuntime for proper session management:
- Session-scoped storage (sessions/{session_id}/)
- Checkpointing for resume capability
- Runtime logging
- Data folder for save_data/load_data
"""
def __init__(self, config=None):
@@ -147,10 +174,10 @@ class DeepResearchAgent:
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
@@ -171,82 +198,106 @@ class DeepResearchAgent:
"max_tool_calls_per_turn": 20,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
identity_prompt=(
"You are a rigorous research agent. You search for information "
"from diverse, authoritative sources, analyze findings critically, "
"and produce well-cited reports. You never fabricate information — "
"every claim must trace back to a source you actually retrieved."
),
)
def _setup(self) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
)
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
checkpoint_config=checkpoint_config,
)
return self._executor
async def start(self) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
self._setup()
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._executor is None:
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
async def run(self, context: dict, session_state=None) -> ExecutionResult:
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start()
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
@@ -10,8 +10,13 @@ intake_node = NodeSpec(
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
@@ -23,8 +28,6 @@ Have a brief conversation to clarify what they need.
Keep it short. Don't over-ask.
After your message, call ask_user() to wait for the user's response.
**STEP 2 — After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
@@ -40,10 +43,14 @@ research_node = NodeSpec(
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=3,
max_node_visits=0,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
success_criteria=(
"Findings reference at least 3 distinct sources with URLs. "
"Key claims are substantiated by fetched content, not generated."
),
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
@@ -58,18 +65,19 @@ Work in phases:
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time to manage context
- Work in batches of 3-4 tool calls at a time — never more than 10 per turn
- After each batch, assess whether you have enough material
- Prefer quality over quantity — 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
When done, use set_output:
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
tools=["web_search", "web_scrape", "load_data", "save_data", "append_data", "list_data_files"],
)
# Node 3: Review (client-facing)
@@ -80,9 +88,13 @@ review_node = NodeSpec(
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=3,
max_node_visits=0,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
success_criteria=(
"The user has been presented with findings and has explicitly indicated "
"whether they want more research or are ready for the report."
),
system_prompt="""\
Present the research findings to the user clearly and concisely.
@@ -95,8 +107,6 @@ Present the research findings to the user clearly and concisely.
End by asking: Are they satisfied, or do they want deeper research? \
Should we proceed to writing the final report?
After your presentation, call ask_user() to wait for the user's response.
**STEP 2 — After the user responds, call set_output:**
- set_output("needs_more_research", "true") if they want more
- set_output("needs_more_research", "false") if they're satisfied
@@ -113,52 +123,66 @@ report_node = NodeSpec(
description="Write a cited HTML report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status"],
output_keys=["delivery_status", "next_action"],
success_criteria=(
"An HTML report has been saved, the file link has been presented to the user, "
"and the user has indicated what they want to do next."
),
system_prompt="""\
Write a comprehensive research report as an HTML file and present it to the user.
Write a research report as an HTML file and present it to the user.
**STEP 1 — Write the HTML report (tool calls, NO text to user yet):**
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
1. Compose a complete, self-contained HTML document with embedded CSS styling.
Use a clean, readable design: max-width container, pleasant typography,
numbered citation links, a table of contents, and a references section.
**STEP 1 — Write and save the HTML report (tool calls, NO text to user yet):**
Report structure inside the HTML:
- Title & date
- Executive Summary (2-3 paragraphs)
- Table of Contents
- Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications, areas of debate)
- Conclusion (key takeaways, confidence assessment)
- References (numbered list with clickable URLs)
Build a clean HTML document. Keep the HTML concise — aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective — present multiple viewpoints where sources disagree
- Distinguish well-supported conclusions from speculation
- Answer the original research questions from the brief
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
2. Save the HTML file:
save_data(filename="report.html", data=<your_html>)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective — present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
3. Get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
If save_data fails, simplify and shorten the HTML, then retry.
**STEP 2 — Present the link to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions.
After presenting the link, call ask_user() to wait for the user's response.
of what the report covers. Ask if they have questions or want to continue.
**STEP 3 — After the user responds:**
- Answer follow-up questions from the research material
- Call ask_user() again if they might have more questions
- When the user is satisfied: set_output("delivery_status", "completed")
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
- Dig deeper into the current topic?
- Then call set_output:
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
""",
tools=["save_data", "serve_file_to_user", "load_data", "list_data_files"],
tools=[
"save_data", "append_data", "edit_data",
"serve_file_to_user", "load_data", "list_data_files",
],
)
__all__ = [
+31 -1
View File
@@ -945,6 +945,24 @@ else
echo -e "${YELLOW}--${NC}"
fi
echo -n " ⬡ codex CLI... "
if command -v codex > /dev/null 2>&1; then
CODEX_VERSION=$(codex --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' || echo "0.0.0")
# Compare version >= 0.101.0
CODEX_MAJOR=$(echo "$CODEX_VERSION" | cut -d. -f1)
CODEX_MINOR=$(echo "$CODEX_VERSION" | cut -d. -f2)
if [ "$CODEX_MAJOR" -gt 0 ] 2>/dev/null || { [ "$CODEX_MAJOR" -eq 0 ] && [ "$CODEX_MINOR" -ge 101 ]; } 2>/dev/null; then
echo -e "${GREEN}${CODEX_VERSION}${NC}"
CODEX_AVAILABLE=true
else
echo -e "${YELLOW}${CODEX_VERSION} (upgrade to 0.101.0+)${NC}"
CODEX_AVAILABLE=false
fi
else
echo -e "${YELLOW}--${NC}"
CODEX_AVAILABLE=false
fi
echo -n " ⬡ local settings... "
if [ -f "$SCRIPT_DIR/.claude/settings.local.json" ]; then
echo -e "${GREEN}ok${NC}"
@@ -1035,7 +1053,7 @@ if [ -n "$HIVE_CREDENTIAL_KEY" ]; then
echo ""
fi
echo -e "${BOLD}Build a New Agent:${NC}"
echo -e "${BOLD}Build a New Agent (Claude):${NC}"
echo ""
echo -e " 1. Open Claude Code in this directory:"
echo -e " ${CYAN}claude${NC}"
@@ -1046,6 +1064,18 @@ echo ""
echo -e " 3. Test an existing agent:"
echo -e " ${CYAN}/hive-test${NC}"
echo ""
# Show Codex instructions if available
if [ "$CODEX_AVAILABLE" = true ]; then
echo -e "${BOLD}Build a New Agent (Codex):${NC}"
echo ""
echo -e " Codex ${GREEN}${CODEX_VERSION}${NC} is available. To use it with Hive:"
echo -e " 1. Restart your terminal (or open a new one)"
echo -e " 2. Run: ${CYAN}codex${NC}"
echo -e " 3. Type: ${CYAN}use hive${NC}"
echo ""
fi
echo -e "${BOLD}Run an Agent:${NC}"
echo ""
echo -e " Launch the interactive dashboard to browse and run agents:"
+61
View File
@@ -0,0 +1,61 @@
#!/usr/bin/env bash
#
# setup-antigravity-mcp.sh - Write Antigravity/Claude MCP config with auto-detected paths
#
# Run from anywhere inside the hive repo. Generates ~/.gemini/antigravity/mcp_config.json
# based on .agent/mcp_config.json template, with absolute paths so the IDE can
# connect to agent-builder and tools MCP servers without manual path editing.
#
set -e
# Find repo root
REPO_ROOT=""
if git rev-parse --show-toplevel &>/dev/null; then
REPO_ROOT="$(git rev-parse --show-toplevel)"
elif [ -f ".agent/mcp_config.json" ]; then
REPO_ROOT="$(pwd)"
else
d="$(pwd)"
while [ -n "$d" ] && [ "$d" != "/" ]; do
[ -f "$d/.agent/mcp_config.json" ] && REPO_ROOT="$d" && break
d="$(dirname "$d")"
done
fi
if [ -z "$REPO_ROOT" ] || [ ! -d "$REPO_ROOT/core" ] || [ ! -d "$REPO_ROOT/tools" ]; then
echo "Error: Run this script from inside the hive repo (could not find repo root with core/ and tools/)." >&2
exit 1
fi
TEMPLATE="$REPO_ROOT/.agent/mcp_config.json"
if [ ! -f "$TEMPLATE" ]; then
echo "Error: Template not found at $TEMPLATE" >&2
exit 1
fi
CORE_DIR="$(cd "$REPO_ROOT/core" && pwd)"
TOOLS_DIR="$(cd "$REPO_ROOT/tools" && pwd)"
mkdir -p "$HOME/.gemini/antigravity"
# Generate config from template with absolute paths
# Replace relative "core" and "tools" with absolute paths in --directory args
sed -e "s|\"--directory\", \"core\"|\"--directory\", \"$CORE_DIR\"|g" \
-e "s|\"--directory\", \"tools\"|\"--directory\", \"$TOOLS_DIR\"|g" \
"$TEMPLATE" > "$HOME/.gemini/antigravity/mcp_config.json"
echo "Wrote $HOME/.gemini/antigravity/mcp_config.json (from $TEMPLATE)"
echo " core -> $CORE_DIR"
echo " tools -> $TOOLS_DIR"
if [ "$1" = "--claude" ]; then
mkdir -p "$HOME/.claude"
cp "$HOME/.gemini/antigravity/mcp_config.json" "$HOME/.claude/mcp.json"
echo "Wrote $HOME/.claude/mcp.json"
fi
echo ""
echo "Next: Restart Antigravity IDE so it loads the MCP config."
echo " Then open this repo; agent-builder and tools should appear."
echo ""
echo "For Claude Code, run: $0 --claude"
+4 -4
View File
@@ -16,10 +16,10 @@ uv pip install -e "tools[dev]"
## Environment Setup
Some tools require API keys to function. Copy the example file and add your credentials:
Some tools require API keys to function. Credentials are managed through the encrypted credential store at `~/.hive/credentials`, which is configured automatically during initial setup:
```bash
cp .env.example .env
./quickstart.sh
```
| Variable | Required For | Get Key |
@@ -31,14 +31,14 @@ cp .env.example .env
> **Note:** `web_search` supports multiple providers. Set either Brave OR Google credentials. Brave is preferred for backward compatibility.
Alternatively, export as environment variables:
Alternatively, export credentials as environment variables:
```bash
export ANTHROPIC_API_KEY=your-key-here
export BRAVE_SEARCH_API_KEY=your-key-here
```
See [.env.example](.env.example) for details.
See the [credentials module](src/aden_tools/credentials/) for details on how credentials are resolved.
## Quick Start
+2
View File
@@ -109,6 +109,8 @@ def register_all_tools(
"execute_command_tool",
"load_data",
"save_data",
"append_data",
"edit_data",
"list_data_files",
"serve_file_to_user",
"csv_read",
@@ -237,3 +237,115 @@ def register_tools(mcp: FastMCP) -> None:
return {"files": files}
except Exception as e:
return {"error": f"Failed to list data files: {str(e)}"}
@mcp.tool()
def append_data(filename: str, data: str, data_dir: str) -> dict:
"""
Purpose
Append data to the end of an existing file, or create it if it
doesn't exist yet.
When to use
Build large files incrementally instead of writing everything in
one save_data call. For example, write an HTML skeleton first,
then append each section separately to stay within token limits.
Rules & Constraints
filename must be a simple name like 'report.html' — no paths or '..'
Args:
filename: Simple filename to append to. No paths or '..'.
data: The string data to append.
data_dir: Absolute path to the data directory.
Returns:
Dict with success status, new total size, and bytes appended
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename. Use simple names like 'report.html'"}
if not data_dir:
return {"error": "data_dir is required"}
try:
dir_path = Path(data_dir)
dir_path.mkdir(parents=True, exist_ok=True)
path = dir_path / filename
with open(path, "a", encoding="utf-8") as f:
f.write(data)
appended_bytes = len(data.encode("utf-8"))
total_bytes = path.stat().st_size
return {
"success": True,
"filename": filename,
"size_bytes": total_bytes,
"appended_bytes": appended_bytes,
}
except Exception as e:
return {"error": f"Failed to append data: {str(e)}"}
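The docstring's incremental-build pattern can be sketched in plain Python (file and section names are illustrative; the agent invokes the real `append_data` tool via MCP, but the underlying append-mode behavior is the same):

```python
from pathlib import Path
import tempfile

# Sketch: build a large HTML file in pieces instead of one save_data call.
data_dir = Path(tempfile.mkdtemp())
path = data_dir / "report.html"

# Append mode ("a") creates the file if it doesn't exist yet.
with open(path, "a", encoding="utf-8") as f:
    f.write("<html><body><h1>Report</h1>\n")

# Each section lands in its own small write, staying within token limits.
for section in ("<h2>Findings</h2>\n", "<h2>Analysis</h2>\n"):
    with open(path, "a", encoding="utf-8") as f:
        f.write(section)

with open(path, "a", encoding="utf-8") as f:
    f.write("</body></html>\n")

print(path.stat().st_size)  # total bytes, reported as size_bytes by the tool
```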
@mcp.tool()
def edit_data(filename: str, old_text: str, new_text: str, data_dir: str) -> dict:
"""
Purpose
Find and replace a specific text segment in an existing file.
Works like a surgical diff — only the matched portion changes.
When to use
Update a section of a previously saved file without rewriting
the entire content. For example, replace a placeholder in an
HTML report or fix a specific paragraph.
Rules & Constraints
old_text must appear exactly once in the file. If it appears
zero times or more than once, the edit is rejected with an
error message.
Args:
filename: The file to edit. Must exist in data_dir.
old_text: The exact text to find (must match exactly once).
new_text: The replacement text.
data_dir: Absolute path to the data directory.
Returns:
Dict with success status and updated file size
"""
if not filename or ".." in filename or "/" in filename or "\\" in filename:
return {"error": "Invalid filename. Use simple names like 'report.html'"}
if not data_dir:
return {"error": "data_dir is required"}
try:
path = Path(data_dir) / filename
if not path.exists():
return {"error": f"File not found: {filename}"}
content = path.read_text(encoding="utf-8")
count = content.count(old_text)
if count == 0:
return {
"error": (
"old_text not found in the file. "
"Make sure you're matching the exact text, "
"including whitespace and newlines."
)
}
if count > 1:
return {
"error": (
f"old_text found {count} times — it must be unique. "
"Include more surrounding context to match exactly once."
)
}
updated = content.replace(old_text, new_text, 1)
path.write_text(updated, encoding="utf-8")
return {
"success": True,
"filename": filename,
"size_bytes": len(updated.encode("utf-8")),
"replacements": 1,
}
except Exception as e:
return {"error": f"Failed to edit data: {str(e)}"}
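The unique-match rule above can be demonstrated in isolation — a small sketch mirroring the tool's count/replace logic (the helper name is illustrative, not part of the tool API):

```python
# Sketch of edit_data's unique-match rule: a replacement is applied only
# when old_text occurs exactly once; zero or multiple matches are rejected.
def apply_edit(content: str, old_text: str, new_text: str):
    count = content.count(old_text)
    if count == 0:
        return None, "old_text not found"
    if count > 1:
        return None, f"old_text found {count} times; must be unique"
    # Exactly one match: replace only that occurrence.
    return content.replace(old_text, new_text, 1), None

html = "<p>draft</p><p>final</p>"
updated, err = apply_edit(html, "<p>draft</p>", "<p>DRAFT</p>")   # unique -> ok
rejected, err2 = apply_edit(html, "<p>", "<div>")                 # twice -> rejected
```

Requiring a unique match is what makes the edit "surgical": when `old_text` is ambiguous, the caller must include more surrounding context rather than risk changing the wrong occurrence.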