Compare commits

...

120 Commits

Author SHA1 Message Date
bryan 4ad0d0e077 fix: align the credential functions to be the same 2026-03-09 10:14:21 -07:00
Timothy @aden b55a77634b Delete .github/ISSUE_TEMPLATE/link-discord.yml 2026-03-08 19:44:48 -07:00
Bryan @ Aden f7db603922 Merge pull request #6048 from aden-hive/fix/draft-email-tool
(micro-fix): draft email tool
2026-03-09 02:26:58 +00:00
bryan b4a47a12ff fix: linter formatting 2026-03-08 19:26:06 -07:00
bryan 2228851b16 feat: added reply in thread to draft email tool 2026-03-08 19:24:38 -07:00
Bryan @ Aden ed0a211906 Merge pull request #6047 from aden-hive/fix/reply-email-tool
(micro-fix): reply email tool
2026-03-09 02:00:03 +00:00
bryan 63744ddaef fix: update to pass linter 2026-03-08 18:58:50 -07:00
bryan 82331acb77 feat: update reply email tool to contain the email thread in the body 2026-03-08 18:53:53 -07:00
Timothy @aden b96bbcaa72 Merge pull request #6044 from Amdev-5/fix/e501-coder-tools-server-6043
fix: E501 line too long in coder_tools_server.py
2026-03-08 17:39:59 -07:00
Timothy edfa49bf7a fix: ci test 2026-03-08 17:29:36 -07:00
RichardTang-Aden eb9e4ed23c Merge pull request #5955 from akshajtiwari/ci-first-issue
CI: add uv caching, improve PR requirements workflow
2026-03-08 17:17:22 -07:00
Amdev-5 fed9e90271 fix: E501 line too long in coder_tools_server.py
Break ternary expression across multiple lines to satisfy
the 100-char line length limit.

Fixes #6043
2026-03-09 05:35:45 +05:30
Timothy ca565ae664 fix: validate agent package for orphaned nodes 2026-03-07 09:29:48 -08:00
Timothy 42ce97e0fc fix: agent package validation - no orphaned nodes 2026-03-07 08:47:01 -08:00
Akshaj Tiwari bea17b5f79 simplify label creation logic by assuming label pre-exists 2026-03-07 19:02:04 +05:30
Akshaj Tiwari ab0d5ce8d3 change pr.updated_at to pr.created_at for the grace period check 2026-03-07 18:58:36 +05:30
Akshaj Tiwari b374d5119a resolving the ci.yml issues by using enable-cache instead of manual caching 2026-03-07 18:49:17 +05:30
RichardTang-Aden 9129b4a42e Merge pull request #5975 from aden-hive/feat/queen-responsibility
feat: separate queen responsibility by phases
2026-03-06 19:21:53 -08:00
Richard Tang e906646d49 Merge remote-tracking branch 'origin/feature/thinking-hook' into feat/queen-responsibility 2026-03-06 19:15:29 -08:00
Timothy 086a532521 fix: skip queen judge, turn off aggressive compaction 2026-03-06 19:14:25 -08:00
Richard Tang 19dd40ed3a chore: ruff lint 2026-03-06 19:11:53 -08:00
Richard Tang 196f3d645f feat: building phase prompts improvements 2026-03-06 18:59:12 -08:00
Richard Tang 80fd91d175 feat: building phase prompt optimization 2026-03-06 18:53:33 -08:00
Richard Tang 695410f880 Merge remote-tracking branch 'origin/feature/thinking-hook' into feat/queen-responsibility 2026-03-06 18:39:21 -08:00
Timothy 009c62dac6 chore: re-organize hooks 2026-03-06 18:38:20 -08:00
Richard Tang 27c8904341 feat: limit the tool description in simple mode 2026-03-06 18:37:08 -08:00
Richard Tang ddfce58071 feat: simplify building prompts 2026-03-06 18:29:44 -08:00
Richard Tang 1bb850bdbe Merge remote-tracking branch 'origin/feature/thinking-hook' into feat/queen-responsibility 2026-03-06 17:59:33 -08:00
Richard Tang 5019633ba3 fix: remove wrong tool examples 2026-03-06 17:58:50 -08:00
Timothy @aden b0fd8b83f0 Merge pull request #5896 from VasuBansal7576/codex/pr-minimax-single
fix: add minimax provider mapping and stream fallback
2026-03-06 17:54:41 -08:00
Richard Tang 2dc58eeeb0 fix: add missing gcu prompts 2026-03-06 17:48:37 -08:00
Richard Tang 50ab55ded5 feat: loading improvement 2026-03-06 17:35:17 -08:00
Timothy 9f656577a2 fix: turn signal edge case 2026-03-06 17:18:54 -08:00
Richard Tang 5c87b4b194 Merge remote-tracking branch 'origin/feature/thinking-hook' into feat/queen-responsibility 2026-03-06 17:10:25 -08:00
Timothy 7f866b24a1 Merge branch 'feat/queen-responsibility' into feature/thinking-hook 2026-03-06 17:07:12 -08:00
Timothy 5eb623e931 fix: back to back compaction edge case 2026-03-06 17:06:53 -08:00
Richard Tang 5583896429 fix: add back gcu instruction 2026-03-06 17:05:47 -08:00
Timothy 3a8321b975 chore: fix compaction reference 2026-03-06 16:59:44 -08:00
Richard Tang 8443ec87a6 fix: output key that terminated the queen 2026-03-06 16:55:10 -08:00
Richard Tang 1e06e87f4c feat: improve validation 2026-03-06 16:34:28 -08:00
Richard Tang e2558e3f95 fix: llm friendly input 2026-03-06 16:04:58 -08:00
Richard Tang 1344d3bb8e feat: add node parameters and fix problems for initialize_agent_package 2026-03-06 15:54:27 -08:00
Timothy dc7ec6c058 Merge branch 'feat/queen-responsibility' into feature/thinking-hook 2026-03-06 15:35:17 -08:00
Richard Tang 0ba781609a refactor: remove unused builder functions 2026-03-06 15:32:45 -08:00
Richard Tang 5ce230b0a6 refactor: move the coder tools 2026-03-06 14:56:19 -08:00
Timothy ff1527a77a fix: thinking hook max token 2026-03-06 14:53:32 -08:00
Timothy 252dea0bc3 Merge branch 'feat/queen-responsibility' into feature/thinking-hook 2026-03-06 14:37:02 -08:00
Timothy 126cbe529f feat: queen thinking hook 2026-03-06 14:30:10 -08:00
Richard Tang 207a0a0ca5 feat: fix for mcp tools and templates 2026-03-06 14:27:20 -08:00
Richard Tang d9f502173b feat: queen building improvements 2026-03-06 13:51:49 -08:00
Richard Tang f090ce4d5a fix: duplicated session calls 2026-03-06 12:28:37 -08:00
Richard Tang 1f7efcd940 feat: queen prompt optimization 2026-03-06 12:27:08 -08:00
Akshaj Tiwari fbbbaadd1e remove workflow_dispatch trigger from PR requirements workflows(forgot this commit) 2026-03-07 00:59:56 +05:30
Richard Tang 4de140a170 Merge remote-tracking branch 'origin/main' into feat/queen-responsibility 2026-03-06 11:17:03 -08:00
Richard Tang ed5cfb93a4 fix: catch Cannot write to closing transport error 2026-03-06 11:15:13 -08:00
Richard Tang 891bf08477 feat: re-organized queen prompt 2026-03-06 11:13:15 -08:00
Akshaj Tiwari 37651e534f add PR requirements warning and enforcement workflow and remove the workflow dispatch trigger 2026-03-07 00:39:35 +05:30
Akshaj Tiwari df63c3e781 add the pr requirement changes and remove the workflow dispatch option from ci.yml(tested) 2026-03-06 23:44:45 +05:30
Akshaj Tiwari 838da4a16e style: fix ruff import ordering 2026-03-06 22:57:14 +05:30
Akshaj Tiwari e916d573f6 adding workflow dispatch for testing 2026-03-06 22:51:19 +05:30
Richard Tang 08d51bb377 refactor: remove the old coderagent for TUI 2026-03-06 09:20:49 -08:00
Akshaj Tiwari fa5ebf19a4 first commit with the cache and working directory attributes 2026-03-06 22:49:06 +05:30
Richard Tang 17de0efcaf feat: rename escalation tool 2026-03-06 08:48:16 -08:00
Timothy 4099603a91 chore: lint 2026-03-06 08:21:40 -08:00
Vasu Bansal 988a58c1b7 fix: harden legacy agent.json loading error handling 2026-03-06 20:31:59 +05:30
Vasu Bansal cbc7ec3a32 fix: resolve aden client import duplication after rebase 2026-03-06 16:13:49 +05:30
Vasu Bansal 07d4bf8044 fix: resolve ruff import-order failure in aden client 2026-03-06 16:10:02 +05:30
Vasu Bansal e302e93ac9 chore: retrigger ci 2026-03-06 16:10:02 +05:30
Vasu Bansal 80f5a363d2 fix: address minimax review feedback in quickstart and provider wiring 2026-03-06 16:10:02 +05:30
Vasu Bansal 7b5b6d2c51 fix: add minimax provider mapping and stream fallback 2026-03-06 16:10:02 +05:30
Timothy 0b1fd72e49 chore: lint 2026-03-05 21:28:17 -08:00
Timothy 353f5c31a2 chore: lint 2026-03-05 21:24:21 -08:00
Richard Tang ad40b049ae feat: update the escalate tool 2026-03-05 20:53:33 -08:00
Richard Tang 42c9a11b1a feat: remove the terminal node for queen 2026-03-05 20:42:28 -08:00
Richard Tang c3fb1885c3 feat: make terminal node validation warning 2026-03-05 20:20:49 -08:00
Richard Tang bb413bad1f feat: prompts to allow user to override 2026-03-05 19:50:11 -08:00
Timothy afef3cb66a Merge branch 'fix/output-cleaner' into feat/queen-responsibility 2026-03-05 19:48:28 -08:00
Timothy b1f3d931cd fix: remove output cleaner 2026-03-05 19:48:13 -08:00
Richard Tang 297b24e061 feat: instruction for running phase to handle escalation 2026-03-05 19:47:13 -08:00
Richard Tang 775a0fa511 chore: prompt debug tool 2026-03-05 19:35:52 -08:00
Richard Tang 77abed89b9 feat: queen identity 2026-03-05 19:29:04 -08:00
Richard Tang e2aeb72d49 feat: queen identity 2026-03-05 19:25:59 -08:00
Richard Tang 2b440f84f0 feat: add the termination session back to queen 2026-03-05 19:10:49 -08:00
Richard Tang 8fd66a12c5 Merge remote-tracking branch 'origin/feat/queen-responsibility' into feat/queen-responsibility 2026-03-05 19:03:27 -08:00
Richard Tang f23d5a3ff5 feat: add terminal node back in graph 2026-03-05 19:02:41 -08:00
Timothy be8ec867e5 fix: better stall detection 2026-03-05 18:51:36 -08:00
Timothy b2ba42e541 Merge branch 'feat/queen-responsibility' into feat/worker-progressive-disclosure 2026-03-05 15:26:09 -08:00
Richard Tang 94d0038e03 chore: remove duplicates in anti-patterns 2026-03-05 14:54:46 -08:00
Timothy e1bf300e3c feat: progressive disclosure of runtime data 2026-03-05 14:44:02 -08:00
Timothy @aden f36add83f0 Merge pull request #5901 from aden-hive/fix/bom-safe-json-load
fix(micro-fix): bom safe json loading
2026-03-05 14:29:37 -08:00
Timothy Zhang a57d58e8d4 fix: bom safe json loading 2026-03-05 14:27:15 -08:00
Richard Tang c6b922e831 feat: condense framework guide 2026-03-05 14:26:03 -08:00
Richard Tang 71d12a7904 feat: condensed queen building prompts 2026-03-05 14:18:03 -08:00
Richard Tang 24c25d408c feat: remove unused prompts 2026-03-05 14:13:02 -08:00
Richard Tang 2e99fc9fe5 feat: change graph guidelines 2026-03-05 14:08:52 -08:00
Richard Tang c1f066b8ba feat: add gcu and validation in initialize_agent_package 2026-03-05 14:01:43 -08:00
Richard Tang e7a6074800 fix: prevent duplicate session creation when starting from home 2026-03-05 13:44:39 -08:00
Richard Tang 719942d29a fix: bug for multiple session calls 2026-03-05 13:04:17 -08:00
Richard Tang 190450a2b2 refactor: skip judge logic improvement 2026-03-05 12:38:18 -08:00
Richard Tang 44d609b719 feat: allow judge to wait queen input 2026-03-05 12:33:27 -08:00
Richard Tang 8c9892f9f6 feat: re-organized the tools for list mcp tools 2026-03-05 12:06:57 -08:00
Bryan @ Aden 85c204a442 Merge pull request #5403 from jackthepunished/feat/telegram-tool-expansion
feat(tools): expand Telegram tool with message management, media, and chat info operations
2026-03-05 19:48:05 +00:00
jackthepunished 653d24df9d fix: address review — use POST for getChat, return raw API responses
- Change get_chat client method from httpx.get+params to httpx.post+json
  to avoid URL-encoding issues with @username chat IDs
- Remove {"success": True} normalization from delete_message,
  send_chat_action, pin_message, and unpin_message MCP tools;
  return raw Telegram API response consistently
- Update corresponding test mocks and assertions to match
2026-03-05 20:40:46 +03:00
jackthepunished b687fa9e94 feat(tools): expand Telegram tool with message management, media, and chat info operations
Add 8 new operations to the Telegram Bot tool, bringing it from 2 to 10
operations. This covers message lifecycle (edit, delete, forward), media
(send photo), chat info (get chat), UX (typing indicators), and pin
management — making the tool practical for agent workflows beyond
fire-and-forget messaging.

New operations:
- telegram_edit_message: edit previously sent messages
- telegram_delete_message: delete messages
- telegram_forward_message: forward between chats
- telegram_send_photo: send photos via URL or file_id
- telegram_send_chat_action: show typing/uploading indicators
- telegram_get_chat: retrieve chat metadata
- telegram_pin_message: pin important messages
- telegram_unpin_message: unpin stale messages

Also includes input validation for chat actions, credential spec updates,
central registry wiring, and 31 new tests (52 total).

Closes #4808
2026-03-05 20:40:45 +03:00
Richard Tang 6ade844722 feat: escalation implementation 2026-03-04 19:59:02 -08:00
Richard Tang b9a3c67fea feat: dynamically load the system prompt in the context 2026-03-04 19:40:20 -08:00
Richard Tang 219bbe00fc feat: move guardrail to validation 2026-03-04 19:11:37 -08:00
Richard Tang ef6af5404f refactor: new builder flow 2026-03-04 18:42:20 -08:00
Richard Tang b7d57f3d49 feat: fix run_command and remove agent search 2026-03-04 18:04:42 -08:00
Richard Tang 58c892babb Merge remote-tracking branch 'origin/main' into feat/queen-responsibility 2026-03-04 18:03:03 -08:00
Richard Tang 9e2004e33b Merge remote-tracking branch 'origin/main' into feat/queen-responsibility 2026-03-04 17:30:09 -08:00
Richard Tang b8be3056ed feat: list agent tool instruction 2026-03-04 17:03:45 -08:00
Richard Tang 39029b82d6 refactor: remove the coding agent check in quickstart 2026-03-04 16:27:33 -08:00
Richard Tang 232890b970 docs: remove the reference of the old agent skills 2026-03-04 16:23:30 -08:00
Richard Tang 13a8e28ae2 refactor: remove all old unused skills 2026-03-04 16:18:28 -08:00
Richard Tang 34a44aa83c chore: update remaining reference for queen mode 2026-03-04 16:11:28 -08:00
Richard Tang 8468c45dc2 refactor: rename the queen mode to queen phase for clarity 2026-03-04 16:10:15 -08:00
Richard Tang d2c3649566 refactor: re-organize the agent initialization tools 2026-03-04 16:06:00 -08:00
Richard Tang 71226d9625 fix: logger schema mismatch 2026-03-04 10:55:28 -08:00
Richard Tang 9102328d1c feat: make LLM logger by default on 2026-03-04 10:30:17 -08:00
153 changed files with 6913 additions and 15014 deletions
-9
@@ -1,9 +0,0 @@
{
"mcpServers": {
"agent-builder": {
"command": "uv",
"args": ["run", "--directory", "core", "-m", "framework.mcp.agent_builder_server"],
"disabled": false
}
}
}
-1
@@ -1 +0,0 @@
../../.claude/skills/hive
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-concepts
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-create
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-credentials
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-patterns
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-test
-5
@@ -1,5 +0,0 @@
---
description: hive-concepts
---
use hive-concepts skill
-5
@@ -1,5 +0,0 @@
---
description: hive-create
---
use hive-create skill
-5
@@ -1,5 +0,0 @@
---
description: hive-credentials
---
use hive-credentials skill
-5
@@ -1,5 +0,0 @@
---
description: hive-patterns
---
use hive-patterns skill
-5
@@ -1,5 +0,0 @@
---
description: hive-test
---
use hive-test skill
-5
@@ -1,5 +0,0 @@
---
description: hive
---
use hive skill
-1
@@ -1 +0,0 @@
../../.claude/skills/hive
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-concepts
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-create
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-credentials
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-patterns
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-test
+2 -20
@@ -1,34 +1,16 @@
{
"permissions": {
"allow": [
"mcp__agent-builder__create_session",
"mcp__agent-builder__set_goal",
"mcp__agent-builder__add_node",
"mcp__agent-builder__add_edge",
"mcp__agent-builder__configure_loop",
"mcp__agent-builder__add_mcp_server",
"mcp__agent-builder__validate_graph",
"mcp__agent-builder__export_graph",
"mcp__agent-builder__load_session_by_id",
"Bash(git status:*)",
"Bash(gh run view:*)",
"Bash(uv run:*)",
"Bash(env:*)",
"mcp__agent-builder__test_node",
"mcp__agent-builder__list_mcp_tools",
"Bash(python -m py_compile:*)",
"Bash(python -m pytest:*)",
"Bash(source:*)",
"mcp__agent-builder__update_node",
"mcp__agent-builder__check_missing_credentials",
"mcp__agent-builder__list_stored_credentials",
"Bash(find:*)",
"mcp__agent-builder__run_tests",
"Bash(PYTHONPATH=core:exports:tools/src uv run pytest:*)",
"mcp__agent-builder__list_agent_sessions",
"mcp__agent-builder__generate_constraint_tests",
"mcp__agent-builder__generate_success_tests"
"Bash(PYTHONPATH=core:exports:tools/src uv run pytest:*)"
]
},
"enabledMcpjsonServers": ["agent-builder", "tools"]
"enabledMcpjsonServers": ["tools"]
}
-399
@@ -1,399 +0,0 @@
---
name: hive-concepts
description: Core concepts for goal-driven agents - architecture, node types (event_loop, function), tool discovery, and workflow overview. Use when starting agent development or need to understand agent fundamentals.
license: Apache-2.0
metadata:
author: hive
version: "2.0"
type: foundational
part_of: hive
---
# Building Agents - Core Concepts
Foundational knowledge for building goal-driven agents as Python packages.
## Architecture: Python Services (Not JSON Configs)
Agents are built as Python packages:
```
exports/my_agent/
├── __init__.py # Package exports
├── __main__.py # CLI (run, info, validate, shell)
├── agent.py # Graph construction (goal, edges, agent class)
├── nodes/__init__.py # Node definitions (NodeSpec)
├── config.py # Runtime config
└── README.md # Documentation
```
**Key Principle: Agent is visible and editable during build**
- Files created immediately as components are approved
- User can watch files grow in their editor
- No session state - just direct file writes
- No "export" step - agent is ready when build completes
## Core Concepts
### Goal
Success criteria and constraints (written to agent.py)
```python
goal = Goal(
id="research-goal",
name="Technical Research Agent",
description="Research technical topics thoroughly",
success_criteria=[
SuccessCriterion(
id="completeness",
description="Cover all aspects of topic",
metric="coverage_score",
target=">=0.9",
weight=0.4,
),
# 3-5 success criteria total
],
constraints=[
Constraint(
id="accuracy",
description="All information must be verified",
constraint_type="hard",
category="quality",
),
# 1-5 constraints total
],
)
```
### Node
Unit of work (written to nodes/__init__.py)
**Node Types:**
- `event_loop` — Multi-turn streaming loop with tool execution and judge-based evaluation. Works with or without tools.
- `function` — Deterministic Python operations. No LLM involved.
```python
search_node = NodeSpec(
id="search-web",
name="Search Web",
description="Search for information and extract results",
node_type="event_loop",
input_keys=["query"],
output_keys=["search_results"],
system_prompt="Search the web for: {query}. Use the web_search tool to find results, then call set_output to store them.",
tools=["web_search"],
)
```
**NodeSpec Fields for Event Loop Nodes:**
| Field | Default | Description |
|-------|---------|-------------|
| `client_facing` | `False` | If True, streams output to user and blocks for input between turns |
| `nullable_output_keys` | `[]` | Output keys that may remain unset (for mutually exclusive outputs) |
| `max_node_visits` | `1` | Max times this node executes per run. Set >1 for feedback loop targets |
### Edge
Connection between nodes (written to agent.py)
**Edge Conditions:**
- `on_success` — Proceed if node succeeds (most common)
- `on_failure` — Handle errors
- `always` — Always proceed
- `conditional` — Based on expression evaluating node output
**Edge Priority:**
Priority controls evaluation order when multiple edges leave the same node. Higher priority edges are evaluated first. Use negative priority for feedback edges (edges that loop back to earlier nodes).
```python
# Forward edge (evaluated first)
EdgeSpec(
id="review-to-campaign",
source="review",
target="campaign-builder",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('approved_contacts') is not None",
priority=1,
)
# Feedback edge (evaluated after forward edges)
EdgeSpec(
id="review-feedback",
source="review",
target="extractor",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('redo_extraction') is not None",
priority=-1,
)
```
### Client-Facing Nodes
For multi-turn conversations with the user, set `client_facing=True` on a node. The node will:
- Stream its LLM output directly to the end user
- Block for user input between conversational turns
- Resume when new input is injected via `inject_event()`
```python
intake_node = NodeSpec(
id="intake",
name="Intake",
description="Gather requirements from the user",
node_type="event_loop",
client_facing=True,
input_keys=[],
output_keys=["repo_url", "project_url"],
system_prompt="You are the intake agent. Ask the user for the repo URL and project URL.",
)
```
> **Legacy Note:** The old `pause_nodes` / `entry_points` pattern still works, but `client_facing=True` is preferred for new agents.
**STEP 1 / STEP 2 Prompt Pattern:** For client-facing nodes, structure the system prompt with two explicit phases:
```python
system_prompt="""\
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions, etc.]
**STEP 2 — After the user responds, call set_output:**
[Call set_output with the structured outputs]
"""
```
This prevents the LLM from calling `set_output` prematurely before the user has had a chance to respond.
### Node Design: Fewer, Richer Nodes
Prefer fewer nodes that do more work over many thin single-purpose nodes:
- **Bad**: 8 thin nodes (parse query → search → fetch → evaluate → synthesize → write → check → save)
- **Good**: 4 rich nodes (intake → research → review → report)
Why: Each node boundary requires serializing outputs and passing context. Fewer nodes mean the LLM retains full context of its work within the node. A research node that searches, fetches, and analyzes keeps all the source material in its conversation history.
### nullable_output_keys for Cross-Edge Inputs
When a node receives inputs that only arrive on certain edges (e.g., `feedback` only comes from a review → research feedback loop, not from intake → research), mark those keys as `nullable_output_keys`:
```python
research_node = NodeSpec(
id="research",
input_keys=["research_brief", "feedback"],
nullable_output_keys=["feedback"], # Not present on first visit
max_node_visits=3,
...
)
```
## Event Loop Architecture Concepts
### How EventLoopNode Works
An event loop node runs a multi-turn loop (see the sketch after these steps):
1. LLM receives system prompt + conversation history
2. LLM responds (text and/or tool calls)
3. Tool calls are executed, results added to conversation
4. Judge evaluates: ACCEPT (exit loop), RETRY (loop again), or ESCALATE
5. Repeat until judge ACCEPTs or max_iterations reached
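A minimal sketch of that cycle in plain Python. The `llm`, `execute_tool`, and `judge` callables are illustrative stand-ins, not the framework's actual interfaces:
```python
# Illustrative sketch of the event-loop cycle above; all callables are
# hypothetical stand-ins for the framework's internals.
async def run_event_loop(llm, execute_tool, judge, system_prompt, max_iterations=50):
    history = [{"role": "system", "content": system_prompt}]
    for _ in range(max_iterations):
        reply = await llm(history)  # steps 1-2: prompt + history in, response out
        history.append({"role": "assistant", "content": reply["text"]})
        for call in reply.get("tool_calls", []):  # step 3: run tool calls
            result = await execute_tool(call)
            history.append({"role": "tool", "content": result})
        verdict = judge(history)  # step 4: ACCEPT / RETRY / ESCALATE
        if verdict == "ACCEPT":
            return history
        if verdict == "ESCALATE":
            raise RuntimeError("judge escalated")
        # RETRY falls through: step 5, loop again with the grown history
    raise RuntimeError("max_iterations reached without ACCEPT")
```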
### EventLoopNode Runtime
EventLoopNodes are **auto-created** by `GraphExecutor` at runtime. You do NOT need to manually register them. Both `GraphExecutor` (direct) and `AgentRuntime` / `create_agent_runtime()` handle event_loop nodes automatically.
```python
# Direct execution — executor auto-creates EventLoopNodes
from framework.graph.executor import GraphExecutor
from framework.runtime.core import Runtime
runtime = Runtime(storage_path)
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
storage_path=storage_path,
)
result = await executor.execute(graph=graph, goal=goal, input_data=input_data)
# TUI execution — AgentRuntime also works
from framework.runtime.agent_runtime import create_agent_runtime
runtime = create_agent_runtime(
graph=graph, goal=goal, storage_path=storage_path,
entry_points=[...], llm=llm, tools=tools, tool_executor=tool_executor,
)
```
### set_output
Nodes produce structured outputs by calling `set_output(key, value)` — a synthetic tool injected by the framework. When the LLM calls `set_output`, the value is stored in the output accumulator and made available to downstream nodes via shared memory.
`set_output` is NOT a real tool — it is excluded from `real_tool_results`. For client-facing nodes, this means a turn where the LLM only calls `set_output` (no other tools) is treated as a conversational boundary and will block for user input.
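As a rough sketch of those semantics (the dispatcher shape here is an assumption, not the framework's real plumbing), `set_output` calls are intercepted before real-tool dispatch:
```python
# Hypothetical sketch: set_output is intercepted by the framework and
# never reaches an MCP server, so it is excluded from real_tool_results.
def dispatch_tool_call(call, outputs, real_tool_results, run_real_tool):
    if call["name"] == "set_output":
        # Stored in the output accumulator; visible to downstream nodes
        # via shared memory once the node exits.
        outputs[call["args"]["key"]] = call["args"]["value"]
        return "output stored"
    result = run_real_tool(call)  # real MCP tool dispatch (assumed callable)
    real_tool_results.append(result)
    return result
```
Under this reading, a client-facing turn whose only call is `set_output` leaves `real_tool_results` empty, which is exactly the conversational boundary described above.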
### JudgeProtocol
**The judge is the SOLE mechanism for acceptance decisions.** Do not add ad-hoc framework gating, output rollback, or premature rejection logic. If the LLM calls `set_output` too early, fix it with better prompts or a custom judge — not framework-level guards.
The judge controls when a node's loop exits:
- **Implicit judge** (default, no judge configured): ACCEPTs when the LLM finishes with no tool calls and all required output keys are set
- **SchemaJudge**: Validates outputs against a Pydantic model
- **Custom judges**: Implement `evaluate(context) -> JudgeVerdict` (see the sketch below)
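For instance, a custom judge could RETRY until every required output key is non-empty. This is a sketch against an assumed verdict shape; the real `JudgeVerdict` type lives in the framework:
```python
from dataclasses import dataclass

@dataclass
class JudgeVerdict:  # local stand-in for the framework's verdict type
    decision: str    # "ACCEPT" | "RETRY" | "ESCALATE"
    reason: str = ""

class RequiredKeysJudge:
    """Hypothetical custom judge: accept only when all outputs are set."""

    def __init__(self, required_keys):
        self.required_keys = required_keys

    def evaluate(self, context):  # `context.outputs` is an assumed field
        missing = [k for k in self.required_keys if not context.outputs.get(k)]
        if missing:
            return JudgeVerdict("RETRY", f"missing outputs: {missing}")
        return JudgeVerdict("ACCEPT", "all required outputs set")
```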
### LoopConfig
Controls loop behavior (example configuration below):
- `max_iterations` (default 50) — prevents infinite loops
- `max_tool_calls_per_turn` (default 10) — limits tool calls per LLM response
- `tool_call_overflow_margin` (default 0.5) — wiggle room before discarding extra tool calls (50% means hard cutoff at 150% of limit)
- `stall_detection_threshold` (default 3) — detects repeated identical responses
- `max_history_tokens` (default 32000) — triggers conversation compaction
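These settings are typically passed to the graph as a plain `loop_config` dict (a `GraphSpec` accepts one); here is a sketch using the documented defaults:
```python
# The documented LoopConfig defaults, spelled out as a loop_config dict;
# tune per agent.
loop_config = {
    "max_iterations": 50,              # hard stop on retry loops
    "max_tool_calls_per_turn": 10,     # tool calls per LLM response
    "tool_call_overflow_margin": 0.5,  # hard cutoff at 150% of the limit
    "stall_detection_threshold": 3,    # identical responses before stall
    "max_history_tokens": 32000,       # conversation compaction trigger
}
```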
### Data Tools (Spillover Management)
When tool results exceed the context window, the framework automatically saves them to a spillover directory and truncates with a hint. Nodes that produce or consume large data should include the data tools:
- `save_data(filename, data)` — Write data to a file in the data directory
- `load_data(filename, offset=0, limit=50)` — Read data with line-based pagination
- `list_data_files()` — List available data files
- `serve_file_to_user(filename, label="")` — Get a clickable file:// URI for the user
Note: `data_dir` is a framework-injected context parameter — the LLM never sees or passes it. `GraphExecutor.execute()` sets it per-execution via `contextvars`, so data tools and spillover always share the same session-scoped directory.
These are real MCP tools (not synthetic). Add them to nodes that handle large tool results:
```python
research_node = NodeSpec(
...
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```
### Fan-Out / Fan-In
Multiple ON_SUCCESS edges from the same source create parallel execution. All branches run concurrently via `asyncio.gather()`. Parallel event_loop nodes must have disjoint `output_keys`.
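A sketch of a fan-out using the `EdgeSpec` shape from earlier (node ids and output keys here are illustrative):
```python
from framework.graph import EdgeSpec, EdgeCondition

# Two ON_SUCCESS edges from the same source fan out; both targets run
# concurrently and must write disjoint output_keys.
fan_out_edges = [
    EdgeSpec(
        id="research-to-summary",
        source="research",
        target="summarize",          # e.g. output_keys=["summary"]
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="research-to-citations",
        source="research",
        target="extract-citations",  # e.g. output_keys=["citations"]
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
]
```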
### max_node_visits
Controls how many times a node can execute in one graph run. Default is 1. Set higher for nodes that are targets of feedback edges (review-reject loops). Set 0 for unlimited (guarded by max_steps).
## Tool Discovery & Validation
**CRITICAL:** Before adding a node with tools, you MUST verify the tools exist.
Tools are provided by MCP servers. Never assume a tool exists - always discover dynamically.
### Step 1: Register MCP Server (if not already done)
```python
mcp__agent-builder__add_mcp_server(
name="tools",
transport="stdio",
command="python",
args='["mcp_server.py", "--stdio"]',
cwd="../tools"
)
```
### Step 2: Discover Available Tools
```python
# List all tools from all registered servers
mcp__agent-builder__list_mcp_tools()
# Or list tools from a specific server
mcp__agent-builder__list_mcp_tools(server_name="tools")
```
### Step 3: Validate Before Adding Nodes
Before writing a node with `tools=[...]` (see the sketch after these steps):
1. Call `list_mcp_tools()` to get available tools
2. Check each tool in your node exists in the response
3. If a tool doesn't exist:
- **DO NOT proceed** with the node
- Inform the user: "The tool 'X' is not available. Available tools are: ..."
- Ask if they want to use an alternative or proceed without the tool
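A sketch of that check in plain Python, assuming `list_mcp_tools()` returns entries with a `name` field (the response shape is an assumption):
```python
# Hypothetical pre-flight check before writing a node's tools=[...].
def check_node_tools(requested_tools, discovered):
    available = {t["name"] for t in discovered}  # assumed response shape
    missing = [name for name in requested_tools if name not in available]
    if missing:
        # Do NOT proceed with the node; surface this to the user instead.
        raise ValueError(
            f"Tools not available: {missing}. Available: {sorted(available)}"
        )

check_node_tools(
    ["web_search", "web_scrape"],
    discovered=[{"name": "web_search"}, {"name": "web_scrape"}],
)
```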
### Tool Validation Anti-Patterns
- **Never assume a tool exists** - always call `list_mcp_tools()` first
- **Never write a node with unverified tools** - validate before writing
- **Never silently drop tools** - if a tool doesn't exist, inform the user
- **Never guess tool names** - use exact names from discovery response
## Workflow Overview: Incremental File Construction
```
1. CREATE PACKAGE → mkdir + write skeletons
2. DEFINE GOAL → Write to agent.py + config.py
3. FOR EACH NODE:
- Propose design (event_loop for LLM work, function for deterministic)
- User approves
- Write to nodes/__init__.py IMMEDIATELY
- (Optional) Validate with test_node
4. CONNECT EDGES → Update agent.py
- Use priority for feedback edges (negative priority)
- (Optional) Validate with validate_graph
5. FINALIZE → Write agent class to agent.py
6. DONE - Agent ready at exports/my_agent/
```
**Files written immediately. MCP tools optional for validation/testing bookkeeping.**
## When to Use This Skill
Use hive-concepts when:
- Starting a new agent project and need to understand fundamentals
- Need to understand agent architecture before building
- Want to validate tool availability before proceeding
- Learning about node types, edges, and graph execution
**Next Steps:**
- Ready to build? → Use `hive-create` skill
- Need patterns and examples? → Use `hive-patterns` skill
## MCP Tools for Validation
After writing files, optionally use MCP tools for validation:
**test_node** - Validate node configuration with mock inputs
```python
mcp__agent-builder__test_node(
node_id="search-web",
test_input='{"query": "test query"}',
mock_llm_response='{"results": "mock output"}'
)
```
**validate_graph** - Check graph structure
```python
mcp__agent-builder__validate_graph()
# Returns: unreachable nodes, missing connections, event_loop validation, etc.
```
**configure_loop** - Set event loop parameters
```python
mcp__agent-builder__configure_loop(
max_iterations=50,
max_tool_calls_per_turn=10,
stall_detection_threshold=3,
max_history_tokens=32000
)
```
**Key Point:** Files are written FIRST. MCP tools are for validation only.
## Related Skills
- **hive-create** - Step-by-step building process
- **hive-patterns** - Best practices: judges, feedback edges, fan-out, context management
- **hive** - Complete workflow orchestrator
- **hive-test** - Test and validate completed agents
File diff suppressed because it is too large
@@ -1,24 +0,0 @@
"""
Deep Research Agent - Interactive, rigorous research with TUI conversation.
Research any topic through multi-source web search, quality evaluation,
and synthesis. Features client-facing TUI interaction at key checkpoints
for user guidance and iterative deepening.
"""
from .agent import DeepResearchAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"DeepResearchAgent",
"default_agent",
"goal",
"nodes",
"edges",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -1,241 +0,0 @@
"""
CLI entry point for Deep Research Agent.
Uses AgentRuntime for multi-entrypoint support with HITL pause/resume.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, DeepResearchAgent
def setup_logging(verbose=False, debug=False):
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""Deep Research Agent - Interactive, rigorous research with TUI conversation."""
pass
@cli.command()
@click.option("--topic", "-t", type=str, required=True, help="Research topic")
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--quiet", "-q", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(topic, mock, quiet, verbose, debug):
"""Execute research on a topic."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
context = {"topic": topic}
result = asyncio.run(default_agent.run(context, mock_mode=mock))
output_data = {
"success": result.success,
"steps_executed": result.steps_executed,
"output": result.output,
}
if result.error:
output_data["error"] = result.error
click.echo(json.dumps(output_data, indent=2, default=str))
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive research."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo(
"TUI requires the 'textual' package. Install with: pip install textual"
)
sys.exit(1)
from pathlib import Path
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui():
agent = DeepResearchAgent()
# Build graph and tools
agent._event_bus = EventBus()
agent._tool_registry = ToolRegistry()
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock:
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Start Research",
entry_node="intake",
trigger_type="manual",
isolation_level="isolated",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = default_agent.info()
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")
@cli.command()
def validate():
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
"""Interactive research session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== Deep Research Agent ===")
click.echo("Enter a topic to research (or 'quit' to exit):\n")
agent = DeepResearchAgent()
await agent.start()
try:
while True:
try:
topic = await asyncio.get_event_loop().run_in_executor(
None, input, "Topic> "
)
if topic.lower() in ["quit", "exit", "q"]:
click.echo("Goodbye!")
break
if not topic.strip():
continue
click.echo("\nResearching...\n")
result = await agent.trigger_and_wait("start", {"topic": topic})
if result is None:
click.echo("\n[Execution timed out]\n")
continue
if result.success:
output = result.output
if "report_content" in output:
click.echo("\n--- Report ---\n")
click.echo(output["report_content"])
click.echo("\n")
if "references" in output:
click.echo("--- References ---\n")
for ref in output.get("references", []):
click.echo(
f" [{ref.get('number', '?')}] {ref.get('title', '')} - {ref.get('url', '')}"
)
click.echo("\n")
else:
click.echo(f"\nResearch failed: {result.error}\n")
except KeyboardInterrupt:
click.echo("\nGoodbye!")
break
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
if __name__ == "__main__":
cli()
@@ -1,358 +0,0 @@
"""Agent graph construction for Deep Research Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
intake_node,
research_node,
review_node,
report_node,
)
# Goal definition
goal = Goal(
id="rigorous-interactive-research",
name="Rigorous Interactive Research",
description=(
"Research any topic by searching diverse sources, analyzing findings, "
"and producing a cited report — with user checkpoints to guide direction."
),
success_criteria=[
SuccessCriterion(
id="source-diversity",
description="Use multiple diverse, authoritative sources",
metric="source_count",
target=">=5",
weight=0.25,
),
SuccessCriterion(
id="citation-coverage",
description="Every factual claim in the report cites its source",
metric="citation_coverage",
target="100%",
weight=0.25,
),
SuccessCriterion(
id="user-satisfaction",
description="User reviews findings before report generation",
metric="user_approval",
target="true",
weight=0.25,
),
SuccessCriterion(
id="report-completeness",
description="Final report answers the original research questions",
metric="question_coverage",
target="90%",
weight=0.25,
),
],
constraints=[
Constraint(
id="no-hallucination",
description="Only include information found in fetched sources",
constraint_type="quality",
category="accuracy",
),
Constraint(
id="source-attribution",
description="Every claim must cite its source with a numbered reference",
constraint_type="quality",
category="accuracy",
),
Constraint(
id="user-checkpoint",
description="Present findings to the user before writing the final report",
constraint_type="functional",
category="interaction",
),
],
)
# Node list
nodes = [
intake_node,
research_node,
review_node,
report_node,
]
# Edge definitions
edges = [
# intake -> research
EdgeSpec(
id="intake-to-research",
source="intake",
target="research",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# research -> review
EdgeSpec(
id="research-to-review",
source="research",
target="review",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# review -> research (feedback loop)
EdgeSpec(
id="review-to-research-feedback",
source="review",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="needs_more_research == True",
priority=1,
),
# review -> report (user satisfied)
EdgeSpec(
id="review-to-report",
source="review",
target="report",
condition=EdgeCondition.CONDITIONAL,
condition_expr="needs_more_research == False",
priority=2,
),
# report -> research (user wants deeper research on current topic)
EdgeSpec(
id="report-to-research",
source="report",
target="research",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'more_research'",
priority=2,
),
# report -> intake (user wants a new topic — default when not more_research)
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() != 'more_research'",
priority=1,
),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = []
class DeepResearchAgent:
"""
Deep Research Agent 4-node pipeline with user checkpoints.
Flow: intake -> research -> review -> report
^ |
+-- feedback loop (if user wants more)
Uses AgentRuntime for proper session management:
- Session-scoped storage (sessions/{session_id}/)
- Checkpointing for resume capability
- Runtime logging
- Data folder for save_data/load_data
"""
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._graph: GraphSpec | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="deep-research-agent-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
identity_prompt=(
"You are a rigorous research agent. You search for information "
"from diverse, authoritative sources, analyze findings critically, "
"and produce well-cited reports. You never fabricate information — "
"every claim must trace back to a source you actually retrieved."
),
)
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
)
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
checkpoint_config=checkpoint_config,
)
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self):
"""Validate agent structure."""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Create default instance
default_agent = DeepResearchAgent()
@@ -1,26 +0,0 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
name: str = "Deep Research Agent"
version: str = "1.0.0"
description: str = (
"Interactive research agent that rigorously investigates topics through "
"multi-source search, quality evaluation, and synthesis - with TUI conversation "
"at key checkpoints for user guidance and feedback."
)
intro_message: str = (
"Hi! I'm your deep research assistant. Tell me a topic and I'll investigate it "
"thoroughly — searching multiple sources, evaluating quality, and synthesizing "
"a comprehensive report. What would you like me to research?"
)
metadata = AgentMetadata()
@@ -1,9 +0,0 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../tools",
"description": "Hive tools MCP server providing web_search, web_scrape, and write_to_file"
}
}
@@ -1,213 +0,0 @@
"""Node definitions for Deep Research Agent."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Brief conversation to clarify what the user wants researched.
intake_node = NodeSpec(
id="intake",
name="Research Intake",
description="Discuss the research topic with the user, clarify scope, and confirm direction",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
**STEP 1 - Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)
3. If it's already clear, confirm your understanding and ask the user to confirm
Keep it short. Don't over-ask.
**STEP 2 - After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
""",
tools=[],
)
# Node 2: Research
# The workhorse — searches the web, fetches content, analyzes sources.
# One node with both tools avoids the context-passing overhead of 5 separate nodes.
research_node = NodeSpec(
id="research",
name="Research",
description="Search the web, fetch source content, and compile findings",
node_type="event_loop",
max_node_visits=0,
input_keys=["research_brief", "feedback"],
output_keys=["findings", "sources", "gaps"],
nullable_output_keys=["feedback"],
success_criteria=(
"Findings reference at least 3 distinct sources with URLs. "
"Key claims are substantiated by fetched content, not generated."
),
system_prompt="""\
You are a research agent. Given a research brief, find and analyze sources.
If feedback is provided, this is a follow-up round; focus on the gaps identified.
Work in phases:
1. **Search**: Use web_search with 3-5 diverse queries covering different angles.
Prioritize authoritative sources (.edu, .gov, established publications).
2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).
Skip URLs that fail. Extract the substantive content.
3. **Analyze**: Review what you've collected. Identify key findings, themes,
and any contradictions between sources.
Important:
- Work in batches of 3-4 tool calls at a time; never more than 10 per turn
- After each batch, assess whether you have enough material
- Prefer quality over quantity; 5 good sources beat 15 thin ones
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
Context management:
- Your tool results are automatically saved to files. After compaction, the file \
references remain in the conversation; use load_data() to recover any content you need.
- Use append_data('research_notes.md', ...) to maintain a running log of key findings \
as you go. This survives compaction and helps the report node produce a detailed report.
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
- set_output("sources", [{"url": "...", "title": "...", "summary": "..."}])
- set_output("gaps", "What aspects of the research brief are NOT well-covered yet, if any.")
""",
tools=[
"web_search",
"web_scrape",
"load_data",
"save_data",
"append_data",
"list_data_files",
],
)
# Node 3: Review (client-facing)
# Shows the user what was found and asks whether to dig deeper or proceed.
review_node = NodeSpec(
id="review",
name="Review Findings",
description="Present findings to user and decide whether to research more or write the report",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "gaps", "research_brief"],
output_keys=["needs_more_research", "feedback"],
success_criteria=(
"The user has been presented with findings and has explicitly indicated "
"whether they want more research or are ready for the report."
),
system_prompt="""\
Present the research findings to the user clearly and concisely.
**STEP 1 - Present (your first message, text only, NO tool calls):**
1. **Summary** (2-3 sentences of what was found)
2. **Key Findings** (bulleted, with confidence levels)
3. **Sources Used** (count and quality assessment)
4. **Gaps** (what's still unclear or under-covered)
End by asking: Are they satisfied, or do they want deeper research? \
Should we proceed to writing the final report?
**STEP 2 - After the user responds, call set_output:**
- set_output("needs_more_research", "true") if they want more
- set_output("needs_more_research", "false") if they're satisfied
- set_output("feedback", "What the user wants explored further, or empty string")
""",
tools=[],
)
# Node 4: Report (client-facing)
# Writes an HTML report, serves the link to the user, and answers follow-ups.
report_node = NodeSpec(
id="report",
name="Write & Deliver Report",
description="Write a cited HTML report from the findings and present it to the user",
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["findings", "sources", "research_brief"],
output_keys=["delivery_status", "next_action"],
success_criteria=(
"An HTML report has been saved, the file link has been presented to the user, "
"and the user has indicated what they want to do next."
),
system_prompt="""\
Write a research report as an HTML file and present it to the user.
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
**STEP 1 - Write and save the HTML report (tool calls, NO text to user yet):**
Build a clean HTML document. Keep the HTML concise; aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
- If findings appear incomplete or summarized, call list_data_files() and load_data() \
to access the detailed source material from the research phase. The research node's \
tool results and research_notes.md contain the full data.
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
If save_data fails, simplify and shorten the HTML, then retry.
**STEP 2 - Present the link to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions or want to continue.
**STEP 3 - After the user responds:**
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
- Dig deeper into the current topic?
- Then call set_output:
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
""",
tools=[
"save_data",
"append_data",
"edit_data",
"serve_file_to_user",
"load_data",
"list_data_files",
],
)
__all__ = [
"intake_node",
"research_node",
"review_node",
"report_node",
]
-640
@@ -1,640 +0,0 @@
---
name: hive-credentials
description: Set up and install credentials for an agent. Detects missing credentials from agent config, collects them from the user, and stores them securely in the local encrypted store at ~/.hive/credentials.
license: Apache-2.0
metadata:
author: hive
version: "2.3"
type: utility
---
# Setup Credentials
Interactive credential setup for agents with multiple authentication options. Detects what's missing, offers auth method choices, validates with health checks, and stores credentials securely.
## When to Use
- Before running or testing an agent for the first time
- When `AgentRunner.run()` fails with "missing required credentials"
- When a user asks to configure credentials for an agent
- After building a new agent that uses tools requiring API keys
## Workflow
### Step 1: Identify the Agent
Determine which agent needs credentials. The user will either:
- Name the agent directly (e.g., "set up credentials for hubspot-agent")
- Have an agent directory open (check `exports/` for agent dirs)
- Be working on an agent in the current session
Locate the agent's directory under `exports/{agent_name}/`.
### Step 2: Detect Missing Credentials
Use the `check_missing_credentials` MCP tool to detect what the agent needs and what's already configured. This tool loads the agent, inspects its required tools and node types, maps them to credentials via `CREDENTIAL_SPECS`, and checks both the encrypted store and environment variables.
```
check_missing_credentials(agent_path="exports/{agent_name}")
```
The tool returns a JSON response:
```json
{
"agent": "exports/{agent_name}",
"missing": [
{
"credential_name": "brave_search",
"env_var": "BRAVE_SEARCH_API_KEY",
"description": "Brave Search API key for web search",
"help_url": "https://brave.com/search/api/",
"tools": ["web_search"]
}
],
"available": [
{
"credential_name": "anthropic",
"env_var": "ANTHROPIC_API_KEY",
"source": "encrypted_store"
}
],
"total_missing": 1,
"ready": false
}
```
**If `ready` is true (nothing missing):** Report all credentials as configured and skip Steps 3-5. Example:
```
All required credentials are already configured:
✓ anthropic (ANTHROPIC_API_KEY)
✓ brave_search (BRAVE_SEARCH_API_KEY)
Your agent is ready to run!
```
**If credentials are missing:** Continue to Step 3 with the `missing` list.
### Step 3: Present Auth Options for Each Missing Credential
For each missing credential, check what authentication methods are available:
```python
from aden_tools.credentials import CREDENTIAL_SPECS
spec = CREDENTIAL_SPECS.get("hubspot")
if spec:
# Determine available auth options
auth_options = []
if spec.aden_supported:
auth_options.append("aden")
if spec.direct_api_key_supported:
auth_options.append("direct")
auth_options.append("custom") # Always available
# Get setup info
setup_info = {
"env_var": spec.env_var,
"description": spec.description,
"help_url": spec.help_url,
"api_key_instructions": spec.api_key_instructions,
}
```
Present the available options using AskUserQuestion:
```
Choose how to configure HUBSPOT_ACCESS_TOKEN:
1) Aden Platform (OAuth) (Recommended)
Secure OAuth2 flow via hive.adenhq.com
- Quick setup with automatic token refresh
- No need to manage API keys manually
2) Direct API Key
Enter your own API key manually
- Requires creating a HubSpot Private App
- Full control over scopes and permissions
3) Local Credential Setup (Advanced)
Programmatic configuration for CI/CD
- For automated deployments
- Requires manual API calls
```
### Step 4: Execute Auth Flow Based on User Choice
#### Prerequisite: Ensure HIVE_CREDENTIAL_KEY Is Available
Before storing any credentials, verify `HIVE_CREDENTIAL_KEY` is set (needed to encrypt/decrypt the local store). Check both the current session and shell config:
```bash
# Check current session
printenv HIVE_CREDENTIAL_KEY > /dev/null 2>&1 && echo "session: set" || echo "session: not set"
# Check shell config files
for f in ~/.zshrc ~/.bashrc ~/.profile; do [ -f "$f" ] && grep -q 'HIVE_CREDENTIAL_KEY' "$f" && echo "$f"; done
```
- **In current session** — proceed to store credentials
- **In shell config but NOT in current session** — run `source ~/.zshrc` (or `~/.bashrc`) first, then proceed
- **Not set anywhere** — `EncryptedFileStorage` will auto-generate one. After storing, tell the user to persist it: `export HIVE_CREDENTIAL_KEY="{generated_key}"` in their shell profile
> **⚠️ IMPORTANT: After adding `HIVE_CREDENTIAL_KEY` to the user's shell config, always display:**
> ```
> ⚠️ Environment variables were added to your shell config.
> Open a NEW TERMINAL for them to take effect outside this session.
> ```
#### Option 1: Aden Platform (OAuth)
This is the recommended flow for supported integrations (HubSpot, etc.).
**How Aden OAuth Works:**
The ADEN_API_KEY represents a user who has already completed OAuth authorization on Aden's platform. When users sign up and connect integrations on Aden, those OAuth tokens are stored server-side. Having an ADEN_API_KEY means:
1. User has an Aden account
2. User has already authorized integrations (HubSpot, etc.) via OAuth on Aden
3. We just need to sync those credentials down to the local credential store
**4.1a. Check for ADEN_API_KEY**
```python
import os
aden_key = os.environ.get("ADEN_API_KEY")
```
If not set, guide user to get one from Aden (this is where they do OAuth):
```python
from aden_tools.credentials import open_browser, get_aden_setup_url
# Open browser to Aden - user will sign up and connect integrations there
url = get_aden_setup_url() # https://hive.adenhq.com
success, msg = open_browser(url)
print("Please sign in to Aden and connect your integrations (HubSpot, etc.).")
print("Once done, copy your API key and return here.")
```
Ask user to provide the ADEN_API_KEY they received.
**4.1b. Save ADEN_API_KEY to Shell Config**
With user approval, persist ADEN_API_KEY to their shell config:
```python
from aden_tools.credentials import (
detect_shell,
add_env_var_to_shell_config,
get_shell_source_command,
)
shell_type = detect_shell() # 'bash', 'zsh', or 'unknown'
# Ask user for approval before modifying shell config
# If approved:
success, config_path = add_env_var_to_shell_config(
"ADEN_API_KEY",
user_provided_key,
comment="Aden Platform (OAuth) API key"
)
if success:
source_cmd = get_shell_source_command()
print(f"Saved to {config_path}")
print(f"Run: {source_cmd}")
```
> **⚠️ IMPORTANT: After adding `ADEN_API_KEY` to the user's shell config, always display:**
> ```
> ⚠️ Environment variables were added to your shell config.
> Open a NEW TERMINAL for them to take effect outside this session.
> ```
Also save to `~/.hive/configuration.json` for the framework:
```python
import json
from pathlib import Path
config_path = Path.home() / ".hive" / "configuration.json"
config = json.loads(config_path.read_text()) if config_path.exists() else {}
config["aden"] = {
"api_key_configured": True,
"api_url": "https://api.adenhq.com"
}
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps(config, indent=2))
```
**4.1c. Sync Credentials from Aden Server**
Since the user has already authorized integrations on Aden, use the one-liner factory method:
```python
from core.framework.credentials import CredentialStore
# This single call handles everything:
# - Creates encrypted local storage at ~/.hive/credentials
# - Configures Aden client from ADEN_API_KEY env var
# - Syncs all credentials from Aden server automatically
store = CredentialStore.with_aden_sync(
base_url="https://api.adenhq.com",
auto_sync=True, # Syncs on creation
)
# Check what was synced
synced = store.list_credentials()
print(f"Synced credentials: {synced}")
# If the required credential wasn't synced, the user hasn't authorized it on Aden yet
if "hubspot" not in synced:
print("HubSpot not found in your Aden account.")
print("Please visit https://hive.adenhq.com to connect HubSpot, then try again.")
```
For more control over the sync process:
```python
from core.framework.credentials import CredentialStore
from core.framework.credentials.aden import (
AdenCredentialClient,
AdenClientConfig,
AdenSyncProvider,
)
# Create client (API key loaded from ADEN_API_KEY env var)
client = AdenCredentialClient(AdenClientConfig(
base_url="https://api.adenhq.com",
))
# Create provider and store
provider = AdenSyncProvider(client=client)
store = CredentialStore.with_encrypted_storage()
# Manual sync
synced_count = provider.sync_all(store)
print(f"Synced {synced_count} credentials from Aden")
```
**4.1d. Run Health Check**
```python
from aden_tools.credentials import check_credential_health
# Get the token from the store
cred = store.get_credential("hubspot")
token = cred.keys["access_token"].value.get_secret_value()
result = check_credential_health("hubspot", token)
if result.valid:
print("HubSpot credentials validated successfully!")
else:
print(f"Validation failed: {result.message}")
# Offer to retry the OAuth flow
```
#### Option 2: Direct API Key
For users who prefer manual API key management.
**4.2a. Show Setup Instructions**
```python
from aden_tools.credentials import CREDENTIAL_SPECS
spec = CREDENTIAL_SPECS.get("hubspot")
if spec and spec.api_key_instructions:
print(spec.api_key_instructions)
# Output:
# To get a HubSpot Private App token:
# 1. Go to HubSpot Settings > Integrations > Private Apps
# 2. Click "Create a private app"
# 3. Name your app (e.g., "Hive Agent")
# ...
if spec and spec.help_url:
print(f"More info: {spec.help_url}")
```
**4.2b. Collect API Key from User**
Use AskUserQuestion to securely collect the API key:
```
Please provide your HubSpot access token:
(This will be stored securely in ~/.hive/credentials)
```
**4.2c. Run Health Check Before Storing**
```python
from aden_tools.credentials import check_credential_health
result = check_credential_health("hubspot", user_provided_token)
if not result.valid:
print(f"Warning: {result.message}")
# Ask user if they want to:
# 1. Try a different token
# 2. Continue anyway (not recommended)
```
**4.2d. Store in Local Encrypted Store**
```python
from core.framework.credentials import CredentialStore, CredentialObject, CredentialKey
from pydantic import SecretStr
store = CredentialStore.with_encrypted_storage()
cred = CredentialObject(
id="hubspot",
name="HubSpot Access Token",
keys={
"access_token": CredentialKey(
name="access_token",
value=SecretStr(user_provided_token),
)
},
)
store.save_credential(cred)
```
**4.2e. Export to Current Session**
```bash
export HUBSPOT_ACCESS_TOKEN="the-value"
```
#### Option 3: Local Credential Setup (Advanced)
For programmatic/CI/CD setups.
**4.3a. Show Documentation**
```
For advanced credential management, you can use the CredentialStore API directly:
from core.framework.credentials import CredentialStore, CredentialObject, CredentialKey
from pydantic import SecretStr
store = CredentialStore.with_encrypted_storage()
cred = CredentialObject(
id="hubspot",
name="HubSpot Access Token",
keys={"access_token": CredentialKey(name="access_token", value=SecretStr("..."))}
)
store.save_credential(cred)
For CI/CD environments:
- Set HIVE_CREDENTIAL_KEY for encryption
- Pre-populate ~/.hive/credentials programmatically
- Or use environment variables directly (HUBSPOT_ACCESS_TOKEN)
Documentation: See core/framework/credentials/README.md
```
### Step 5: Record Configuration Method
Track which auth method was used for each credential in `~/.hive/configuration.json`:
```python
import json
from pathlib import Path
from datetime import datetime
config_path = Path.home() / ".hive" / "configuration.json"
config = json.loads(config_path.read_text()) if config_path.exists() else {}
if "credential_methods" not in config:
config["credential_methods"] = {}
config["credential_methods"]["hubspot"] = {
"method": "aden", # or "direct" or "custom"
"configured_at": datetime.now().isoformat(),
}
config_path.write_text(json.dumps(config, indent=2))
```
### Step 6: Verify All Credentials
Use the `verify_credentials` MCP tool to confirm everything is properly configured:
```
verify_credentials(agent_path="exports/{agent_name}")
```
The tool returns:
```json
{
"agent": "exports/{agent_name}",
"ready": true,
"missing_credentials": [],
"warnings": [],
"errors": []
}
```
If `ready` is true, report success. If `missing_credentials` is non-empty, identify what failed and loop back to Step 3 for the remaining credentials.
## Health Check Reference
Health checks validate credentials by making lightweight API calls:
| Credential | Endpoint | What It Checks |
| --------------- | --------------------------------------- | --------------------------------- |
| `anthropic` | `POST /v1/messages` | API key validity |
| `brave_search` | `GET /res/v1/web/search?q=test&count=1` | API key validity |
| `google_search` | `GET /customsearch/v1?q=test&num=1` | API key + CSE ID validity |
| `github` | `GET /user` | Token validity, user identity |
| `hubspot` | `GET /crm/v3/objects/contacts?limit=1` | Bearer token validity, CRM scopes |
| `resend` | `GET /domains` | API key validity |
```python
from aden_tools.credentials import check_credential_health, HealthCheckResult
result: HealthCheckResult = check_credential_health("hubspot", token_value)
# result.valid: bool
# result.message: str
# result.details: dict (status_code, rate_limited, etc.)
```
## Encryption Key (HIVE_CREDENTIAL_KEY)
The local encrypted store requires `HIVE_CREDENTIAL_KEY` to encrypt/decrypt credentials.
- If the user doesn't have one, `EncryptedFileStorage` will auto-generate one and log it
- The user MUST persist this key (e.g., in `~/.bashrc`/`~/.zshrc` or a secrets manager)
- Without this key, stored credentials cannot be decrypted
**Shell config rule:** Only TWO keys belong in shell config (`~/.zshrc`/`~/.bashrc`):
- `HIVE_CREDENTIAL_KEY` — encryption key for the credential store
- `ADEN_API_KEY` — Aden platform auth key (needed before the store can sync)
All other API keys (Brave, Google, HubSpot, etc.) must go in the encrypted store only. **Never offer to add them to shell config.**
If `HIVE_CREDENTIAL_KEY` is not set:
1. Let the store generate one
2. Tell the user to save it: `export HIVE_CREDENTIAL_KEY="{generated_key}"`
3. Recommend adding it to `~/.bashrc` or their shell profile, as sketched below
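A persistence sketch (assumes zsh; use `~/.bashrc` for bash, and substitute the actual generated key for the placeholder):
```bash
# Append the key to the shell profile, then load it into the current session
echo 'export HIVE_CREDENTIAL_KEY="<generated_key>"' >> ~/.zshrc
source ~/.zshrc
```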
## Security Rules
- **NEVER** log, print, or echo credential values in tool output
- **NEVER** store credentials in plaintext files, git-tracked files, or agent configs
- **NEVER** hardcode credentials in source code
- **NEVER** offer to save API keys to shell config (`~/.zshrc`/`~/.bashrc`) — the **only** keys that belong in shell config are `HIVE_CREDENTIAL_KEY` and `ADEN_API_KEY`. All other credentials (Brave, Google, HubSpot, GitHub, Resend, etc.) go in the encrypted store only.
- **ALWAYS** use `SecretStr` from Pydantic when handling credential values in Python (see the sketch after this list)
- **ALWAYS** use the local encrypted store (`~/.hive/credentials`) for persistence
- **ALWAYS** run health checks before storing credentials (when possible)
- **ALWAYS** verify credentials were stored by re-running validation, not by reading them back
- When modifying `~/.bashrc` or `~/.zshrc`, confirm with the user first
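A minimal illustration of the `SecretStr` rule above (the token value is a hypothetical placeholder):
```python
from pydantic import SecretStr

token = SecretStr("sk-placeholder")   # hypothetical value, never a real key
print(token)                          # prints **********, safe in logs and repr
raw = token.get_secret_value()        # unwrap only at the point of actual use
```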
## Credential Sources Reference
All credential specs are defined in `tools/src/aden_tools/credentials/`:
| File | Category | Credentials | Aden Supported |
| ----------------- | ------------- | --------------------------------------------- | -------------- |
| `llm.py` | LLM Providers | `anthropic` | No |
| `search.py` | Search Tools | `brave_search`, `google_search`, `google_cse` | No |
| `email.py` | Email | `resend` | No |
| `integrations.py` | Integrations | `github`, `hubspot`, `google_calendar_oauth` | No / Yes |
**Note:** Additional LLM providers (Cerebras, Groq, OpenAI) are handled by LiteLLM via environment
variables (`CEREBRAS_API_KEY`, `GROQ_API_KEY`, `OPENAI_API_KEY`) but are not yet in CREDENTIAL_SPECS.
Add them to `llm.py` as needed.
To check what's registered:
```python
from aden_tools.credentials import CREDENTIAL_SPECS
for name, spec in CREDENTIAL_SPECS.items():
print(f"{name}: aden={spec.aden_supported}, direct={spec.direct_api_key_supported}")
```
## Migration: CredentialManager → CredentialStore
**CredentialManager is deprecated.** Use CredentialStore instead.
| Old (Deprecated) | New (Recommended) |
| ----------------------------------------- | -------------------------------------------------------------------- |
| `CredentialManager()` | `CredentialStore.with_encrypted_storage()` |
| `creds.get("hubspot")` | `store.get("hubspot")` or `store.get_key("hubspot", "access_token")` |
| `creds.validate_for_tools(tools)` | Use `store.is_available(cred_id)` per credential |
| `creds.get_auth_options("hubspot")` | Check `CREDENTIAL_SPECS["hubspot"].aden_supported` |
| `creds.get_setup_instructions("hubspot")` | Access `CREDENTIAL_SPECS["hubspot"]` directly |
**Why migrate?**
- **CredentialStore** supports encrypted storage, multi-key credentials, template resolution, and automatic token refresh
- **CredentialManager** only reads from environment variables and .env files (no encryption, no refresh)
- **CredentialStoreAdapter** exists for backward compatibility during migration
```python
# Old way (deprecated)
from aden_tools.credentials import CredentialManager
creds = CredentialManager()
token = creds.get("hubspot")
# New way (recommended)
from core.framework.credentials import CredentialStore
store = CredentialStore.with_encrypted_storage()
token = store.get("hubspot")
# With Aden sync (recommended for OAuth integrations)
store = CredentialStore.with_aden_sync()
token = store.get_key("hubspot", "access_token")
```
## Example Session
```
User: /hive-credentials for my research-agent
Agent: Let me check what credentials your research-agent needs.
[Calls check_missing_credentials(agent_path="exports/research-agent")]
→ Returns:
available: anthropic (encrypted_store), brave_search (encrypted_store)
missing: google_search (GOOGLE_API_KEY), google_cse (GOOGLE_CSE_ID)
ready: false
Agent: 2 of 4 required credentials are already configured. Only Google Custom
Search needs setup (2 values).
--- Setting up Google Custom Search (google_search + google_cse) ---
This requires two values that work together.
[Checks HIVE_CREDENTIAL_KEY before storing]
$ printenv HIVE_CREDENTIAL_KEY > /dev/null 2>&1 && echo "set" || echo "not set"
set
First, the Google API Key:
1. Go to https://console.cloud.google.com/apis/credentials
2. Create a new project (or select an existing one)
3. Enable the "Custom Search API" from the API Library
4. Go to Credentials > Create Credentials > API Key
5. Copy the generated API key
[AskUserQuestion: "Please provide your Google API key:"]
[User provides key]
Now, the Custom Search Engine ID:
1. Go to https://programmablesearchengine.google.com/controlpanel/all
2. Click "Add" to create a new search engine
3. Under "What to search", select "Search the entire web"
4. Give your search engine a name
5. Click "Create"
6. Copy the Search Engine ID (cx value)
[AskUserQuestion: "Please provide your Google CSE ID:"]
[User provides ID]
[Runs health check with both values - GET /customsearch/v1?q=test&num=1 → 200 OK]
[Stores both in local encrypted store, exports to env]
✓ Google Custom Search credentials valid
[Calls verify_credentials(agent_path="exports/research-agent")]
→ Returns: ready: true, missing_credentials: []
All credentials are now configured:
✓ anthropic (ANTHROPIC_API_KEY) — already in encrypted store
✓ brave_search (BRAVE_SEARCH_API_KEY) — already in encrypted store
✓ google_search (GOOGLE_API_KEY) — stored in encrypted store
✓ google_cse (GOOGLE_CSE_ID) — stored in encrypted store
┌─────────────────────────────────────────────────────────────────────────────┐
│ ✅ CREDENTIALS CONFIGURED │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ OPEN A NEW TERMINAL before running commands below. │
│ Environment variables were saved to your shell config but │
│ only take effect in new terminal sessions. │
│ │
│ NEXT STEPS: │
│ │
│ 1. RUN YOUR AGENT: │
│ │
│ hive tui │
│ │
│ 2. IF YOU ENCOUNTER ISSUES, USE THE DEBUGGER: │
│ │
│ /hive-debugger │
│ │
│ The debugger analyzes runtime logs, identifies retry loops, tool │
│ failures, stalled execution, and provides actionable fix suggestions. │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
File diff suppressed because it is too large
-385
@@ -1,385 +0,0 @@
---
name: hive-patterns
description: Best practices, patterns, and examples for building goal-driven agents. Includes client-facing interaction, feedback edges, judge patterns, fan-out/fan-in, context management, and anti-patterns.
license: Apache-2.0
metadata:
author: hive
version: "2.0"
type: reference
part_of: hive
---
# Building Agents - Patterns & Best Practices
Design patterns, examples, and best practices for building robust goal-driven agents.
**Prerequisites:** Complete agent structure using `hive-create`.
## Practical Example: Hybrid Workflow
How to build a node using both direct file writes and optional MCP validation:
```python
# 1. WRITE TO FILE FIRST (Primary - makes it visible)
node_code = '''
search_node = NodeSpec(
id="search-web",
node_type="event_loop",
input_keys=["query"],
output_keys=["search_results"],
system_prompt="Search the web for: {query}. Use web_search, then call set_output to store results.",
tools=["web_search"],
)
'''
Edit(
file_path="exports/research_agent/nodes/__init__.py",
old_string="# Nodes will be added here",
new_string=node_code
)
# 2. OPTIONALLY VALIDATE WITH MCP (Secondary - bookkeeping)
validation = mcp__agent-builder__test_node(
node_id="search-web",
test_input='{"query": "python tutorials"}',
mock_llm_response='{"search_results": [...mock results...]}'
)
```
**User experience:**
- Immediately sees node in their editor (from step 1)
- Gets validation feedback (from step 2)
- Can edit the file directly if needed
## Multi-Turn Interaction Patterns
For agents needing multi-turn conversations with users, use `client_facing=True` on event_loop nodes.
### Client-Facing Nodes
A client-facing node streams LLM output to the user and blocks for user input between conversational turns. This replaces the old pause/resume pattern.
```python
# Client-facing node with STEP 1/STEP 2 prompt pattern
intake_node = NodeSpec(
id="intake",
name="Intake",
description="Gather requirements from the user",
node_type="event_loop",
client_facing=True,
input_keys=["topic"],
output_keys=["research_brief"],
system_prompt="""\
You are an intake specialist.
**STEP 1 — Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions
3. If it's clear, confirm your understanding
**STEP 2 — After the user confirms, call set_output:**
- set_output("research_brief", "Clear description of what to research")
""",
)
# Internal node runs without user interaction
research_node = NodeSpec(
id="research",
name="Research",
description="Search and analyze sources",
node_type="event_loop",
input_keys=["research_brief"],
output_keys=["findings", "sources"],
system_prompt="Research the topic using web_search and web_scrape...",
tools=["web_search", "web_scrape", "load_data", "save_data"],
)
```
**How it works:**
- Client-facing nodes stream LLM text to the user and block for input after each response
- User input is injected via `node.inject_event(text)`
- When the LLM calls `set_output` to produce structured outputs, the judge evaluates and ACCEPTs
- Internal nodes (non-client-facing) run their entire loop without blocking
- `set_output` is a synthetic tool — a turn with only `set_output` calls (no real tools) triggers user input blocking
**STEP 1/STEP 2 pattern:** Always structure client-facing prompts with explicit phases. STEP 1 is text-only conversation. STEP 2 calls `set_output` after user confirmation. This prevents the LLM from calling `set_output` prematurely before the user responds.
### When to Use client_facing
| Scenario | client_facing | Why |
| ----------------------------------- | :-----------: | ---------------------- |
| Gathering user requirements | Yes | Need user input |
| Human review/approval checkpoint | Yes | Need human decision |
| Data processing (scanning, scoring) | No | Runs autonomously |
| Report generation | No | No user input needed |
| Final confirmation before action | Yes | Need explicit approval |
> **Legacy Note:** The `pause_nodes` / `entry_points` pattern still works for backward compatibility but `client_facing=True` is preferred for new agents.
## Edge-Based Routing and Feedback Loops
### Conditional Edge Routing
Multiple conditional edges from the same source replace the old `router` node type. Each edge checks a condition on the node's output.
```python
# Node with mutually exclusive outputs
review_node = NodeSpec(
id="review",
name="Review",
node_type="event_loop",
client_facing=True,
output_keys=["approved_contacts", "redo_extraction"],
nullable_output_keys=["approved_contacts", "redo_extraction"],
max_node_visits=3,
system_prompt="Present the contact list to the operator. If they approve, call set_output('approved_contacts', ...). If they want changes, call set_output('redo_extraction', 'true').",
)
# Forward edge (positive priority, evaluated first)
EdgeSpec(
id="review-to-campaign",
source="review",
target="campaign-builder",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('approved_contacts') is not None",
priority=1,
)
# Feedback edge (negative priority, evaluated after forward edges)
EdgeSpec(
id="review-feedback",
source="review",
target="extractor",
condition=EdgeCondition.CONDITIONAL,
condition_expr="output.get('redo_extraction') is not None",
priority=-1,
)
```
**Key concepts:**
- `nullable_output_keys`: Lists output keys that may remain unset. The node sets exactly one of the mutually exclusive keys per execution.
- `max_node_visits`: Must be >1 on the feedback target (extractor) so it can re-execute. Default is 1.
- `priority`: Positive = forward edge (evaluated first). Negative = feedback edge. The executor tries forward edges first; if none match, it falls back to feedback edges (sketched below).
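A rough sketch of that selection rule (illustrative only, not the executor's source; assumes `condition_expr` is a Python expression evaluated with the node's `output` in scope):
```python
# Try forward edges (priority > 0) first, highest priority first;
# fall back to feedback edges (priority < 0) only if none matched.
def pick_next_edge(edges, output):
    for edge in sorted(edges, key=lambda e: e.priority, reverse=True):
        if eval(edge.condition_expr, {"output": output}):  # e.g. "output.get('x') is not None"
            return edge
    return None
```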
### Routing Decision Table
| Pattern | Old Approach | New Approach |
| ---------------------- | ----------------------- | --------------------------------------------- |
| Conditional branching | `router` node | Conditional edges with `condition_expr` |
| Binary approve/reject | `pause_nodes` + resume | `client_facing=True` + `nullable_output_keys` |
| Loop-back on rejection | Manual entry_points | Feedback edge with `priority=-1` |
| Multi-way routing | Router with routes dict | Multiple conditional edges with priorities |
## Judge Patterns
**Core Principle: The judge is the SOLE mechanism for acceptance decisions.** Never add ad-hoc framework gating to compensate for LLM behavior. If the LLM calls `set_output` prematurely, fix the system prompt or use a custom judge. Anti-patterns to avoid:
- Output rollback logic
- `_user_has_responded` flags
- Premature set_output rejection
- Interaction protocol injection into system prompts
Judges control when an event_loop node's loop exits. Choose based on validation needs.
### Implicit Judge (Default)
When no judge is configured, the implicit judge ACCEPTs when:
- The LLM finishes its response with no tool calls
- All required output keys have been set via `set_output`
Best for simple nodes where "all outputs set" is sufficient validation.
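As a rough restatement of that rule in code (illustrative only, not the framework's implementation):
```python
# The implicit judge ACCEPTs only when the LLM has stopped calling tools
# and no required output key is still missing.
def implicit_judge_accepts(made_tool_calls: bool, missing_keys: list[str]) -> bool:
    return not made_tool_calls and not missing_keys
```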
### SchemaJudge
Validates outputs against a Pydantic model. Use when you need structural validation.
```python
from pydantic import BaseModel, ValidationError
# JudgeVerdict is the framework's verdict type; import it from your framework package
class ScannerOutput(BaseModel):
github_users: list[dict] # Must be a list of user objects
class SchemaJudge:
def __init__(self, output_model: type[BaseModel]):
self._model = output_model
async def evaluate(self, context: dict) -> JudgeVerdict:
missing = context.get("missing_keys", [])
if missing:
return JudgeVerdict(
action="RETRY",
feedback=f"Missing output keys: {missing}. Use set_output to provide them.",
)
try:
self._model.model_validate(context["output_accumulator"])
return JudgeVerdict(action="ACCEPT")
except ValidationError as e:
return JudgeVerdict(action="RETRY", feedback=str(e))
```
### When to Use Which Judge
| Judge | Use When | Example |
| --------------- | ------------------------------------- | ---------------------- |
| Implicit (None) | Output keys are sufficient validation | Simple data extraction |
| SchemaJudge | Need structural validation of outputs | API response parsing |
| Custom | Domain-specific validation logic | Score must be 0.0-1.0 (sketch below) |
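For the custom case, a sketch in the same shape as SchemaJudge above (assumes the same framework `JudgeVerdict` type and `context` dict):
```python
# Hypothetical judge enforcing that "score" is a number in [0.0, 1.0]
class ScoreRangeJudge:
    async def evaluate(self, context: dict) -> JudgeVerdict:
        score = context["output_accumulator"].get("score")
        if isinstance(score, (int, float)) and 0.0 <= score <= 1.0:
            return JudgeVerdict(action="ACCEPT")
        return JudgeVerdict(
            action="RETRY",
            feedback=f"score must be a number between 0.0 and 1.0, got {score!r}",
        )
```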
## Fan-Out / Fan-In (Parallel Execution)
Multiple ON_SUCCESS edges from the same source trigger parallel execution. All branches run concurrently via `asyncio.gather()`.
```python
# Scanner fans out to Profiler and Scorer in parallel
EdgeSpec(id="scanner-to-profiler", source="scanner", target="profiler",
condition=EdgeCondition.ON_SUCCESS)
EdgeSpec(id="scanner-to-scorer", source="scanner", target="scorer",
condition=EdgeCondition.ON_SUCCESS)
# Both fan in to Extractor
EdgeSpec(id="profiler-to-extractor", source="profiler", target="extractor",
condition=EdgeCondition.ON_SUCCESS)
EdgeSpec(id="scorer-to-extractor", source="scorer", target="extractor",
condition=EdgeCondition.ON_SUCCESS)
```
**Requirements:**
- Parallel event_loop nodes must have **disjoint output_keys** (no key written by both)
- Only one parallel branch may contain a `client_facing` node
- Fan-in node receives outputs from all completed branches in shared memory
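A sketch of those gather semantics (illustrative; `run_profiler` and `run_scorer` stand in for branch execution):
```python
import asyncio

async def fan_out_fan_in(run_profiler, run_scorer, scanner_output: dict) -> dict:
    # Both branches run concurrently, mirroring the executor's asyncio.gather()
    profiler_out, scorer_out = await asyncio.gather(
        run_profiler(scanner_output),
        run_scorer(scanner_output),
    )
    # Fan-in: safe to merge because parallel branches write disjoint output_keys
    return {**profiler_out, **scorer_out}
```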
## Context Management Patterns
### Tiered Compaction
EventLoopNode automatically manages context window usage with tiered compaction:
1. **Pruning** — Old tool results replaced with compact placeholders (zero-cost, no LLM call)
2. **Normal compaction** — LLM summarizes older messages
3. **Aggressive compaction** — Keeps only recent messages + summary
4. **Emergency** — Hard reset with tool history preservation
### Spillover Pattern
The framework automatically truncates large tool results and saves full content to a spillover directory. The LLM receives a truncation message with instructions to use `load_data` to read the full result.
For explicit data management, use the data tools (real MCP tools, not synthetic):
```python
# save_data, load_data, list_data_files, serve_file_to_user are real MCP tools
# data_dir is auto-injected by the framework — the LLM never sees it
# Saving large results
save_data(filename="sources.json", data=large_json_string)
# Reading with pagination (line-based offset/limit)
load_data(filename="sources.json", offset=0, limit=50)
# Listing available files
list_data_files()
# Serving a file to the user as a clickable link
serve_file_to_user(filename="report.html", label="Research Report")
```
Add data tools to nodes that handle large tool results:
```python
research_node = NodeSpec(
...
tools=["web_search", "web_scrape", "load_data", "save_data", "list_data_files"],
)
```
`data_dir` is a framework context parameter — auto-injected at call time. `GraphExecutor.execute()` sets it per-execution via `ToolRegistry.set_execution_context(data_dir=...)` (using `contextvars` for concurrency safety), ensuring it matches the session-scoped spillover directory.
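The concurrency-safety claim rests on `contextvars`; a minimal sketch of that pattern (illustrative, not the framework's source):
```python
import contextvars

# Each asyncio task sees its own value, so concurrent executions don't collide
_data_dir: contextvars.ContextVar[str | None] = contextvars.ContextVar("data_dir", default=None)

def set_execution_context(data_dir: str) -> None:
    _data_dir.set(data_dir)

def current_data_dir() -> str | None:
    return _data_dir.get()
```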
## Anti-Patterns
### What NOT to Do
- **Don't rely on `export_graph`** — Write files immediately, not at end
- **Don't hide code in session** — Write to files as components are approved
- **Don't wait to write files** — Agent visible from first step
- **Don't batch everything** — Write incrementally, one component at a time
- **Don't create too many thin nodes** — Prefer fewer, richer nodes (see below)
- **Don't add framework gating for LLM behavior** — Fix prompts or use judges instead
### Fewer, Richer Nodes
A common mistake is splitting work into too many small single-purpose nodes. Each node boundary forces outputs to be serialized, discards in-context information, and adds edge complexity.
| Bad (8 thin nodes) | Good (4 rich nodes) |
| ------------------- | ----------------------------------- |
| parse-query | intake (client-facing) |
| search-sources | research (search + fetch + analyze) |
| fetch-content | review (client-facing) |
| evaluate-sources | report (write + deliver) |
| synthesize-findings | |
| write-report | |
| quality-check | |
| save-report | |
**Why fewer nodes are better:**
- The LLM retains full context of its work within a single node
- A research node that searches, fetches, and analyzes keeps all source material in its conversation history
- Fewer edges means simpler graph and fewer failure points
- Data tools (`save_data`/`load_data`) handle context window limits within a single node
### MCP Tools - Correct Usage
**MCP tools OK for:**
- `test_node` — Validate node configuration with mock inputs
- `validate_graph` — Check graph structure
- `configure_loop` — Set event loop parameters
- `create_session` — Track session state for bookkeeping
**Just don't:** Use MCP as the primary construction method or rely on export_graph
## Error Handling Patterns
### Graceful Failure with Fallback
```python
edges = [
# Success path
EdgeSpec(id="api-success", source="api-call", target="process-results",
condition=EdgeCondition.ON_SUCCESS),
# Fallback on failure
EdgeSpec(id="api-to-fallback", source="api-call", target="fallback-cache",
condition=EdgeCondition.ON_FAILURE, priority=1),
# Report if fallback also fails
EdgeSpec(id="fallback-to-error", source="fallback-cache", target="report-error",
condition=EdgeCondition.ON_FAILURE, priority=1),
]
```
## Handoff to Testing
When agent is complete, transition to testing phase:
### Pre-Testing Checklist
- [ ] Agent structure validates: `uv run python -m agent_name validate`
- [ ] All nodes defined in `nodes/__init__.py`
- [ ] All edges connect valid nodes with correct priorities
- [ ] Feedback edge targets have `max_node_visits > 1`
- [ ] Client-facing nodes have meaningful system prompts
- [ ] Agent can be imported: `from exports.agent_name import default_agent`
## Related Skills
- **hive-concepts** — Fundamental concepts (node types, edges, event loop architecture)
- **hive-create** — Step-by-step building process
- **hive-test** — Test and validate agents
- **hive** — Complete workflow orchestrator
---
**Remember: Agent is actively constructed, visible the whole time. No hidden state. No surprise exports. Just transparent, incremental file building.**
-940
@@ -1,940 +0,0 @@
---
name: hive-test
description: Iterative agent testing with session recovery. Execute, analyze, fix, resume from checkpoints. Use when testing an agent, debugging test failures, or verifying fixes without re-running from scratch.
---
# Agent Testing
Test agents iteratively: execute, analyze failures, fix, resume from checkpoint, repeat.
## When to Use
- Testing a newly built agent against its goal
- Debugging a failing agent iteratively
- Verifying fixes without re-running expensive early nodes
- Running final regression tests before deployment
## Prerequisites
1. Agent package at `exports/{agent_name}/` (built with `/hive-create`)
2. Credentials configured (`/hive-credentials`)
3. `ANTHROPIC_API_KEY` set (or appropriate LLM provider key)
**Path distinction** (critical — don't confuse these):
- `exports/{agent_name}/` — agent source code (edit here)
- `~/.hive/agents/{agent_name}/` — runtime data: sessions, checkpoints, logs (read here)
---
## The Iterative Test Loop
This is the core workflow. Don't re-run the entire agent when a late node fails — analyze, fix, and resume from the last clean checkpoint.
```
┌──────────────────────────────────────┐
│ PHASE 1: Generate Test Scenarios │
│ Goal → synthetic test inputs + tests │
└──────────────┬───────────────────────┘
               ↓
┌──────────────────────────────────────┐
│ PHASE 2: Execute │◄────────────────┐
│ Run agent (CLI or pytest) │ │
└──────────────┬───────────────────────┘ │
↓ │
Pass? ──yes──► PHASE 6: Final Verification │
│ │
no │
↓ │
┌──────────────────────────────────────┐ │
│ PHASE 3: Analyze │ │
│ Session + runtime logs + checkpoints │ │
└──────────────┬───────────────────────┘ │
↓ │
┌──────────────────────────────────────┐ │
│ PHASE 4: Fix │ │
│ Prompt / code / graph / goal │ │
└──────────────┬───────────────────────┘ │
↓ │
┌──────────────────────────────────────┐ │
│ PHASE 5: Recover & Resume │─────────────────┘
│ Checkpoint resume OR fresh re-run │
└──────────────────────────────────────┘
```
---
### Phase 1: Generate Test Scenarios
Create synthetic tests from the agent's goal, constraints, and success criteria.
#### Step 1a: Read the goal
```python
# Read goal from agent.py
Read(file_path="exports/{agent_name}/agent.py")
# Extract the Goal definition and convert to JSON string
```
#### Step 1b: Get test guidelines
```python
# Get constraint test guidelines
generate_constraint_tests(
goal_id="your-goal-id",
goal_json='{"id": "...", "constraints": [...]}',
agent_path="exports/{agent_name}"
)
# Get success criteria test guidelines
generate_success_tests(
goal_id="your-goal-id",
goal_json='{"id": "...", "success_criteria": [...]}',
node_names="intake,research,review,report",
tool_names="web_search,web_scrape",
agent_path="exports/{agent_name}"
)
```
These return `file_header`, `test_template`, `constraints_formatted`/`success_criteria_formatted`, and `test_guidelines`. They do NOT generate test code — you write the tests.
#### Step 1c: Write tests
```python
Write(
file_path=result["output_file"],
content=result["file_header"] + "\n\n" + your_test_code
)
```
#### Test writing rules
- Every test MUST be `async` with `@pytest.mark.asyncio`
- Every test MUST accept `runner, auto_responder, mock_mode` fixtures
- Use `await auto_responder.start()` before running, `await auto_responder.stop()` in `finally`
- Use `await runner.run(input_dict)` — this goes through AgentRunner → AgentRuntime → ExecutionStream
- Access output via `result.output.get("key")` — NEVER `result.output["key"]`
- `result.success=True` means no exception, NOT goal achieved — always check output
- Write 8-15 tests total, not 30+
- Each real test costs ~3 seconds + LLM tokens
- NEVER use `default_agent.run()` — it bypasses the runtime (no sessions, no logs, client-facing nodes hang)
#### Step 1d: Check existing tests
Before generating, check if tests already exist:
```python
list_tests(
goal_id="your-goal-id",
agent_path="exports/{agent_name}"
)
```
---
### Phase 2: Execute
Two execution paths, use the right one for your situation.
#### Iterative debugging (for complex agents)
Run the agent via CLI. This creates sessions with checkpoints at `~/.hive/agents/{agent_name}/sessions/`:
```bash
uv run hive run exports/{agent_name} --input '{"query": "test topic"}'
```
Sessions and checkpoints are saved automatically.
**Client-facing nodes**: Agents with `client_facing=True` nodes (interactive conversation) work in headless mode when run from a real terminal — the agent streams output to stdout and reads user input from stdin via a `>>> ` prompt. In non-interactive shells (like Claude Code's Bash tool), client-facing nodes will hang because there is no stdin. For testing interactive agents from Claude Code, use `run_tests` with mock mode or have the user run the agent manually in their terminal.
#### Automated regression (for CI or final verification)
Use the `run_tests` MCP tool to run all pytest tests:
```python
run_tests(
goal_id="your-goal-id",
agent_path="exports/{agent_name}"
)
```
Returns structured results:
```json
{
"overall_passed": false,
"summary": {"total": 12, "passed": 10, "failed": 2, "pass_rate": "83.3%"},
"test_results": [{"test_name": "test_success_source_diversity", "status": "failed"}],
"failures": [{"test_name": "test_success_source_diversity", "details": "..."}]
}
```
**Options:**
```python
# Run only constraint tests
run_tests(goal_id, agent_path, test_types='["constraint"]')
# Stop on first failure
run_tests(goal_id, agent_path, fail_fast=True)
# Parallel execution
run_tests(goal_id, agent_path, parallel=4)
```
**Note:** `run_tests` uses `AgentRunner` with `tmp_path` storage, so sessions are isolated per test run. For checkpoint-based recovery with persistent sessions, use CLI execution. Use `run_tests` for quick regression checks and final verification.
---
### Phase 3: Analyze Failures
When a test fails, drill down systematically. Don't guess — use the tools.
#### Step 3a: Get error category
```python
debug_test(
goal_id="your-goal-id",
test_name="test_success_source_diversity",
agent_path="exports/{agent_name}"
)
```
Returns error category (`IMPLEMENTATION_ERROR`, `ASSERTION_FAILURE`, `TIMEOUT`, `IMPORT_ERROR`, `API_ERROR`) plus full traceback and suggestions.
#### Step 3b: Find the failed session
```python
list_agent_sessions(
agent_work_dir="~/.hive/agents/{agent_name}",
status="failed",
limit=5
)
```
Returns session list with IDs, timestamps, current_node (where it failed), execution_quality.
#### Step 3c: Inspect session state
```python
get_agent_session_state(
agent_work_dir="~/.hive/agents/{agent_name}",
session_id="session_20260209_143022_abc12345"
)
```
Returns execution path, which node was current, step count, timestamps — but excludes memory values (to avoid context bloat). Shows `memory_keys` and `memory_size` instead.
#### Step 3d: Examine runtime logs (L2/L3)
```python
# L2: Per-node success/failure, retry counts
query_runtime_log_details(
agent_work_dir="~/.hive/agents/{agent_name}",
run_id="session_20260209_143022_abc12345",
needs_attention_only=True
)
# L3: Exact LLM responses, tool call inputs/outputs
query_runtime_log_raw(
agent_work_dir="~/.hive/agents/{agent_name}",
run_id="session_20260209_143022_abc12345",
node_id="research"
)
```
#### Step 3e: Inspect memory data
```python
# See what data a node actually produced
get_agent_session_memory(
agent_work_dir="~/.hive/agents/{agent_name}",
session_id="session_20260209_143022_abc12345",
key="research_results"
)
```
#### Step 3f: Find recovery points
```python
list_agent_checkpoints(
agent_work_dir="~/.hive/agents/{agent_name}",
session_id="session_20260209_143022_abc12345",
is_clean="true"
)
```
Returns checkpoint summaries with IDs, types (`node_start`, `node_complete`), which node, and `is_clean` flag. Clean checkpoints are safe resume points.
#### Step 3g: Compare checkpoints (optional)
To understand what changed between two points in execution:
```python
compare_agent_checkpoints(
agent_work_dir="~/.hive/agents/{agent_name}",
session_id="session_20260209_143022_abc12345",
checkpoint_id_before="cp_node_complete_research_143030",
checkpoint_id_after="cp_node_complete_review_143115"
)
```
Returns memory diff (added/removed/changed keys) and execution path diff.
---
### Phase 4: Fix Based on Root Cause
Use the analysis from Phase 3 to determine what to fix and where.
| Root Cause | What to Fix | Where to Edit |
|------------|------------|---------------|
| **Prompt issue** — LLM produces wrong output format, misses instructions | Node `system_prompt` | `exports/{agent}/nodes/__init__.py` |
| **Code bug** — TypeError, KeyError, logic error in Python | Agent code | `exports/{agent}/agent.py`, `nodes/__init__.py` |
| **Graph issue** — wrong routing, missing edge, bad condition_expr | Edges, node config | `exports/{agent}/agent.py` |
| **Tool issue** — MCP tool fails, wrong config, missing credential | Tool config | `exports/{agent}/mcp_servers.json`, `/hive-credentials` |
| **Goal issue** — success criteria too strict/vague, wrong constraints | Goal definition | `exports/{agent}/agent.py` (goal section) |
| **Test issue** — test expectations don't match actual agent behavior | Test code | `exports/{agent}/tests/test_*.py` |
#### Fix strategies by error category
**IMPLEMENTATION_ERROR** (TypeError, AttributeError, KeyError):
```python
# Read the failing code
Read(file_path="exports/{agent_name}/nodes/__init__.py")
# Fix the bug
Edit(
file_path="exports/{agent_name}/nodes/__init__.py",
old_string="results.get('videos')",
new_string="(results or {}).get('videos', [])"
)
```
**ASSERTION_FAILURE** (test assertions fail but agent ran successfully):
- Check if the agent's output is actually wrong → fix the prompt
- Check if the test's expectations are unrealistic → fix the test
- Use `get_agent_session_memory` to see what the agent actually produced
**TIMEOUT / STALL** (agent runs too long):
- Check `node_visit_counts` for feedback loops hitting max_node_visits
- Check L3 logs for tool calls that hang
- Reduce `max_iterations` in loop_config or fix the prompt to converge faster
**API_ERROR** (connection, rate limit, auth):
- Verify credentials with `/hive-credentials`
- Check MCP server configuration
---
### Phase 5: Recover & Resume
After fixing the agent, decide whether to resume or re-run.
#### When to resume from checkpoint
Resume when ALL of these are true:
- The fix is to a node that comes AFTER existing clean checkpoints
- Clean checkpoints exist (from a CLI execution with checkpointing)
- The early nodes are expensive (web scraping, API calls, long LLM chains)
```bash
# Resume from the last clean checkpoint before the failing node
uv run hive run exports/{agent_name} \
--resume-session session_20260209_143022_abc12345 \
--checkpoint cp_node_complete_research_143030
```
This skips all nodes before the checkpoint and only re-runs the fixed node onward.
#### When to re-run from scratch
Re-run when ANY of these are true:
- The fix is to the entry node or an early node
- No checkpoints exist (e.g., agent was run via `run_tests`)
- The agent is fast (2-3 nodes, completes in seconds)
- You changed the graph structure (added/removed nodes/edges)
```bash
uv run hive run exports/{agent_name} --input '{"query": "test topic"}'
```
#### Inspecting a checkpoint before resuming
```python
get_agent_checkpoint(
agent_work_dir="~/.hive/agents/{agent_name}",
session_id="session_20260209_143022_abc12345",
checkpoint_id="cp_node_complete_research_143030"
)
```
Returns the full checkpoint: shared_memory snapshot, execution_path, current_node, next_node, is_clean.
#### Loop back to Phase 2
After resuming or re-running, check if the fix worked. If not, go back to Phase 3.
---
### Phase 6: Final Verification
Once the iterative fix loop converges (the agent produces correct output), run the full automated test suite:
```python
run_tests(
goal_id="your-goal-id",
agent_path="exports/{agent_name}"
)
```
All tests should pass. If not, repeat the loop for remaining failures.
---
## Credential Requirements
**CRITICAL: Testing requires ALL credentials the agent depends on.** This includes both the LLM API key AND any tool-specific credentials (HubSpot, Brave Search, etc.).
### Prerequisites
Before running agent tests, you MUST collect ALL required credentials from the user.
**Step 1: LLM API Key (always required)**
```bash
export ANTHROPIC_API_KEY="your-key-here"
```
**Step 2: Tool-specific credentials (depends on agent's tools)**
Inspect the agent's `mcp_servers.json` and tool configuration to determine which tools the agent uses, then check for all required credentials:
```python
from aden_tools.credentials import CredentialManager, CREDENTIAL_SPECS
creds = CredentialManager()
# Determine which tools the agent uses (from agent.json or mcp_servers.json)
agent_tools = [...] # e.g., ["hubspot_search_contacts", "web_search", ...]
# Find all missing credentials for those tools
missing = creds.get_missing_for_tools(agent_tools)
```
Common tool credentials:
| Tool | Env Var | Help URL |
|------|---------|----------|
| HubSpot CRM | `HUBSPOT_ACCESS_TOKEN` | https://developers.hubspot.com/docs/api/private-apps |
| Brave Search | `BRAVE_SEARCH_API_KEY` | https://brave.com/search/api/ |
| Google Search | `GOOGLE_SEARCH_API_KEY` + `GOOGLE_SEARCH_CX` | https://developers.google.com/custom-search |
**Why ALL credentials are required:**
- Tests need to execute the agent's LLM nodes to validate behavior
- Tools with missing credentials will return error dicts instead of real data
- Mock mode bypasses everything, providing no confidence in real-world performance
### Mock Mode Limitations
Mock mode (`--mock` flag or `MOCK_MODE=1`) is **ONLY for structure validation**:
- Validates graph structure (nodes, edges, connections)
- Validates that `AgentRunner.load()` succeeds and the agent is importable
- Does NOT execute event_loop agents — MockLLMProvider never calls `set_output`, so event_loop nodes loop forever
- Does NOT test LLM reasoning, content quality, or constraint validation
- Does NOT test real API integrations or tool use
**Bottom line:** If you're testing whether an agent achieves its goal, you MUST use real credentials.
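For structure-only validation, the mock-mode invocation looks like this:
```bash
# Validates graph structure and importability only; no LLM calls are made
MOCK_MODE=1 pytest exports/{agent_name}/tests/
```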
### Enforcing Credentials in Tests
When writing tests, **ALWAYS include credential checks**:
```python
import os
import pytest
from aden_tools.credentials import CredentialManager
pytestmark = pytest.mark.skipif(
not CredentialManager().is_available("anthropic") and not os.environ.get("MOCK_MODE"),
reason="API key required for real testing. Set ANTHROPIC_API_KEY or use MOCK_MODE=1."
)
@pytest.fixture(scope="session", autouse=True)
def check_credentials():
"""Ensure ALL required credentials are set for real testing."""
creds = CredentialManager()
mock_mode = os.environ.get("MOCK_MODE")
if not creds.is_available("anthropic"):
if mock_mode:
print("\nRunning in MOCK MODE - structure validation only")
else:
pytest.fail(
"\nANTHROPIC_API_KEY not set!\n"
"Set API key: export ANTHROPIC_API_KEY='your-key-here'\n"
"Or run structure validation: MOCK_MODE=1 pytest exports/{agent}/tests/"
)
if not mock_mode:
agent_tools = [] # Update per agent
missing = creds.get_missing_for_tools(agent_tools)
if missing:
lines = ["\nMissing tool credentials!"]
for name in missing:
spec = creds.specs.get(name)
if spec:
lines.append(f" {spec.env_var} - {spec.description}")
pytest.fail("\n".join(lines))
```
### User Communication
When the user asks to test an agent, **ALWAYS check for ALL credentials first**:
1. **Identify the agent's tools** from `mcp_servers.json`
2. **Check ALL required credentials** using `CredentialManager`
3. **Ask the user to provide any missing credentials** before proceeding
4. Collect ALL missing credentials in a single prompt — not one at a time
---
## Safe Test Patterns
### OutputCleaner
The framework automatically validates and cleans node outputs using a fast LLM at edge traversal time. Tests should still use safe patterns because OutputCleaner may not catch all issues.
### Safe Access (REQUIRED)
```python
# UNSAFE - will crash on missing keys
approval = result.output["approval_decision"]
category = result.output["analysis"]["category"]
# SAFE - use .get() with defaults
output = result.output or {}
approval = output.get("approval_decision", "UNKNOWN")
# SAFE - type check before operations
analysis = output.get("analysis", {})
if isinstance(analysis, dict):
category = analysis.get("category", "unknown")
# SAFE - handle JSON parsing trap (LLM response as string)
import json
recommendation = output.get("recommendation", "{}")
if isinstance(recommendation, str):
try:
parsed = json.loads(recommendation)
if isinstance(parsed, dict):
approval = parsed.get("approval_decision", "UNKNOWN")
except json.JSONDecodeError:
approval = "UNKNOWN"
elif isinstance(recommendation, dict):
approval = recommendation.get("approval_decision", "UNKNOWN")
# SAFE - type check before iteration
items = output.get("items", [])
if isinstance(items, list):
for item in items:
...
```
### Helper Functions for conftest.py
```python
import json
import re
def _parse_json_from_output(result, key):
    """Parse JSON from agent output (framework may store the full LLM response as a string)."""
    response_text = result.output.get(key, "")
    try:
        # re.sub raises TypeError on non-string values, so keep it inside the try
        json_text = re.sub(r'```json\s*|\s*```', '', response_text).strip()
        return json.loads(json_text)
    except (json.JSONDecodeError, AttributeError, TypeError):
        return result.output.get(key)
def safe_get_nested(result, key_path, default=None):
"""Safely get nested value from result.output."""
output = result.output or {}
current = output
for key in key_path:
if isinstance(current, dict):
current = current.get(key)
elif isinstance(current, str):
try:
json_text = re.sub(r'```json\s*|\s*```', '', current).strip()
parsed = json.loads(json_text)
if isinstance(parsed, dict):
current = parsed.get(key)
else:
return default
except json.JSONDecodeError:
return default
else:
return default
return current if current is not None else default
# Make available in tests
pytest.parse_json_from_output = _parse_json_from_output
pytest.safe_get_nested = safe_get_nested
```
### ExecutionResult Fields
**`result.success=True` means NO exception, NOT goal achieved**
```python
# WRONG
assert result.success
# RIGHT
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
approval = output.get("approval_decision")
assert approval == "APPROVED", f"Expected APPROVED, got {approval}"
```
All fields:
- `success: bool` — Completed without exception (NOT goal achieved!)
- `output: dict` — Complete memory snapshot (may contain raw strings)
- `error: str | None` — Error message if failed
- `steps_executed: int` — Number of nodes executed
- `total_tokens: int` — Cumulative token usage
- `total_latency_ms: int` — Total execution time
- `path: list[str]` — Node IDs traversed (may repeat in feedback loops)
- `paused_at: str | None` — Node ID if paused
- `session_state: dict` — State for resuming
- `node_visit_counts: dict[str, int]` — Visit counts per node (feedback loop testing)
- `execution_quality: str` — "clean", "degraded", or "failed"
### Test Count Guidance
**Write 8-15 tests, not 30+**
- 2-3 tests per success criterion
- 1 happy path test
- 1 boundary/edge case test
- 1 error handling test (optional)
Each real test costs ~3 seconds + LLM tokens. 12 tests = ~36 seconds, $0.12.
---
## Test Patterns
### Happy Path
```python
@pytest.mark.asyncio
async def test_happy_path(runner, auto_responder, mock_mode):
"""Test normal successful execution."""
await auto_responder.start()
try:
result = await runner.run({"query": "python tutorials"})
finally:
await auto_responder.stop()
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
assert output.get("report"), "No report produced"
```
### Boundary Condition
```python
@pytest.mark.asyncio
async def test_minimum_sources(runner, auto_responder, mock_mode):
"""Test at minimum source threshold."""
await auto_responder.start()
try:
result = await runner.run({"query": "niche topic"})
finally:
await auto_responder.stop()
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
sources = output.get("sources", [])
if isinstance(sources, list):
assert len(sources) >= 3, f"Expected >= 3 sources, got {len(sources)}"
```
### Error Handling
```python
@pytest.mark.asyncio
async def test_empty_input(runner, auto_responder, mock_mode):
"""Test graceful handling of empty input."""
await auto_responder.start()
try:
result = await runner.run({"query": ""})
finally:
await auto_responder.stop()
# Agent should either fail gracefully or produce an error message
output = result.output or {}
assert not result.success or output.get("error"), "Should handle empty input"
```
### Feedback Loop
```python
@pytest.mark.asyncio
async def test_feedback_loop_terminates(runner, auto_responder, mock_mode):
"""Test that feedback loops don't run forever."""
await auto_responder.start()
try:
result = await runner.run({"query": "test"})
finally:
await auto_responder.stop()
visits = result.node_visit_counts or {}
for node_id, count in visits.items():
assert count <= 5, f"Node {node_id} visited {count} times — possible infinite loop"
```
---
## MCP Tool Reference
### Phase 1: Test Generation
```python
# Check existing tests
list_tests(goal_id, agent_path)
# Get constraint test guidelines (returns templates, NOT generated tests)
generate_constraint_tests(goal_id, goal_json, agent_path)
# Returns: output_file, file_header, test_template, constraints_formatted, test_guidelines
# Get success criteria test guidelines
generate_success_tests(goal_id, goal_json, node_names, tool_names, agent_path)
# Returns: output_file, file_header, test_template, success_criteria_formatted, test_guidelines
```
### Phase 2: Execution
```python
# Automated regression (no checkpoints, fresh runs)
run_tests(goal_id, agent_path, test_types='["all"]', parallel=-1, fail_fast=False)
# Run only specific test types
run_tests(goal_id, agent_path, test_types='["constraint"]')
run_tests(goal_id, agent_path, test_types='["success"]')
```
```bash
# Iterative debugging with checkpoints (via CLI)
uv run hive run exports/{agent_name} --input '{"query": "test"}'
```
### Phase 3: Analysis
```python
# Debug a specific failed test
debug_test(goal_id, test_name, agent_path)
# Find failed sessions
list_agent_sessions(agent_work_dir, status="failed", limit=5)
# Inspect session state (excludes memory values)
get_agent_session_state(agent_work_dir, session_id)
# Inspect memory data
get_agent_session_memory(agent_work_dir, session_id, key="research_results")
# Runtime logs: L1 summaries
query_runtime_logs(agent_work_dir, status="needs_attention")
# Runtime logs: L2 per-node details
query_runtime_log_details(agent_work_dir, run_id, needs_attention_only=True)
# Runtime logs: L3 tool/LLM raw data
query_runtime_log_raw(agent_work_dir, run_id, node_id="research")
# Find clean checkpoints
list_agent_checkpoints(agent_work_dir, session_id, is_clean="true")
# Compare checkpoints (memory diff)
compare_agent_checkpoints(agent_work_dir, session_id, cp_before, cp_after)
```
### Phase 5: Recovery
```python
# Inspect checkpoint before resuming
get_agent_checkpoint(agent_work_dir, session_id, checkpoint_id)
# Empty checkpoint_id = latest checkpoint
```
```bash
# Resume from checkpoint via CLI (headless)
uv run hive run exports/{agent_name} \
--resume-session {session_id} --checkpoint {checkpoint_id}
```
---
## Anti-Patterns
| Don't | Do Instead |
|-------|-----------|
| Use `default_agent.run()` in tests | Use `runner.run()` with `auto_responder` fixtures (goes through AgentRuntime) |
| Re-run entire agent when a late node fails | Resume from last clean checkpoint |
| Treat `result.success` as goal achieved | Check `result.output` for actual criteria |
| Access `result.output["key"]` directly | Use `result.output.get("key")` |
| Fix random things hoping tests pass | Analyze L2/L3 logs to find root cause first |
| Write 30+ tests | Write 8-15 focused tests |
| Skip credential check | Use `/hive-credentials` before testing |
| Confuse `exports/` with `~/.hive/agents/` | Code in `exports/`, runtime data in `~/.hive/` |
| Use `run_tests` for iterative debugging | Use headless CLI with checkpoints for iterative debugging |
| Use headless CLI for final regression | Use `run_tests` for automated regression |
| Use `--tui` from Claude Code | Use headless `run` command — TUI hangs in non-interactive shells |
| Test client-facing nodes from Claude Code | Use mock mode, or have the user run the agent in their terminal |
| Run tests without reading goal first | Always understand the goal before writing tests |
| Skip Phase 3 analysis and guess | Use session + log tools to identify root cause |
---
## Example Walkthrough: Deep Research Agent
A complete iteration showing the test loop for an agent with nodes: `intake → research → review → report`.
### Phase 1: Generate tests
```python
# Read the goal
Read(file_path="exports/deep_research_agent/agent.py")
# Get success criteria test guidelines
result = generate_success_tests(
goal_id="rigorous-interactive-research",
goal_json='{"id": "rigorous-interactive-research", "success_criteria": [{"id": "source-diversity", "target": ">=5"}, {"id": "citation-coverage", "target": "100%"}, {"id": "report-completeness", "target": "90%"}]}',
node_names="intake,research,review,report",
tool_names="web_search,web_scrape",
agent_path="exports/deep_research_agent"
)
# Write tests
Write(
file_path=result["output_file"],
content=result["file_header"] + "\n\n" + test_code
)
```
### Phase 2: First execution
```python
run_tests(
goal_id="rigorous-interactive-research",
agent_path="exports/deep_research_agent",
fail_fast=True
)
```
Result: `test_success_source_diversity` fails — agent only found 2 sources instead of 5.
### Phase 3: Analyze
```python
# Debug the failing test
debug_test(
goal_id="rigorous-interactive-research",
test_name="test_success_source_diversity",
agent_path="exports/deep_research_agent"
)
# → ASSERTION_FAILURE: Expected >= 5 sources, got 2
# Find the session
list_agent_sessions(
agent_work_dir="~/.hive/agents/deep_research_agent",
status="completed",
limit=1
)
# → session_20260209_150000_abc12345
# See what the research node produced
get_agent_session_memory(
agent_work_dir="~/.hive/agents/deep_research_agent",
session_id="session_20260209_150000_abc12345",
key="research_results"
)
# → Only 2 web_search calls made, each returned 1 source
# Check the LLM's behavior in the research node
query_runtime_log_raw(
agent_work_dir="~/.hive/agents/deep_research_agent",
run_id="session_20260209_150000_abc12345",
node_id="research"
)
# → LLM called web_search only twice, then called set_output
```
Root cause: The research node's prompt doesn't tell the LLM to search for at least 5 diverse sources. It stops after the first couple of searches.
### Phase 4: Fix the prompt
```python
Read(file_path="exports/deep_research_agent/nodes/__init__.py")
Edit(
file_path="exports/deep_research_agent/nodes/__init__.py",
old_string='system_prompt="Search for information on the user\'s topic."',
new_string='system_prompt="Search for information on the user\'s topic. You MUST find at least 5 diverse, authoritative sources. Use multiple different search queries to ensure source diversity. Do not stop searching until you have at least 5 distinct sources."'
)
```
### Phase 5: Resume from checkpoint
For this example, the fix applies to the `research` node. If we had run via CLI with checkpointing, we could resume from the checkpoint after `intake` to skip re-running intake:
```python
# Check if a clean checkpoint exists after intake
list_agent_checkpoints(
    agent_work_dir="~/.hive/agents/deep_research_agent",
    session_id="session_20260209_150000_abc12345",
    is_clean="true"
)
# → cp_node_complete_intake_150005
```
```bash
# Resume from after intake, re-run research with the fixed prompt
uv run hive run exports/deep_research_agent \
    --resume-session session_20260209_150000_abc12345 \
    --checkpoint cp_node_complete_intake_150005
```
Or for this simple case (intake is fast), just re-run:
```bash
uv run hive run exports/deep_research_agent --input '{"topic": "test"}'
```
### Phase 6: Final verification
```python
run_tests(
goal_id="rigorous-interactive-research",
agent_path="exports/deep_research_agent"
)
# → All 12 tests pass
```
---
## Test File Structure
```
exports/{agent_name}/
├── agent.py ← Agent to test (goal, nodes, edges)
├── nodes/__init__.py ← Node implementations (prompts, config)
├── config.py ← Agent configuration
├── mcp_servers.json ← Tool server config
└── tests/
├── conftest.py ← Shared fixtures + safe access helpers
├── test_constraints.py ← Constraint tests
├── test_success_criteria.py ← Success criteria tests
└── test_edge_cases.py ← Edge case tests
```
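The `runner`, `auto_responder`, and `mock_mode` fixtures in `conftest.py` come from the generated templates; the "safe access helpers" it mentions are small utilities in the spirit of this hypothetical sketch (the name and signature are illustrative, not the framework's API):
```python
# Hypothetical helper of the kind conftest.py provides.
def get_output_value(result, key, default=None):
    """Read a key from result.output without assuming output is a dict."""
    output = result.output or {}
    return output.get(key, default) if isinstance(output, dict) else default
```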
## Integration with Other Skills
| Scenario | From | To | Action |
|----------|------|----|--------|
| Agent built, ready to test | `/hive-create` | `/hive-test` | Generate tests, start loop |
| Prompt fix needed | `/hive-test` Phase 4 | Direct edit | Edit `nodes/__init__.py`, resume |
| Goal definition wrong | `/hive-test` Phase 4 | `/hive-create` | Update goal, may need rebuild |
| Missing credentials | `/hive-test` Phase 3 | `/hive-credentials` | Set up credentials |
| Complex runtime failure | `/hive-test` Phase 3 | `/hive-debugger` | Deep L1/L2/L3 analysis |
| All tests pass | `/hive-test` Phase 6 | Done | Agent validated |
@@ -1,333 +0,0 @@
# Example: Iterative Testing of a Research Agent
This example walks through the full iterative test loop for a research agent that searches the web, reviews findings, and produces a cited report.
## Agent Structure
```
exports/deep_research_agent/
├── agent.py # Goal + graph: intake → research → review → report
├── nodes/__init__.py # Node definitions (system_prompt, input/output keys)
├── config.py # Model config
├── mcp_servers.json # Tools: web_search, web_scrape
└── tests/ # Test files (we'll create these)
```
**Goal:** "Rigorous Interactive Research" — find 5+ diverse sources, cite every claim, produce a complete report.
---
## Phase 1: Generate Tests
### Read the goal
```python
Read(file_path="exports/deep_research_agent/agent.py")
# Extract: goal_id="rigorous-interactive-research"
# success_criteria: source-diversity (>=5), citation-coverage (100%), report-completeness (90%)
# constraints: no-hallucination, source-attribution
```
### Get test guidelines
```python
result = generate_success_tests(
goal_id="rigorous-interactive-research",
goal_json='{"id": "rigorous-interactive-research", "success_criteria": [{"id": "source-diversity", "description": "Use multiple diverse sources", "target": ">=5"}, {"id": "citation-coverage", "description": "Every claim cites its source", "target": "100%"}, {"id": "report-completeness", "description": "Report answers the research questions", "target": "90%"}]}',
node_names="intake,research,review,report",
tool_names="web_search,web_scrape",
agent_path="exports/deep_research_agent"
)
```
### Write tests
```python
Write(
file_path="exports/deep_research_agent/tests/test_success_criteria.py",
content=result["file_header"] + '''
@pytest.mark.asyncio
async def test_success_source_diversity(runner, auto_responder, mock_mode):
"""At least 5 diverse sources are found."""
await auto_responder.start()
try:
result = await runner.run({"query": "impact of remote work on productivity"})
finally:
await auto_responder.stop()
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
sources = output.get("sources", [])
if isinstance(sources, list):
assert len(sources) >= 5, f"Expected >= 5 sources, got {len(sources)}"
@pytest.mark.asyncio
async def test_success_citation_coverage(runner, auto_responder, mock_mode):
"""Every factual claim in the report cites its source."""
await auto_responder.start()
try:
result = await runner.run({"query": "climate change effects on agriculture"})
finally:
await auto_responder.stop()
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
report = output.get("report", "")
# Check that report contains numbered references
assert "[1]" in str(report) or "[source" in str(report).lower(), "Report lacks citations"
@pytest.mark.asyncio
async def test_success_report_completeness(runner, auto_responder, mock_mode):
"""Report addresses the original research question."""
query = "pros and cons of nuclear energy"
await auto_responder.start()
try:
result = await runner.run({"query": query})
finally:
await auto_responder.stop()
assert result.success, f"Agent failed: {result.error}"
output = result.output or {}
report = output.get("report", "")
assert len(str(report)) > 200, f"Report too short: {len(str(report))} chars"
@pytest.mark.asyncio
async def test_empty_query_handling(runner, auto_responder, mock_mode):
"""Agent handles empty input gracefully."""
await auto_responder.start()
try:
result = await runner.run({"query": ""})
finally:
await auto_responder.stop()
output = result.output or {}
assert not result.success or output.get("error"), "Should handle empty query"
@pytest.mark.asyncio
async def test_feedback_loop_terminates(runner, auto_responder, mock_mode):
"""Feedback loop between review and research terminates."""
await auto_responder.start()
try:
result = await runner.run({"query": "quantum computing basics"})
finally:
await auto_responder.stop()
visits = result.node_visit_counts or {}
for node_id, count in visits.items():
assert count <= 5, f"Node {node_id} visited {count} times"
'''
)
```
---
## Phase 2: First Execution
```python
run_tests(
goal_id="rigorous-interactive-research",
agent_path="exports/deep_research_agent",
fail_fast=True
)
```
**Result:**
```json
{
"overall_passed": false,
"summary": {"total": 5, "passed": 3, "failed": 2, "pass_rate": "60.0%"},
"failures": [
{"test_name": "test_success_source_diversity", "details": "AssertionError: Expected >= 5 sources, got 2"},
{"test_name": "test_success_citation_coverage", "details": "AssertionError: Report lacks citations"}
]
}
```
---
## Phase 3: Analyze (Iteration 1)
### Debug the first failure
```python
debug_test(
goal_id="rigorous-interactive-research",
test_name="test_success_source_diversity",
agent_path="exports/deep_research_agent"
)
# Category: ASSERTION_FAILURE — Expected >= 5 sources, got 2
```
### Find the session and inspect memory
```python
list_agent_sessions(
agent_work_dir="~/.hive/agents/deep_research_agent",
status="completed",
limit=1
)
# → session_20260209_150000_abc12345
get_agent_session_memory(
agent_work_dir="~/.hive/agents/deep_research_agent",
session_id="session_20260209_150000_abc12345",
key="research_results"
)
# → Only 2 sources found. LLM stopped searching after 2 queries.
```
### Check LLM behavior in the research node
```python
query_runtime_log_raw(
agent_work_dir="~/.hive/agents/deep_research_agent",
run_id="session_20260209_150000_abc12345",
node_id="research"
)
# → LLM called web_search twice, got results, immediately called set_output.
# → Prompt doesn't instruct it to find at least 5 sources.
```
**Root cause:** The research node's system_prompt doesn't specify minimum source requirements.
---
## Phase 4: Fix (Iteration 1)
```python
Read(file_path="exports/deep_research_agent/nodes/__init__.py")
# Fix the research node prompt
Edit(
file_path="exports/deep_research_agent/nodes/__init__.py",
old_string='system_prompt="Search for information on the user\'s topic using web search."',
new_string='system_prompt="Search for information on the user\'s topic using web search. You MUST find at least 5 diverse, authoritative sources. Use multiple different search queries with varied keywords. Do NOT call set_output until you have gathered at least 5 distinct sources from different domains."'
)
```
---
## Phase 5: Recover & Resume (Iteration 1)
The fix is to the `research` node. Since this was a `run_tests` execution (no checkpoints), we re-run from scratch:
```python
run_tests(
goal_id="rigorous-interactive-research",
agent_path="exports/deep_research_agent",
fail_fast=True
)
```
**Result:**
```json
{
"overall_passed": false,
"summary": {"total": 5, "passed": 4, "failed": 1, "pass_rate": "80.0%"},
"failures": [
{"test_name": "test_success_citation_coverage", "details": "AssertionError: Report lacks citations"}
]
}
```
Source diversity now passes. Citation coverage still fails.
---
## Phase 3: Analyze (Iteration 2)
```python
debug_test(
goal_id="rigorous-interactive-research",
test_name="test_success_citation_coverage",
agent_path="exports/deep_research_agent"
)
# Category: ASSERTION_FAILURE — Report lacks citations
# Check what the report node produced
list_agent_sessions(
agent_work_dir="~/.hive/agents/deep_research_agent",
status="completed",
limit=1
)
# → session_20260209_151500_def67890
get_agent_session_memory(
agent_work_dir="~/.hive/agents/deep_research_agent",
session_id="session_20260209_151500_def67890",
key="report"
)
# → Report text exists but uses no numbered references.
# → Sources are in memory but report node doesn't cite them.
```
**Root cause:** The report node's prompt doesn't instruct the LLM to include numbered citations.
---
## Phase 4: Fix (Iteration 2)
```python
Edit(
file_path="exports/deep_research_agent/nodes/__init__.py",
old_string='system_prompt="Write a comprehensive report based on the research findings."',
new_string='system_prompt="Write a comprehensive report based on the research findings. You MUST include numbered citations [1], [2], etc. for every factual claim. At the end, include a References section listing all sources with their URLs. Every claim must be traceable to a specific source."'
)
```
---
## Phase 5: Resume (Iteration 2)
The fix applies to the `report` node (the last node). To demonstrate checkpoint recovery, run via CLI:
```bash
# Run via CLI to get checkpoints
uv run hive run exports/deep_research_agent --input '{"topic": "climate change effects"}'
```
```python
# After it runs, find the clean checkpoint before report
list_agent_checkpoints(
    agent_work_dir="~/.hive/agents/deep_research_agent",
    session_id="session_20260209_152000_ghi34567",
    is_clean="true"
)
# → cp_node_complete_review_152100 (after review, before report)
```
```bash
# Resume — skips intake, research, review entirely
uv run hive run exports/deep_research_agent \
    --resume-session session_20260209_152000_ghi34567 \
    --checkpoint cp_node_complete_review_152100
```
Only the `report` node re-runs with the fixed prompt, using research data from the checkpoint.
---
## Phase 6: Final Verification
```python
run_tests(
goal_id="rigorous-interactive-research",
agent_path="exports/deep_research_agent"
)
```
**Result:**
```json
{
"overall_passed": true,
"summary": {"total": 5, "passed": 5, "failed": 0, "pass_rate": "100.0%"}
}
```
All tests pass.
---
## Summary
| Iteration | Failure | Root Cause | Fix | Recovery |
|-----------|---------|------------|-----|----------|
| 1 | Source diversity (2 < 5) | Research prompt too vague | Added "at least 5 sources" to prompt | Re-run (no checkpoints) |
| 2 | No citations in report | Report prompt lacks citation instructions | Added citation requirements | Checkpoint resume (skipped 3 nodes) |
**Key takeaways:**
- Phase 3 analysis (session memory + L3 logs) identified root causes without guessing
- Checkpoint recovery in iteration 2 saved time by skipping 3 expensive nodes
- Final `run_tests` confirms all scenarios pass end-to-end
-526
@@ -1,526 +0,0 @@
---
name: hive
description: Complete workflow for building, implementing, and testing goal-driven agents. Orchestrates hive-* skills. Use when starting a new agent project, unsure which skill to use, or need end-to-end guidance.
license: Apache-2.0
metadata:
author: hive
version: "2.0"
type: workflow-orchestrator
orchestrates:
- hive-concepts
- hive-create
- hive-patterns
- hive-test
- hive-credentials
- hive-debugger
---
# Agent Development Workflow
**THIS IS AN EXECUTABLE WORKFLOW. DO NOT explore the codebase or read source files. ROUTE to the correct skill IMMEDIATELY.**
When this skill is loaded, **ALWAYS use the AskUserQuestion tool** to present options:
```
Use AskUserQuestion with these options:
- "Build a new agent" → Then invoke /hive-create
- "Test an existing agent" → Then invoke /hive-test
- "Learn agent concepts" → Then invoke /hive-concepts
- "Optimize agent design" → Then invoke /hive-patterns
- "Set up credentials" → Then invoke /hive-credentials
- "Debug a failing agent" → Then invoke /hive-debugger
- "Other" (please describe what you want to achieve)
```
**DO NOT:** Read source files, explore the codebase, search for code, or do any investigation before routing. The sub-skills handle all of that.
---
Complete Standard Operating Procedure (SOP) for building production-ready goal-driven agents.
## Overview
This workflow orchestrates specialized skills to take you from initial concept to production-ready agent:
1. **Understand Concepts** → `/hive-concepts` (optional)
2. **Build Structure** → `/hive-create`
3. **Optimize Design** → `/hive-patterns` (optional)
4. **Setup Credentials** → `/hive-credentials` (if agent uses tools requiring API keys)
5. **Test & Validate** → `/hive-test`
6. **Debug Issues** → `/hive-debugger` (if agent fails at runtime)
## When to Use This Workflow
Use this meta-skill when:
- Starting a new agent from scratch
- Unclear which skill to use first
- Need end-to-end guidance for agent development
- Want consistent, repeatable agent builds
**Skip this workflow** if:
- You only need to test an existing agent → use `/hive-test` directly
- You know exactly which phase you're in → use specific skill directly
## Quick Decision Tree
```
"Need to understand agent concepts" → hive-concepts
"Build a new agent" → hive-create
"Optimize my agent design" → hive-patterns
"Need client-facing nodes or feedback loops" → hive-patterns
"Set up API keys for my agent" → hive-credentials
"Test my agent" → hive-test
"My agent is failing/stuck/has errors" → hive-debugger
"Not sure what I need" → Read phases below, then decide
"Agent has structure but needs implementation" → See agent directory STATUS.md
```
## Phase 0: Understand Concepts (Optional)
**Skill**: `/hive-concepts`
**Input**: Questions about agent architecture
### When to Use
- First time building an agent
- Need to understand node types, edges, goals
- Want to validate tool availability
- Learning about event loop architecture and client-facing nodes
### What This Phase Provides
- Architecture overview (Python packages, not JSON)
- Core concepts (Goal, Node, Edge, Event Loop, Judges)
- Tool discovery and validation procedures
- Workflow overview
**Skip this phase** if you already understand agent fundamentals.
## Phase 1: Build Agent Structure
**Skill**: `/hive-create`
**Input**: User requirements ("Build an agent that...") or a template to start from
### What This Phase Does
Creates the complete agent architecture:
- Package structure (`exports/agent_name/`)
- Goal with success criteria and constraints
- Workflow graph (nodes and edges)
- Node specifications
- CLI interface
- Documentation
### Process
1. **Create package** - Directory structure with skeleton files
2. **Define goal** - Success criteria and constraints written to agent.py
3. **Design nodes** - Each node approved and written incrementally
4. **Connect edges** - Workflow graph with conditional routing
5. **Finalize** - Agent class, exports, and documentation
### Outputs
- ✅ `exports/agent_name/` package created
- ✅ Goal defined in agent.py
- ✅ 3-5 success criteria defined
- ✅ 1-5 constraints defined
- ✅ 5-10 nodes specified in nodes/__init__.py
- ✅ 8-15 edges connecting workflow
- ✅ Validated structure (passes `uv run python -m agent_name validate`)
- ✅ README.md with usage instructions
- ✅ CLI commands (info, validate, run, shell)
### Success Criteria
You're ready for Phase 2 when:
- Agent structure validates without errors
- All nodes and edges are defined
- CLI commands work (info, validate)
- You see: "Agent complete: exports/agent_name/"
### Common Outputs
The hive-create skill produces:
```
exports/agent_name/
├── __init__.py (package exports)
├── __main__.py (CLI interface)
├── agent.py (goal, graph, agent class)
├── nodes/__init__.py (node specifications)
├── config.py (configuration)
├── implementations.py (may be created for Python functions)
└── README.md (documentation)
```
### Next Steps
**If structure complete and validated:**
→ Check `exports/agent_name/STATUS.md` or `IMPLEMENTATION_GUIDE.md`
→ These files explain implementation options
→ You may need to add Python functions or MCP tools (not covered by current skills)
**If want to optimize design:**
→ Proceed to Phase 1.5 (hive-patterns)
**If ready to test:**
→ Proceed to Phase 2
## Phase 1.5: Optimize Design (Optional)
**Skill**: `/hive-patterns`
**Input**: Completed agent structure
### When to Use
- Want to add client-facing blocking or feedback edges
- Need judge patterns for output validation
- Want fan-out/fan-in (parallel execution)
- Need error handling patterns
- Want best practices guidance
### What This Phase Provides
- Client-facing interaction patterns
- Feedback edge routing with nullable output keys
- Judge patterns (implicit, SchemaJudge)
- Fan-out/fan-in parallel execution
- Context management and spillover patterns
- Anti-patterns to avoid
**Skip this phase** if your agent design is straightforward.
## Phase 2: Test & Validate
**Skill**: `/hive-test`
**Input**: Working agent from Phase 1
### What This Phase Does
Guides the creation and execution of a comprehensive test suite:
- Constraint tests
- Success criteria tests
- Edge case tests
- Integration tests
### Process
1. **Analyze agent** - Read goal, constraints, success criteria
2. **Generate tests** - The calling agent writes pytest files in `exports/agent_name/tests/` using hive-test guidelines and templates
3. **User approval** - Review and approve each test
4. **Run evaluation** - Execute tests and collect results
5. **Debug failures** - Identify and fix issues
6. **Iterate** - Repeat until all tests pass
### Outputs
- ✅ Test files in `exports/agent_name/tests/`
- ✅ Test report with pass/fail metrics
- ✅ Coverage of all success criteria
- ✅ Coverage of all constraints
- ✅ Edge case handling verified
### Success Criteria
You're done when:
- All tests pass
- All success criteria validated
- All constraints verified
- Agent handles edge cases
- Test coverage is comprehensive
### Next Steps
**Agent ready for:**
- Production deployment
- Integration into larger systems
- Documentation and handoff
- Continuous monitoring
## Phase Transitions
### From Phase 1 to Phase 2
**Trigger signals:**
- "Agent complete: exports/..."
- Structure validation passes
- README indicates implementation complete
**Before proceeding:**
- Verify agent can be imported: `from exports.agent_name import default_agent`
- Check if implementation is needed (see STATUS.md or IMPLEMENTATION_GUIDE.md)
- Confirm agent executes without import errors
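A quick smoke check covering the items above (a sketch; substitute your agent's package name):
```bash
# Import check: should print the agent object, not a traceback
uv run python -c "from exports.agent_name import default_agent; print(default_agent)"
# Structure check
PYTHONPATH=exports uv run python -m agent_name validate
```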
### Skipping Phases
**When to skip Phase 1:**
- Agent structure already exists
- Only need to add tests
- Modifying existing agent
**When to skip Phase 2:**
- Prototyping or exploring
- Agent not production-bound
- Manual testing sufficient
## Common Patterns
### Pattern 1: Complete New Build (Simple)
```
User: "Build an agent that monitors files"
→ Use /hive-create
→ Agent structure created
→ Use /hive-test
→ Tests created and passing
→ Done: Production-ready agent
```
### Pattern 1b: Complete New Build (With Learning)
```
User: "Build an agent (first time)"
→ Use /hive-concepts (understand concepts)
→ Use /hive-create (build structure)
→ Use /hive-patterns (optimize design)
→ Use /hive-test (validate)
→ Done: Production-ready agent
```
### Pattern 1c: Build from Template
```
User: "Build an agent based on the deep research template"
→ Use /hive-create
→ Select "From a template" path
→ Pick template, name new agent
→ Review/modify goal, nodes, graph
→ Agent exported with customizations
→ Use /hive-test
→ Done: Customized agent
```
### Pattern 2: Test Existing Agent
```
User: "Test my agent at exports/my_agent"
→ Skip Phase 1
→ Use /hive-test directly
→ Tests created
→ Done: Validated agent
```
### Pattern 3: Iterative Development
```
User: "Build an agent"
→ Use /hive-create (Phase 1)
→ Implementation needed (see STATUS.md)
→ [User implements functions]
→ Use /hive-test (Phase 2)
→ Tests reveal bugs
→ [Fix bugs manually]
→ Re-run tests
→ Done: Working agent
```
### Pattern 4: Agent with Review Loops and HITL Checkpoints
```
User: "Build an agent with human review and feedback loops"
→ Use /hive-concepts (learn event loop, client-facing nodes)
→ Use /hive-create (build structure with feedback edges)
→ Use /hive-patterns (implement client-facing + feedback patterns)
→ Use /hive-test (validate review flows and edge routing)
→ Done: Agent with HITL checkpoints and review loops
```
## Skill Dependencies
```
hive (meta-skill)
├── hive-concepts (foundational)
│ ├── Architecture concepts (event loop, judges)
│ ├── Node types (event_loop, function)
│ ├── Edge routing and priority
│ ├── Tool discovery procedures
│ └── Workflow overview
├── hive-create (procedural)
│ ├── Creates package structure
│ ├── Defines goal
│ ├── Adds nodes (event_loop, function)
│ ├── Connects edges with priority routing
│ ├── Finalizes agent class
│ └── Requires: hive-concepts
├── hive-patterns (reference)
│ ├── Client-facing interaction patterns
│ ├── Feedback edges and review loops
│ ├── Judge patterns (implicit, SchemaJudge)
│ ├── Fan-out/fan-in parallel execution
│ └── Context management and anti-patterns
├── hive-credentials (utility)
│ ├── Detects missing credentials
│ ├── Offers auth method choices (Aden OAuth, direct API key)
│ ├── Stores securely in ~/.hive/credentials
│ └── Validates with health checks
├── hive-test (validation)
│ ├── Reads agent goal
│ ├── Generates tests
│ ├── Runs evaluation
│ └── Reports results
└── hive-debugger (troubleshooting)
├── Monitors runtime logs (L1/L2/L3)
├── Identifies retry loops, tool failures
├── Categorizes issues (10 categories)
└── Provides fix recommendations
```
## Troubleshooting
### "Agent structure won't validate"
- Check node IDs match between nodes/__init__.py and agent.py
- Verify all edges reference valid node IDs
- Ensure entry_node exists in nodes list
- Run: `PYTHONPATH=exports uv run python -m agent_name validate`
### "Agent has structure but won't run"
- Check for STATUS.md or IMPLEMENTATION_GUIDE.md in agent directory
- Implementation may be needed (Python functions or MCP tools)
- This is expected - hive-create creates structure, not implementation
- See implementation guide for completion options
### "Tests are failing"
- Review test output for specific failures
- Check agent goal and success criteria
- Verify constraints are met
- Use `/hive-test` to debug and iterate
- Fix agent code and re-run tests
### "Agent is failing at runtime"
- Use `/hive-debugger` to analyze runtime logs
- The debugger identifies retry loops, tool failures, and stalled execution
- Get actionable fix recommendations with code changes
- Monitor the agent in real-time during TUI sessions
### "Not sure which phase I'm in"
Run these checks:
```bash
# Check if agent structure exists
ls exports/my_agent/agent.py
# Check if it validates
PYTHONPATH=exports uv run python -m my_agent validate
# Check if tests exist
ls exports/my_agent/tests/
# If structure exists and validates → Phase 2 (testing)
# If structure doesn't exist → Phase 1 (building)
# If tests exist but failing → Debug phase
```
## Best Practices
### For Phase 1 (Building)
1. **Start with clear requirements** - Know what the agent should do
2. **Define success criteria early** - Measurable goals drive design
3. **Keep nodes focused** - One responsibility per node
4. **Use descriptive names** - Node IDs should explain purpose
5. **Validate incrementally** - Check structure after each major addition
### For Phase 2 (Testing)
1. **Test constraints first** - Hard requirements must pass
2. **Mock external dependencies** - Use mock mode for LLMs/APIs
3. **Cover edge cases** - Test failures, not just success paths
4. **Iterate quickly** - Fix one test at a time
5. **Document test patterns** - Future tests follow same structure
### General Workflow
1. **Use version control** - Git commit after each phase
2. **Document decisions** - Update README with changes
3. **Keep iterations small** - Build → Test → Fix → Repeat
4. **Preserve working states** - Tag successful iterations
5. **Learn from failures** - Failed tests reveal design issues
## Exit Criteria
You're done with the workflow when:
✅ Agent structure validates
✅ All tests pass
✅ Success criteria met
✅ Constraints verified
✅ Documentation complete
✅ Agent ready for deployment
## Additional Resources
- **hive-concepts**: See `.claude/skills/hive-concepts/SKILL.md`
- **hive-create**: See `.claude/skills/hive-create/SKILL.md`
- **hive-patterns**: See `.claude/skills/hive-patterns/SKILL.md`
- **hive-test**: See `.claude/skills/hive-test/SKILL.md`
- **Agent framework docs**: See `core/README.md`
- **Example agents**: See `exports/` directory
## Summary
This workflow provides a proven path from concept to production-ready agent:
1. **Learn** with `/hive-concepts` → Understand fundamentals (optional)
2. **Build** with `/hive-create` → Get validated structure
3. **Optimize** with `/hive-patterns` → Apply best practices (optional)
4. **Configure** with `/hive-credentials` → Set up API keys (if needed)
5. **Test** with `/hive-test` → Get verified functionality
6. **Debug** with `/hive-debugger` → Fix runtime issues (if needed)
The workflow is **flexible** - skip phases as needed, iterate freely, and adapt to your specific requirements. The goal is **production-ready agents** built with **consistent, repeatable processes**.
## Skill Selection Guide
**Choose hive-concepts when:**
- First time building agents
- Need to understand event loop architecture
- Validating tool availability
- Learning about node types, edges, and judges
**Choose hive-create when:**
- Actually building an agent
- Have clear requirements
- Ready to write code
- Want step-by-step guidance
- Want to start from an existing template and customize it
**Choose hive-patterns when:**
- Agent structure complete
- Need client-facing nodes or feedback edges
- Implementing review loops or fan-out/fan-in
- Want judge patterns or context management
- Want best practices
**Choose hive-test when:**
- Agent structure complete
- Ready to validate functionality
- Need comprehensive test coverage
- Testing feedback loops, output keys, or fan-out
**Choose hive-debugger when:**
- Agent is failing or stuck at runtime
- Seeing retry loops or escalations
- Tool calls are failing
- Need to understand why a node isn't completing
- Want real-time monitoring of agent execution
@@ -1,199 +0,0 @@
# Example: File Monitor Agent
This example shows the complete /hive workflow in action for building a file monitoring agent.
## Initial Request
```
User: "Build an agent that monitors ~/Downloads and copies new files to ~/Documents"
```
## Phase 1: Building (20 minutes)
### Step 1: Create Structure
Agent invokes `/hive-create` skill and:
1. Creates `exports/file_monitor_agent/` package
2. Writes skeleton files (__init__.py, __main__.py, agent.py, etc.)
**Output**: Package structure visible immediately
### Step 2: Define Goal
```python
goal = Goal(
id="file-monitor-copy",
name="Automated File Monitor & Copy",
success_criteria=[
# 100% detection rate
# 100% copy success
# 100% conflict resolution
# >99% uptime
],
constraints=[
# Preserve originals
# Handle errors gracefully
# Track state
# Respect permissions
]
)
```
**Output**: Goal written to agent.py
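For reference, here is one of those placeholder comments expanded into a concrete criterion, using the `SuccessCriterion` fields (`id`, `description`, `metric`, `target`) that appear elsewhere in this document; the exact values are illustrative:
```python
from framework.graph import SuccessCriterion

detection = SuccessCriterion(
    id="detection-rate",
    description="Every new file in ~/Downloads is detected within one scan cycle",
    metric="detection_rate",
    target="100%",
)
```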
### Step 3: Design Nodes
7 nodes approved and written incrementally:
1. `initialize-state` - Set up tracking
2. `list-downloads` - Scan directory
3. `identify-new-files` - Find new files
4. `check-for-new-files` - Router
5. `copy-files` - Copy with conflict resolution
6. `update-state` - Mark as processed
7. `wait-interval` - Sleep between cycles
**Output**: All nodes in nodes/__init__.py
### Step 4: Connect Edges
8 edges connecting the workflow loop:
```
initialize → list → identify → check
↓ ↓
copy wait
↓ ↑
update ↓
↓ ↓
wait → list (loop)
```
**Output**: Edges written to agent.py
### Step 5: Finalize
```bash
$ PYTHONPATH=exports uv run python -m file_monitor_agent validate
✓ Agent is valid
$ PYTHONPATH=exports uv run python -m file_monitor_agent info
Agent: File Monitor & Copy Agent
Nodes: 7
Edges: 8
```
**Phase 1 Complete**: Structure validated ✅
### Status After Phase 1
```
exports/file_monitor_agent/
├── __init__.py ✅ (exports)
├── __main__.py ✅ (CLI)
├── agent.py ✅ (goal, graph, agent class)
├── nodes/__init__.py ✅ (7 nodes)
├── config.py ✅ (configuration)
├── implementations.py ✅ (Python functions)
├── README.md ✅ (documentation)
├── IMPLEMENTATION_GUIDE.md ✅ (next steps)
└── STATUS.md ✅ (current state)
```
**Note**: Implementation gap exists - data flow needs connection (covered in STATUS.md)
## Phase 2: Testing (25 minutes)
### Step 1: Analyze Agent
Agent invokes `/hive-test` skill and:
1. Reads goal from `exports/file_monitor_agent/agent.py`
2. Identifies 4 success criteria to test
3. Identifies 4 constraints to verify
4. Plans test coverage
### Step 2: Generate Tests
Creates test files:
```
exports/file_monitor_agent/tests/
├── conftest.py (fixtures)
├── test_constraints.py (4 constraint tests)
├── test_success_criteria.py (4 success tests)
└── test_edge_cases.py (error handling)
```
Tests approved incrementally by user.
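For flavor, the preserve-originals constraint test might look roughly like this (a sketch: the `watch_dir`/`dest_dir` input keys and the fixtures are assumptions, not the generated code):
```python
import pytest

@pytest.mark.asyncio
async def test_preserves_originals(runner, auto_responder, mock_mode, tmp_path):
    """Constraint: copying must never delete or modify the source file."""
    src = tmp_path / "downloads" / "report.pdf"
    src.parent.mkdir(parents=True)
    src.write_bytes(b"original-bytes")
    await auto_responder.start()
    try:
        result = await runner.run(
            {"watch_dir": str(src.parent), "dest_dir": str(tmp_path / "docs")}
        )
    finally:
        await auto_responder.stop()
    assert result.success, f"Agent failed: {result.error}"
    assert src.exists() and src.read_bytes() == b"original-bytes"
```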
### Step 3: Run Tests
```bash
$ PYTHONPATH=exports uv run pytest exports/file_monitor_agent/tests/
test_constraints.py::test_preserves_originals PASSED
test_constraints.py::test_handles_errors PASSED
test_constraints.py::test_tracks_state PASSED
test_constraints.py::test_respects_permissions PASSED
test_success_criteria.py::test_detects_all_files PASSED
test_success_criteria.py::test_copies_all_files PASSED
test_success_criteria.py::test_resolves_conflicts PASSED
test_success_criteria.py::test_continuous_run PASSED
test_edge_cases.py::test_empty_directory PASSED
test_edge_cases.py::test_permission_denied PASSED
test_edge_cases.py::test_disk_full PASSED
test_edge_cases.py::test_large_files PASSED
========================== 12 passed in 3.42s ==========================
```
**Phase 2 Complete**: All tests pass ✅
## Final Output
**Production-Ready Agent:**
```bash
# Run the agent
./RUN_AGENT.sh
# Or manually
PYTHONPATH=exports uv run python -m file_monitor_agent run
```
**Capabilities:**
- Monitors ~/Downloads continuously
- Copies new files to ~/Documents
- Resolves conflicts with timestamps
- Handles errors gracefully
- Tracks processed files
- Runs as background service
**Total Time**: ~45 minutes from concept to production
## Key Learnings
1. **Incremental building** - Files written immediately, visible throughout
2. **Validation early** - Structure validated before moving to implementation
3. **Test-driven** - Tests reveal real behavior
4. **Documentation included** - README, STATUS, and guides auto-generated
5. **Repeatable process** - Same workflow for any agent type
## Variations
**For simpler agents:**
- Fewer nodes (3-5 instead of 7)
- Simpler workflow (linear instead of looping)
- Faster build time (10-15 minutes)
**For complex agents:**
- More nodes (10-15+)
- Multiple subgraphs
- Pause/resume points for human-in-the-loop
- Longer build time (45-60 minutes)
The workflow scales to your needs!
-7
@@ -1,7 +0,0 @@
# Project-level Codex config for Hive.
# Keep this file minimal: MCP connectivity + skill discovery.
[mcp_servers.agent-builder]
command = "uv"
args = ["run", "--directory", "core", "-m", "framework.mcp.agent_builder_server"]
cwd = "."
-20
@@ -1,20 +0,0 @@
{
"mcpServers": {
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "core",
"env": {
"PYTHONPATH": "../tools/src"
}
},
"tools": {
"command": "python",
"args": ["mcp_server.py", "--stdio"],
"cwd": "tools",
"env": {
"PYTHONPATH": "src"
}
}
}
}
-1
@@ -1 +0,0 @@
../../.claude/skills/hive
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-concepts
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-create
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-credentials
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-patterns
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-test
-31
@@ -1,31 +0,0 @@
name: Link Discord Account
description: Connect your GitHub and Discord for the bounty program
title: "link: @{{ github.actor }}"
labels: ["link-discord"]
body:
- type: markdown
attributes:
value: |
Link your Discord account to receive XP and role rewards when your bounty PRs are merged.
**How to find your Discord ID:**
1. Open Discord Settings > Advanced > Enable **Developer Mode**
2. Right-click your username > **Copy User ID**
- type: input
id: discord_id
attributes:
label: Discord User ID
description: "Your numeric Discord ID (not your username). Example: 123456789012345678"
placeholder: "123456789012345678"
validations:
required: true
- type: input
id: display_name
attributes:
label: Display Name (optional)
description: How you'd like to be credited
placeholder: "Jane Doe"
validations:
required: false
+13 -5
@@ -5,7 +5,7 @@ on:
branches: [main]
pull_request:
branches: [main]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
@@ -24,6 +24,8 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
- name: Install dependencies
run: uv sync --project core --group dev
@@ -54,10 +56,12 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
- name: Install dependencies and run tests
working-directory: core
run: |
cd core
uv sync
uv run pytest tests/ -v
@@ -77,10 +81,12 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
- name: Install dependencies and run tests
working-directory: tools
run: |
cd tools
uv sync --extra dev
uv run pytest tests/ -v
@@ -98,10 +104,12 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
- name: Install dependencies
working-directory: core
run: |
cd core
uv sync
- name: Validate exported agents
@@ -0,0 +1,54 @@
# Closes PRs that still have the `pr-requirements-warning` label
# after contributors were warned in pr-requirements.yml.
name: PR Requirements Enforcement
on:
schedule:
- cron: "0 0 * * *" # runs once a day at midnight UTC
jobs:
enforce:
name: Close PRs still failing contribution requirements
runs-on: ubuntu-latest
permissions:
pull-requests: write
issues: write
steps:
- name: Close PRs still failing requirements
uses: actions/github-script@v7
with:
script: |
const { owner, repo } = context.repo;
const prs = await github.paginate(github.rest.pulls.list, {
owner,
repo,
state: "open",
per_page: 100
});
for (const pr of prs) {
// Skip draft PRs — author may still be actively working toward compliance
if (pr.draft) continue;
const labels = pr.labels.map(l => l.name);
if (!labels.includes("pr-requirements-warning")) continue;
const gracePeriod = 24 * 60 * 60 * 1000; // 24 hours in milliseconds
const createdAt = new Date(pr.created_at);
const now = new Date();
if (now - createdAt < gracePeriod) {
console.log(`Skipping PR #${pr.number} — still within grace period`);
continue;
}
const prNumber = pr.number;
const prAuthor = pr.user.login;
await github.rest.issues.createComment({
owner,
repo,
issue_number: prNumber,
body: `Closing PR because the contribution requirements were not resolved within the 24-hour grace period.
If this was closed in error, feel free to reopen the PR after fixing the requirements.`
});
await github.rest.pulls.update({
owner,
repo,
pull_number: prNumber,
state: "closed"
});
console.log(`Closed PR #${prNumber} by ${prAuthor} (PR requirements were not met)`);
}
+31 -17
@@ -43,9 +43,10 @@ jobs:
console.log(` Found issue references: ${issueNumbers.length > 0 ? issueNumbers.join(', ') : 'none'}`);
if (issueNumbers.length === 0) {
const message = `## PR Closed - Requirements Not Met
const message = `## PR Requirements Warning
This PR has been automatically closed because it doesn't meet the requirements.
This PR does not meet the contribution requirements.
If the issue is not fixed within ~24 hours, it may be automatically closed.
**Missing:** No linked issue found.
@@ -67,14 +68,15 @@ jobs:
**Why is this required?** See #472 for details.`;
const comments = await github.rest.issues.listComments({
const comments = await github.paginate(github.rest.issues.listComments, {
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: prNumber,
per_page: 100,
});
const botComment = comments.data.find(
(c) => c.user.type === 'Bot' && c.body.includes('PR Closed - Requirements Not Met')
const botComment = comments.find(
(c) => c.user.type === 'Bot' && c.body.includes('PR Requirements Warning')
);
if (!botComment) {
@@ -86,11 +88,11 @@ jobs:
});
}
await github.rest.pulls.update({
await github.rest.issues.addLabels({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: prNumber,
state: 'closed',
issue_number: prNumber,
labels: ['pr-requirements-warning'],
});
core.setFailed('PR must reference an issue');
@@ -132,9 +134,10 @@ jobs:
`#${i.number} (assignees: ${i.assignees.length > 0 ? i.assignees.join(', ') : 'none'})`
).join(', ');
const message = `## PR Closed - Requirements Not Met
const message = `## PR Requirements Warning
This PR has been automatically closed because it doesn't meet the requirements.
This PR does not meet the contribution requirements.
If the issue is not fixed within ~24 hours, it may be automatically closed.
**PR Author:** @${prAuthor}
**Found issues:** ${issueList}
@@ -157,14 +160,15 @@ jobs:
**Why is this required?** See #472 for details.`;
const comments = await github.rest.issues.listComments({
const comments = await github.paginate(github.rest.issues.listComments, {
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: prNumber,
per_page: 100,
});
const botComment = comments.data.find(
(c) => c.user.type === 'Bot' && c.body.includes('PR Closed - Requirements Not Met')
const botComment = comments.find(
(c) => c.user.type === 'Bot' && c.body.includes('PR Requirements Warning')
);
if (!botComment) {
@@ -176,14 +180,24 @@ jobs:
});
}
await github.rest.pulls.update({
await github.rest.issues.addLabels({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: prNumber,
state: 'closed',
issue_number: prNumber,
labels: ['pr-requirements-warning'],
});
core.setFailed('PR author must be assigned to the linked issue');
} else {
console.log(`PR requirements met! Issue #${issueWithAuthorAssigned} has ${prAuthor} as assignee.`);
}
try {
await github.rest.issues.removeLabel({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: prNumber,
name: "pr-requirements-warning"
});
} catch (error) {
// ignore if the label doesn't exist
}
}
-2
@@ -67,8 +67,6 @@ temp/
exports/*
.agent-builder-sessions/*
.claude/settings.local.json
.claude/skills/ship-it/
+1 -7
@@ -1,9 +1,3 @@
{
"mcpServers": {
"agent-builder": {
"command": "uv",
"args": ["run", "-m", "framework.mcp.agent_builder_server"],
"cwd": "core"
}
}
"mcpServers": {}
}
-30
@@ -1,30 +0,0 @@
{
"mcpServers": {
"agent-builder": {
"command": "uv",
"args": [
"run",
"python",
"-m",
"framework.mcp.agent_builder_server"
],
"cwd": "core",
"env": {
"PYTHONPATH": "../tools/src"
}
},
"tools": {
"command": "uv",
"args": [
"run",
"python",
"mcp_server.py",
"--stdio"
],
"cwd": "tools",
"env": {
"PYTHONPATH": "src"
}
}
}
}
-1
@@ -1 +0,0 @@
../../.claude/skills/hive
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-concepts
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-create
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-credentials
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-debugger
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-patterns
-1
@@ -1 +0,0 @@
../../.claude/skills/hive-test
-1
@@ -1 +0,0 @@
../../.claude/skills/triage-issue
-7
@@ -1,7 +0,0 @@
{
"recommendations": [
"charliermarsh.ruff",
"editorconfig.editorconfig",
"ms-python.python"
]
}
-1
@@ -1,5 +1,4 @@
exports/
docs/
.agent-builder-sessions/
.pytest_cache/
**/__pycache__/
-5
@@ -1,10 +1,5 @@
{
"mcpServers": {
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "core"
},
"tools": {
"command": "python",
"args": ["-m", "aden_tools.mcp_server", "--stdio"],
+10 -11
@@ -1,17 +1,16 @@
# MCP Server Guide - Agent Builder
# MCP Server Guide - Agent Building Tools
This guide covers the MCP (Model Context Protocol) server for building goal-driven agents.
> **Note:** The standalone `agent-builder` MCP server (`framework.mcp.agent_builder_server`) has been replaced. Agent building is now done via the `coder-tools` server's `initialize_agent_package` tool, with underlying logic in `framework.builder.package_generator`.
This guide covers the MCP tools available for building goal-driven agents.
## Setup
### Quick Setup
```bash
# Using the setup script (recommended)
python setup_mcp.py
# Or using bash
./setup_mcp.sh
# Run the quickstart script (recommended)
./quickstart.sh
```
### Manual Configuration
@@ -21,10 +20,10 @@ Add to your MCP client configuration (e.g., Claude Desktop):
```json
{
"mcpServers": {
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "/path/to/goal-agent"
"coder-tools": {
"command": "uv",
"args": ["run", "coder_tools_server.py", "--stdio"],
"cwd": "/path/to/hive/tools"
}
}
}
+4 -59
@@ -17,66 +17,11 @@ Framework provides a runtime framework that captures **decisions**, not just act
uv pip install -e .
```
## MCP Server Setup
## Agent Building
The framework includes an MCP (Model Context Protocol) server for building agents. To set up the MCP server:
Agent scaffolding is handled by the `coder-tools` MCP server (in `tools/coder_tools_server.py`), which provides the `initialize_agent_package` tool and related utilities. The underlying package generation logic lives in `framework.builder.package_generator`.
### Automated Setup
**Using bash (Linux/macOS):**
```bash
./setup_mcp.sh
```
**Using Python (cross-platform):**
```bash
python setup_mcp.py
```
The setup script will:
1. Install the framework package
2. Install MCP dependencies (mcp, fastmcp)
3. Create/verify `.mcp.json` configuration
4. Test the MCP server module
### Manual Setup
If you prefer manual setup:
```bash
# Install framework
uv pip install -e .
# Install MCP dependencies
uv pip install mcp fastmcp
# Test the server
uv run python -m framework.mcp.agent_builder_server
```
### Using with MCP Clients
To use the agent builder with Claude Desktop or other MCP clients, add this to your MCP client configuration:
```json
{
"mcpServers": {
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "/path/to/hive/core"
}
}
}
```
The MCP server provides tools for:
- Creating agent building sessions
- Defining goals with success criteria
- Adding nodes (event_loop only)
- Connecting nodes with edges
- Validating and exporting agent graphs
- Testing nodes and full agent graphs
See the [Getting Started Guide](../docs/getting-started.md) for building agents.
## Quick Start
@@ -145,7 +90,7 @@ uv run python -m framework test-debug <agent_path> <test_name>
uv run python -m framework test-list <agent_path>
```
For detailed testing workflows, see the [hive-test skill](../.claude/skills/hive-test/SKILL.md).
For detailed testing workflows, see [developer-guide.md](../docs/developer-guide.md).
### Analyzing Agent Behavior with Builder
-75
@@ -95,81 +95,6 @@ async def example_3_config_file():
(test_agent_path / "mcp_servers.json").unlink()
async def example_4_custom_agent_with_mcp_tools():
"""Example 4: Build custom agent that uses MCP tools"""
print("\n=== Example 4: Custom Agent with MCP Tools ===\n")
from framework.builder.workflow import GraphBuilder
# Create a workflow builder
builder = GraphBuilder()
# Define goal
builder.set_goal(
goal_id="web-researcher",
name="Web Research Agent",
description="Search the web and summarize findings",
)
# Add success criteria
builder.add_success_criterion(
"search-results", "Successfully retrieve at least 3 web search results"
)
builder.add_success_criterion("summary", "Provide a clear, concise summary of the findings")
# Add nodes that will use MCP tools
builder.add_node(
node_id="web-searcher",
name="Web Search",
description="Search the web for information",
node_type="event_loop",
system_prompt="Search for {query} and return the top results. Use the web_search tool.",
tools=["web_search"], # This tool comes from tools MCP server
input_keys=["query"],
output_keys=["search_results"],
)
builder.add_node(
node_id="summarizer",
name="Summarize Results",
description="Summarize the search results",
node_type="event_loop",
system_prompt="Summarize the following search results in 2-3 sentences: {search_results}",
input_keys=["search_results"],
output_keys=["summary"],
)
# Connect nodes
builder.add_edge("web-searcher", "summarizer")
# Set entry point
builder.set_entry("web-searcher")
builder.set_terminal("summarizer")
# Export the agent
export_path = Path("exports/web-research-agent")
export_path.mkdir(parents=True, exist_ok=True)
builder.export(export_path)
# Load and register MCP server
runner = AgentRunner.load(export_path)
runner.register_mcp_server(
name="tools",
transport="stdio",
command="python",
args=["-m", "aden_tools.mcp_server", "--stdio"],
cwd="../tools",
)
# Run the agent
result = await runner.run({"query": "latest AI breakthroughs 2026"})
print(f"\nAgent completed with result:\n{result}")
# Cleanup
runner.cleanup()
async def main():
"""Run all examples"""
print("=" * 60)
@@ -406,7 +406,8 @@ nodes = [
client_facing=True,
max_node_visits=0,
input_keys=[],
output_keys=[],
output_keys=["test_result"],
nullable_output_keys=["test_result"],
tools=["get_account_info"],
system_prompt="""\
You are a credential tester. Your job is to help the user verify that their \
@@ -444,7 +445,7 @@ edges = []
entry_node = "tester"
entry_points = {"start": "tester"}
pause_nodes = []
terminal_nodes = [] # Forever-alive: loops until user exits
terminal_nodes = ["tester"] # Tester node can terminate
conversation_mode = "continuous"
identity_prompt = (
@@ -531,7 +532,7 @@ class CredentialTesterAgent:
version="1.0.0",
entry_node="tester",
entry_points={"start": "tester"},
terminal_nodes=[],
terminal_nodes=["tester"], # Tester node can terminate
pause_nodes=[],
nodes=[tester_node],
edges=[],
@@ -51,7 +51,8 @@ The key is pre-injected into the session environment and tools read it automatic
client_facing=True,
max_node_visits=0,
input_keys=[],
output_keys=[],
output_keys=["test_result"],
nullable_output_keys=["test_result"],
tools=tools,
system_prompt=f"""\
You are a credential tester for the {account_label}: {provider}/{alias}{detail}
@@ -7,9 +7,7 @@ from natural language specifications.
"""
from .agent import (
HiveCoderAgent,
conversation_mode,
default_agent,
edges,
entry_node,
entry_points,
@@ -25,8 +23,6 @@ from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"HiveCoderAgent",
"default_agent",
"goal",
"nodes",
"edges",
+14 -177
@@ -1,13 +1,13 @@
"""CLI entry point for Hive Coder agent."""
import asyncio
import json
import logging
import sys
import click
from .agent import HiveCoderAgent, default_agent
from .agent import entry_node, goal, nodes
from .config import metadata
def setup_logging(verbose=False, debug=False):
@@ -29,112 +29,22 @@ def cli():
pass
@cli.command()
@click.option("--request", "-r", type=str, required=True, help="What agent to build")
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--quiet", "-q", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(request, mock, quiet, verbose, debug):
"""Execute agent building from a request."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
context = {"user_request": request}
result = asyncio.run(default_agent.run(context, mock_mode=mock))
output_data = {
"success": result.success,
"steps_executed": result.steps_executed,
"output": result.output,
}
if result.error:
output_data["error"] = result.error
click.echo(json.dumps(output_data, indent=2, default=str))
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive agent building."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo("TUI requires the 'textual' package. Install with: pip install textual")
sys.exit(1)
from pathlib import Path
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui():
agent = HiveCoderAgent()
agent._tool_registry = ToolRegistry()
storage_path = Path.home() / ".hive" / "agents" / "hive_coder"
storage_path.mkdir(parents=True, exist_ok=True)
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock:
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Build Agent",
entry_node="coder",
trigger_type="manual",
isolation_level="isolated",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = default_agent.info()
info_data = {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": goal.name,
"description": goal.description,
},
"nodes": [n.id for n in nodes],
"entry_node": entry_node,
"client_facing_nodes": [n.id for n in nodes if n.client_facing],
}
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
@@ -144,79 +54,6 @@ def info(output_json):
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes']) or '(forever-alive)'}")
@cli.command()
def validate():
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
"""Interactive agent building session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== Hive Coder ===")
click.echo("Describe the agent you want to build (or 'quit' to exit):\n")
agent = HiveCoderAgent()
await agent.start()
try:
while True:
try:
request = await asyncio.get_event_loop().run_in_executor(None, input, "Build> ")
if request.lower() in ["quit", "exit", "q"]:
click.echo("Goodbye!")
break
if not request.strip():
continue
click.echo("\nBuilding agent...\n")
result = await agent.trigger_and_wait("default", {"user_request": request})
if result is None:
click.echo("\n[Execution timed out]\n")
continue
if result.success:
output = result.output or {}
agent_name = output.get("agent_name", "unknown")
validation = output.get("validation_result", "unknown")
click.echo(f"\nAgent '{agent_name}' built. Validation: {validation}\n")
else:
click.echo(f"\nBuild failed: {result.error}\n")
except KeyboardInterrupt:
click.echo("\nGoodbye!")
break
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
if __name__ == "__main__":
+4 -208
@@ -1,27 +1,13 @@
"""Agent graph construction for Hive Coder."""
from pathlib import Path
from framework.graph import Constraint, Goal, SuccessCriterion
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import coder_node, queen_node
# ticket_receiver is no longer needed — the queen runs as an independent
# GraphExecutor and receives escalation tickets via inject_event().
# Keeping the import commented for reference:
# from .ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
# Goal definition
goal = Goal(
id="agent-builder",
id="hive-coder",
name="Hive Agent Builder",
description=(
"Build complete, validated Hive agent packages from natural language "
@@ -57,7 +43,7 @@ goal = Goal(
id="framework-compliance",
description=(
"Generated code follows framework patterns: STEP 1/STEP 2 "
"for client-facing, correct imports, entry_points format"
"for client-facing and correct imports"
),
metric="pattern_compliance",
target="100%",
@@ -99,14 +85,14 @@ goal = Goal(
# GraphExecutor with queen_node — not as part of this graph.
nodes = [coder_node]
# No edges needed — single forever-alive event_loop node
# No edges needed — single event_loop node
edges = []
# Graph configuration
entry_node = "coder"
entry_points = {"start": "coder"}
pause_nodes = []
terminal_nodes = [] # Forever-alive: loops until user exits
terminal_nodes = [] # Coder node has output_keys and can terminate
# No async entry points needed — the queen is now an independent executor,
# not a secondary graph receiving events via add_graph().
@@ -165,193 +151,3 @@ queen_graph = GraphSpec(
"max_history_tokens": 32000,
},
)
class HiveCoderAgent:
"""
Hive Coder builds Hive agent packages from natural language.
Single-node architecture: the coder runs in a continuous while(true) loop.
The queen runs as an independent GraphExecutor (loaded by the TUI via
_load_judge_and_queen), not as part of this graph.
"""
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self.async_entry_points = async_entry_points
self._graph: GraphSpec | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="hive-coder-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config=loop_config,
conversation_mode=conversation_mode,
identity_prompt=identity_prompt,
async_entry_points=self.async_entry_points,
)
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime."""
self._storage_path = Path.home() / ".hive" / "agents" / "hive_coder"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
),
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
checkpoint_config=checkpoint_config,
graph_id="hive_coder",
)
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str = "default",
input_data: dict | None = None,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
async def run(self, context: dict, mock_mode=False, session_state=None) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait("default", context, session_state=session_state)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self):
"""Validate agent structure."""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(f"Entry point '{ep_id}' references unknown node '{node_id}'")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Create default instance
default_agent = HiveCoderAgent()
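A minimal usage sketch for the default instance (the request text and printed fields are illustrative):
```python
import asyncio

async def main() -> None:
    # run() wraps start() -> trigger_and_wait("default", ...) -> stop() in one call.
    result = await default_agent.run({"user_request": "Build a research agent"})
    print(result.success, result.output)

asyncio.run(main())
```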
@@ -0,0 +1,80 @@
"""Queen thinking hook — HR persona classifier.
Fires once when the queen enters building mode at session start.
Makes a single non-streaming LLM call (acting as an HR Director) to select
the best-fit expert persona for the user's request, then returns a persona
prefix string that replaces the queen's default "Solution Architect" identity.
This is designed to activate the model's latent domain expertise — a CFO
persona on a financial question, a Lawyer on a legal question, etc.
"""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.llm.provider import LLMProvider
logger = logging.getLogger(__name__)
_HR_SYSTEM_PROMPT = """\
You are an expert HR Director and talent consultant at a world-class firm.
A new request has arrived and you must identify which professional's expertise
would produce the highest-quality response.
Reply with ONLY a valid JSON object — no markdown, no prose, no explanation:
{"role": "<job title>", "persona": "<2-3 sentence first-person identity statement>"}
Rules:
- Choose from any real professional role: CFO, CEO, CTO, Lawyer, Data Scientist,
Product Manager, Security Engineer, DevOps Engineer, Software Architect,
HR Director, Marketing Director, Business Analyst, UX Designer,
Financial Analyst, Operations Director, Legal Counsel, etc.
- The persona statement must be written in first person ("I am..." or "I have...").
- Select the role whose domain knowledge most directly applies to solving the request.
- If the request is clearly about coding or building software systems, pick Software Architect.
- "Queen" is your internal alias do not include it in the persona.
"""
async def select_expert_persona(user_message: str, llm: LLMProvider) -> str:
"""Run the HR classifier and return a persona prefix string.
Makes a single non-streaming acomplete() call with the session LLM.
Returns an empty string on any failure so the queen falls back
gracefully to its default "Solution Architect" identity.
Args:
user_message: The user's opening message for the session.
llm: The session LLM provider.
Returns:
A persona prefix like "You are a CFO. I am a CFO with 20 years..."
or "" on failure.
"""
if not user_message.strip():
return ""
try:
response = await llm.acomplete(
messages=[{"role": "user", "content": user_message}],
system=_HR_SYSTEM_PROMPT,
max_tokens=1024,
json_mode=True,
)
raw = response.content.strip()
parsed = json.loads(raw)
role = parsed.get("role", "").strip()
persona = parsed.get("persona", "").strip()
if not role or not persona:
logger.warning("Thinking hook: empty role/persona in response: %r", raw)
return ""
result = f"You are a {role}. {persona}"
logger.info("Thinking hook: selected persona — %s", role)
return result
except Exception:
logger.warning("Thinking hook: persona classification failed", exc_info=True)
return ""
@@ -1,113 +1,33 @@
# Common Mistakes When Building Hive Agents
## Critical Errors
1. **Using tools that don't exist** — Always verify tools are available in the hive-tools MCP server before assigning them to nodes. Never guess tool names.
2. **Wrong entry_points format** — MUST be `{"start": "first-node-id"}`. NOT a set, NOT `{node_id: [keys]}`.
3. **Wrong mcp_servers.json format** — Flat dict (no `"mcpServers"` wrapper). `cwd` must be `"../../tools"`. `command` must be `"uv"` with args `["run", "python", ...]`.
4. **Missing STEP 1/STEP 2 in client-facing prompts** — Without explicit phases, the LLM calls set_output before the user responds. Always use the pattern (see the sketch after this list).
5. **Forgetting nullable_output_keys** — When a node receives inputs from multiple edges and some inputs only arrive on certain edges (e.g., feedback), mark those as nullable. Without this, the executor blocks waiting for a value that will never arrive.
6. **Creating dead-end nodes in forever-alive graphs** — Every node must have at least one outgoing edge. A node with no outgoing edges ends the execution, breaking the loop.
7. **Setting max_node_visits to a non-zero value in forever-alive agents** — The framework default is `max_node_visits=0` (unbounded). Setting it to any positive value (e.g., 1) means the node stops executing after that many visits, silently breaking the forever-alive loop. Only set `max_node_visits > 0` in one-shot agents with feedback loops that need bounded retries.
7. **Missing module-level exports in `__init__.py`** — The runner loads agents via `importlib.import_module(package_name)`, which imports `__init__.py`. It then reads `goal`, `nodes`, `edges`, `entry_node`, `entry_points`, `pause_nodes`, `terminal_nodes`, `conversation_mode`, `identity_prompt`, `loop_config` via `getattr()`. If ANY of these are missing from `__init__.py`, they default to `None` or `{}` — causing "must define goal, nodes, edges" errors or "node X is unreachable" validation failures. **ALL module-level variables from agent.py must be re-exported in `__init__.py`.**
1. **Using tools that don't exist** — Always verify tools via `list_agent_tools()` before designing. Common hallucinations: `csv_read`, `csv_write`, `file_upload`, `database_query`, `bulk_fetch_emails`.
2. **Wrong mcp_servers.json format** — Flat dict (no `"mcpServers"` wrapper). `cwd` must be `"../../tools"`. `command` must be `"uv"` with args `["run", "python", ...]`.
3. **Missing module-level exports in `__init__.py`** — The runner reads `goal`, `nodes`, `edges`, `entry_node`, `entry_points`, `terminal_nodes`, `conversation_mode`, `identity_prompt`, `loop_config` via `getattr()`. ALL module-level variables from agent.py must be re-exported in `__init__.py`.
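A minimal sketch of the STEP 1/STEP 2 prompt pattern referenced in item 4 above (wording illustrative):
```python
system_prompt = """\
STEP 1 (text only, NO tool calls): present the draft and ask the user
to approve or request changes.

STEP 2 (only after the user responds): call set_output:
- set_output("next_action", "done") if approved
- set_output("next_action", "revise") if changes are requested
- set_output("feedback", "what to change") only when revising
"""
```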
## Value Errors
8. **Invalid `conversation_mode` value** — Only two valid values: `"continuous"` (recommended for interactive agents) or omit entirely (for isolated per-node conversations). Values like `"client_facing"`, `"interactive"`, `"adaptive"` do NOT exist and will cause runtime errors.
9. **Invalid `loop_config` keys** — Only three valid keys: `max_iterations` (int), `max_tool_calls_per_turn` (int), `max_history_tokens` (int). Keys like `"strategy"`, `"mode"`, `"timeout"` are NOT valid and are silently ignored or cause errors (see the sketch after this list).
10. **Fabricating tools that don't exist** — Never guess tool names. Always verify via `list_agent_tools()` before designing and `validate_agent_tools()` after building. Common hallucinations: `csv_read`, `csv_write`, `csv_append`, `file_upload`, `database_query`, `bulk_fetch_emails`. If a required tool doesn't exist, redesign the agent to use tools that DO exist (e.g., `save_data`/`load_data` for data persistence).
4. **Fabricating tools** — Always verify via `list_agent_tools()` before designing and `validate_agent_package()` after building.
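A minimal sketch of the only valid values for both settings (the numbers are illustrative):
```python
conversation_mode = "continuous"  # the only other option is to omit this entirely

loop_config = {
    "max_iterations": 50,            # int
    "max_tool_calls_per_turn": 10,   # int
    "max_history_tokens": 32000,     # int
}
```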
## Design Errors
11. **Too many thin nodes** — Hard limit: **2-4 nodes** for most agents. Each node boundary serializes outputs to shared memory and loses all in-context information (tool results, intermediate reasoning, conversation history). A node with 0 tools that just does LLM reasoning is NOT a real node — merge it into its predecessor or successor.
**Merge when:**
- Node has NO tools — pure LLM reasoning belongs in the node that produces or consumes its data
- Node sets only 1 trivial output (e.g., `set_output("done", "true")`) — collapse into predecessor
- Multiple consecutive autonomous nodes with same/similar tools — combine into one
- A "report" or "summary" node that just presents analysis — merge into the client-facing node
- A "schedule" or "confirm" node that doesn't actually schedule anything — remove entirely
**Keep separate when:**
- Client-facing vs autonomous — different interaction models require separate nodes
- Fundamentally different tool sets (e.g., web search vs file I/O)
- Fan-out parallelism — parallel branches MUST be separate nodes
**Bad example** (7 nodes — WAY too many):
```
profile_setup → daily_intake → update_tracker → analyze_progress → generate_plan → schedule_reminders → report
```
`analyze_progress` has no tools. `schedule_reminders` just sets one boolean. `report` just presents analysis. `update_tracker` and `generate_plan` are sequential autonomous work.
**Good example** (2 nodes):
```
process (autonomous: track + analyze + plan) → review (client-facing) → process (loop back)
```
The queen handles intake (gathering requirements from the user) and passes the task via `run_agent_with_input(task)`. One autonomous node handles ALL backend work (CSV update, analysis, plan generation) with tools and context preserved. One client-facing node handles review/approval when needed.
12. **Adding framework gating for LLM behavior** — Don't add output rollback, premature rejection, or interaction protocol injection. Fix with better prompts or custom judges.
13. **Not using continuous conversation mode** — Interactive agents should use `conversation_mode="continuous"`. Without it, each node starts with blank context.
14. **Adding terminal nodes by default** — ALL agents should use `terminal_nodes=[]` (forever-alive) unless the user explicitly requests a one-shot/batch agent. Forever-alive is the standard pattern. Every node must have at least one outgoing edge. Dead-end nodes break the loop.
15. **Calling set_output in same turn as tool calls** — Instruct the LLM to call set_output in a SEPARATE turn from real tool calls.
5. **Adding framework gating for LLM behavior** — Don't add output rollback or premature rejection. Fix with better prompts or custom judges.
6. **Calling set_output in same turn as tool calls** — Call set_output in a SEPARATE turn.
## File Template Errors
16. **Wrong import paths** — Use `from framework.graph import ...`, NOT `from core.framework.graph import ...`. The PYTHONPATH includes `core/`.
17. **Missing storage path** — Agent class must set `self._storage_path = Path.home() / ".hive" / "agents" / "agent_name"`.
18. **Missing mcp_servers.json** — Without this, the agent has no tools at runtime.
19. **Bare `python` command in mcp_servers.json** — Use `"command": "uv"` with args `["run", "python", ...]`.
7. **Wrong import paths** — Use `from framework.graph import ...`, NOT `from core.framework.graph import ...`.
8. **Missing storage path** — Agent class must set `self._storage_path = Path.home() / ".hive" / "agents" / "agent_name"`.
9. **Missing mcp_servers.json** — Without this, the agent has no tools at runtime.
10. **Bare `python` command** — Use `"command": "uv"` with args `["run", "python", ...]`.
## Testing Errors
11. **Using `runner.run()` on forever-alive agents** — `runner.run()` hangs forever because forever-alive agents have no terminal node. Write structural tests instead: validate graph structure, verify node specs, test `AgentRunner.load()` succeeds (no API key needed).
12. **Stale tests after restructuring** — When changing nodes/edges, update tests to match. Tests referencing old node names will fail.
13. **Running integration tests without API keys** — Use `pytest.skip()` when credentials are missing.
14. **Forgetting sys.path setup in conftest.py** — Tests need `exports/` and `core/` on sys.path.
20. **Using `runner.run()` on forever-alive agents** — `runner.run()` calls `trigger_and_wait()` which blocks until the graph reaches a terminal node. Forever-alive agents have `terminal_nodes=[]`, so **`runner.run()` hangs forever**. This is the #1 cause of stuck test suites.
## GCU Errors
15. **Manually wiring browser tools on event_loop nodes** — Use `node_type="gcu"` which auto-includes browser tools. Do NOT manually list browser tool names.
16. **Using GCU nodes as regular graph nodes** — GCU nodes are subagents only. They must ONLY appear in `sub_agents=["gcu-node-id"]` and be invoked via `delegate_to_sub_agent()`. Never connect via edges or use as entry/terminal nodes.
**For forever-alive agents, write structural tests instead:**
- Validate graph structure (nodes, edges, entry points)
- Verify node specs (tools, prompts, client-facing flag)
- Check goal/constraints/success criteria definitions
- Test that `AgentRunner.load()` succeeds (structural, no API key needed)
**What NOT to do:**
```python
# WRONG — hangs forever on forever-alive agents
result = await runner.run({"topic": "quantum computing"})
```
**Correct pattern for structure tests:**
```python
def test_research_has_web_tools(self):
assert "web_search" in research_node.tools
def test_research_routes_back_to_interact(self):
edges_to_interact = [e for e in edges if e.source == "research" and e.target == "interact"]
assert edges_to_interact
```
21. **Stale tests after agent restructuring** — When you change an agent's node count or names (e.g., 4 nodes → 2 nodes), the tests MUST be updated too. Tests referencing old node names (e.g., `"review"`, `"report"`) will fail or hang. Always check that test assertions match the current `nodes/__init__.py`.
22. **Running full integration tests without API keys** — Structural tests (validate, import) work without keys. Full integration tests need `ANTHROPIC_API_KEY`. Use `pytest.skip()` in the runner fixture when `_setup()` fails due to missing credentials.
23. **Forgetting sys.path setup in conftest.py** — Tests need `exports/` and `core/` on sys.path.
24. **Not using auto_responder for client-facing nodes** — Tests with client-facing nodes hang without an auto-responder that injects input. But note: even WITH auto_responder, forever-alive agents still hang because the graph never terminates. Auto-responder only helps for agents with terminal nodes.
25. **Manually wiring browser tools on event_loop nodes** — If the agent needs browser automation, use `node_type="gcu"` which auto-includes all browser tools and prepends best-practices guidance. Do NOT manually list browser tool names on event_loop nodes — they may not exist in the MCP server or may be incomplete. See the GCU Guide appendix.
26. **Using GCU nodes as regular graph nodes** — GCU nodes (`node_type="gcu"`) are exclusively subagents. They must ONLY appear in a parent node's `sub_agents=["gcu-node-id"]` list and be invoked via `delegate_to_sub_agent()`. They must NEVER be connected via edges, used as entry nodes, or used as terminal nodes. If a GCU node appears as an edge source or target, the graph will fail pre-load validation.
27. **Adding a client-facing intake node to worker agents** — The queen owns intake. She defines the entry node's `input_keys` at build time and fills them via `run_agent_with_input(task)` at run time. Worker agents should start with an autonomous processing node, NOT a client-facing intake node that asks the user for requirements. Client-facing nodes in workers are for mid-execution review/approval only.
## Worker Agent Errors
17. **Adding client-facing intake node to workers** — The queen owns intake. Workers should start with an autonomous processing node. Client-facing nodes in workers are for mid-execution review/approval only.
18. **Putting `escalate` or `set_output` in NodeSpec `tools=[]`** — These are synthetic framework tools, auto-injected at runtime. Only list MCP tools from `list_agent_tools()`.
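A minimal sketch of the tools contrast in item 18 (tool names taken from examples elsewhere in this guide):
```python
# WRONG: synthetic framework tools listed explicitly
tools = ["escalate", "set_output", "web_search"]

# RIGHT: only MCP tools discovered via list_agent_tools()
tools = ["web_search", "save_data", "load_data"]
```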
@@ -84,35 +84,36 @@ Work in phases:
tools=["web_search", "web_scrape", "save_data", "load_data", "list_data_files"],
)
# Node 3: Review (client-facing)
review_node = NodeSpec(
id="review",
name="Review",
description="Present results for user approval",
# Node 2: Handoff (autonomous)
handoff_node = NodeSpec(
id="handoff",
name="Handoff",
description="Prepare worker results for queen review",
node_type="event_loop",
client_facing=True,
client_facing=False,
max_node_visits=0,
input_keys=["results", "user_request"],
output_keys=["next_action", "feedback"],
nullable_output_keys=["feedback"],
success_criteria="User has reviewed and decided next steps.",
output_keys=["next_action", "feedback", "worker_summary"],
nullable_output_keys=["feedback", "worker_summary"],
success_criteria="Results are packaged for queen decision-making.",
system_prompt="""\
Present the results to the user.
Do NOT talk to the user directly. The queen is the only user interface.
**STEP 1 — Present (text only, NO tool calls):**
1. Summary of work done
2. Key results
3. Ask: satisfied, or want changes?
If blocked by tool failures, missing credentials, or unclear constraints, call:
- escalate(reason, context)
Then set:
- set_output("next_action", "escalated")
- set_output("feedback", "what help is needed")
**STEP 2 — After user responds, call set_output:**
- set_output("next_action", "done") — if satisfied
- set_output("next_action", "revise") — if changes needed
- set_output("feedback", "what to change") only if revising
Otherwise summarize findings for queen and set:
- set_output("worker_summary", "short summary for queen")
- set_output("next_action", "done") or set_output("next_action", "revise")
- set_output("feedback", "what to revise") only when revising
""",
tools=[],
)
__all__ = ["process_node", "review_node"]
__all__ = ["process_node", "handoff_node"]
```
## agent.py
@@ -132,7 +133,7 @@ from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import process_node, review_node
from .nodes import process_node, handoff_node
# Goal definition
goal = Goal(
@@ -149,18 +150,22 @@ goal = Goal(
)
# Node list
nodes = [process_node, review_node]
nodes = [process_node, handoff_node]
# Edge definitions
edges = [
EdgeSpec(id="process-to-review", source="process", target="review",
EdgeSpec(id="process-to-handoff", source="process", target="handoff",
condition=EdgeCondition.ON_SUCCESS, priority=1),
# Feedback loop — revise results
EdgeSpec(id="review-to-process", source="review", target="process",
EdgeSpec(id="handoff-to-process", source="handoff", target="process",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'revise'", priority=2),
# Loop back for next task (queen sends new input)
EdgeSpec(id="review-done", source="review", target="process",
# Escalation loop — queen injects guidance and worker retries
EdgeSpec(id="handoff-escalated", source="handoff", target="process",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'escalated'", priority=3),
# Loop back for next task after queen decision
EdgeSpec(id="handoff-done", source="handoff", target="process",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_action).lower() == 'done'", priority=1),
]
@@ -267,16 +272,60 @@ class MyAgent:
}
def validate(self):
"""Validate graph wiring and entry-point contract."""
errors, warnings = [], []
node_ids = {n.id for n in self.nodes}
for e in self.edges:
if e.source not in node_ids: errors.append(f"Edge {e.id}: source '{e.source}' not found")
if e.target not in node_ids: errors.append(f"Edge {e.id}: target '{e.target}' not found")
if self.entry_node not in node_ids: errors.append(f"Entry node '{self.entry_node}' not found")
if e.source not in node_ids:
errors.append(f"Edge {e.id}: source '{e.source}' not found")
if e.target not in node_ids:
errors.append(f"Edge {e.id}: target '{e.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for t in self.terminal_nodes:
if t not in node_ids: errors.append(f"Terminal node '{t}' not found")
for ep_id, nid in self.entry_points.items():
if nid not in node_ids: errors.append(f"Entry point '{ep_id}' references unknown node '{nid}'")
if t not in node_ids:
errors.append(f"Terminal node '{t}' not found")
if not isinstance(self.entry_points, dict):
errors.append(
"Invalid entry_points: expected dict[str, str] like "
"{'start': '<entry-node-id>'}. "
f"Got {type(self.entry_points).__name__}. "
"Fix agent.py: set entry_points = {'start': '<entry-node-id>'}."
)
else:
if "start" not in self.entry_points:
errors.append(
"entry_points must include 'start' mapped to entry_node. "
"Example: {'start': '<entry-node-id>'}."
)
else:
start_node = self.entry_points.get("start")
if start_node != self.entry_node:
errors.append(
f"entry_points['start'] points to '{start_node}' "
f"but entry_node is '{self.entry_node}'. Keep these aligned."
)
for ep_id, nid in self.entry_points.items():
if not isinstance(ep_id, str):
errors.append(
f"Invalid entry_points key {ep_id!r} "
f"({type(ep_id).__name__}). Entry point names must be strings."
)
continue
if not isinstance(nid, str):
errors.append(
f"Invalid entry_points['{ep_id}']={nid!r} "
f"({type(nid).__name__}). Node ids must be strings."
)
continue
if nid not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{nid}'. "
f"Known nodes: {sorted(node_ids)}"
)
return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
@@ -510,6 +559,9 @@ if __name__ == "__main__":
## mcp_servers.json
> **Auto-generated.** `initialize_agent_package` creates this file with hive-tools
> as the default. Only edit manually to add additional MCP servers.
```json
{
"hive-tools": {
@@ -26,7 +26,7 @@ module-level variables via `getattr()`:
| `edges` | YES | `None` | **FATAL** — same error |
| `entry_node` | no | `nodes[0].id` | Probably wrong node |
| `entry_points` | no | `{}` | **Nodes unreachable** — validation fails |
| `terminal_nodes` | no | `[]` | OK for forever-alive |
| `terminal_nodes` | **YES** | `[]` | **FATAL** — graph must have at least one terminal node |
| `pause_nodes` | no | `[]` | OK |
| `conversation_mode` | no | not passed | Isolated mode (no context carryover) |
| `identity_prompt` | no | not passed | No agent-level identity |
@@ -108,7 +108,7 @@ This prevents premature set_output before user interaction.
### Fewer, Richer Nodes (CRITICAL)
**Hard limit: 2-4 nodes for most agents.** Never exceed 5 unless the user
**Hard limit: 3-6 nodes for most agents.** Never exceed 6 unless the user
explicitly requests a complex multi-phase pipeline.
Each node boundary serializes outputs to shared memory and **destroys** all
@@ -165,8 +165,9 @@ review_node = NodeSpec(
)
```
### Forever-Alive Pattern
`terminal_nodes=[]` — every node has outgoing edges, graph loops until user exits.
### Continuous Loop Pattern
Mark the primary event_loop node as terminal: `terminal_nodes=["process"]`.
The node has `output_keys` and can complete when the agent finishes its work.
Use `conversation_mode="continuous"` to preserve context across transitions.
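A minimal module-level sketch of this pattern (the node id is illustrative):
```python
entry_node = "process"
entry_points = {"start": "process"}
terminal_nodes = ["process"]      # primary event_loop node is terminal
conversation_mode = "continuous"  # preserve context across transitions
```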
### set_output
@@ -192,16 +193,16 @@ condition_expr examples:
| Pattern | terminal_nodes | When |
|---------|---------------|------|
| **Forever-alive** | `[]` | **DEFAULT for all agents** |
| Linear | `["last-node"]` | Only if user explicitly requests one-shot/batch |
| **Continuous loop** | `["node-with-output-keys"]` | **DEFAULT for all agents** |
| Linear | `["last-node"]` | One-shot/batch agents |
**Forever-alive is the default.** Always use `terminal_nodes=[]`.
The framework default for `max_node_visits` is 0 (unbounded), so
nodes work correctly in forever-alive loops without explicit override.
Only set `max_node_visits > 0` in one-shot agents with feedback loops.
Every node must have at least one outgoing edge — no dead ends. The
user exits by closing the TUI. Only use terminal nodes if the user
explicitly asks for a batch/one-shot agent that runs once and exits.
**Every graph must have at least one terminal node.** Terminal nodes
define where execution ends. For interactive agents that loop continuously,
mark the primary event_loop node as terminal (it has `output_keys` and can
complete at any point). The framework default for `max_node_visits` is 0
(unbounded), so nodes work correctly in continuous loops without explicit
override. Only set `max_node_visits > 0` in one-shot agents with feedback loops.
Every node must have at least one outgoing edge — no dead ends.
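A minimal wiring sketch, assuming the two-node loop described above (ids illustrative):
```python
from framework.graph.edge import EdgeCondition, EdgeSpec

# Both nodes keep an outgoing edge (no dead ends); "process" is the terminal node.
edges = [
    EdgeSpec(id="process-to-review", source="process", target="review",
             condition=EdgeCondition.ON_SUCCESS, priority=1),
    EdgeSpec(id="review-to-process", source="review", target="process",
             condition=EdgeCondition.ON_SUCCESS, priority=1),
]
terminal_nodes = ["process"]
```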
## Continuous Conversation Mode
@@ -258,177 +259,57 @@ Judge is the SOLE acceptance mechanism — no ad-hoc framework gating.
## Async Entry Points (Webhooks, Timers, Events)
For agents that need to react to external events (incoming emails, scheduled
tasks, API calls), use `AsyncEntryPointSpec` and optionally `AgentRuntimeConfig`.
### Imports
```python
from framework.graph.edge import GraphSpec, AsyncEntryPointSpec
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
```
Note: `AsyncEntryPointSpec` is in `framework.graph.edge` (the graph/declarative layer).
`AgentRuntimeConfig` is in `framework.runtime.agent_runtime` (the runtime layer).
### AsyncEntryPointSpec Fields
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | str | required | Unique identifier |
| name | str | required | Human-readable name |
| entry_node | str | required | Node ID to start execution from |
| trigger_type | str | `"manual"` | `webhook`, `api`, `timer`, `event`, `manual` |
| trigger_config | dict | `{}` | Trigger-specific config (see below) |
| isolation_level | str | `"shared"` | `isolated`, `shared`, `synchronized` |
| priority | int | `0` | Execution priority (higher = more priority) |
| max_concurrent | int | `10` | Max concurrent executions |
### Trigger Types
**timer** — Fires on a schedule. Two modes: cron expressions or fixed interval.
Cron (preferred for precise scheduling):
```python
AsyncEntryPointSpec(
id="daily-digest",
name="Daily Digest",
entry_node="check-node",
trigger_type="timer",
trigger_config={"cron": "0 9 * * *"}, # daily at 9am
isolation_level="shared",
max_concurrent=1,
)
```
- `cron` (str) — standard cron expression (5 fields: min hour dom month dow)
- Examples: `"0 9 * * *"` (daily 9am), `"0 9 * * MON-FRI"` (weekdays 9am), `"*/30 * * * *"` (every 30 min)
Fixed interval (simpler, for polling-style tasks):
```python
AsyncEntryPointSpec(
id="scheduled-check",
name="Scheduled Check",
entry_node="check-node",
trigger_type="timer",
trigger_config={"interval_minutes": 20, "run_immediately": False},
isolation_level="shared",
max_concurrent=1,
)
```
- `interval_minutes` (float) — how often to fire
- `run_immediately` (bool, default False) — fire once on startup
**event** — Subscribes to EventBus (e.g., webhook events):
```python
AsyncEntryPointSpec(
id="email-event",
name="Email Event Handler",
entry_node="process-emails",
trigger_type="event",
trigger_config={"event_types": ["webhook_received"]},
isolation_level="shared",
max_concurrent=10,
)
```
- `event_types` (list[str]) — EventType values to subscribe to
- `filter_stream` (str, optional) — only receive from this stream
- `filter_node` (str, optional) — only receive from this node
**webhook** — HTTP endpoint (requires AgentRuntimeConfig):
The webhook server publishes `WEBHOOK_RECEIVED` events on the EventBus.
An `event` trigger type with `event_types: ["webhook_received"]` subscribes
to those events. The flow is:
```
HTTP POST /webhooks/gmail → WebhookServer → EventBus (WEBHOOK_RECEIVED)
→ event entry point → triggers graph execution from entry_node
```
**manual** — Triggered programmatically via `runtime.trigger()`.
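A minimal sketch of a manual trigger (entry point id and input illustrative):
```python
result = await runtime.trigger_and_wait(
    entry_point_id="default",
    input_data={"user_request": "Summarize today's inbox"},
)
```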
### Isolation Levels
| Level | Meaning |
|-------|---------|
| `isolated` | Private state per execution |
| `shared` | Eventual consistency — async executions can read primary session memory |
| `synchronized` | Shared with write locks (use when ordering matters) |
For most async patterns, use `shared` — the async execution reads the primary
session's memory (e.g., user-configured rules) and runs its own workflow.
### AgentRuntimeConfig (for webhook servers)
For agents that react to external events, use `AsyncEntryPointSpec`:
```python
from framework.graph.edge import AsyncEntryPointSpec
from framework.runtime.agent_runtime import AgentRuntimeConfig
# Timer trigger (cron or interval)
async_entry_points = [
AsyncEntryPointSpec(
id="daily-check",
name="Daily Check",
entry_node="process",
trigger_type="timer",
trigger_config={"cron": "0 9 * * *"}, # daily at 9am
isolation_level="shared",
)
]
# Webhook server (optional)
runtime_config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=8080,
webhook_routes=[
{
"source_id": "gmail",
"path": "/webhooks/gmail",
"methods": ["POST"],
"secret": None, # Optional HMAC-SHA256 secret
},
],
)
```
`runtime_config` is a module-level variable read by `AgentRunner.load()`.
The runner passes it to `create_agent_runtime()`. On `runtime.start()`,
if webhook_routes is non-empty, an embedded HTTP server starts.
### Session Sharing
Timer and event triggers automatically call `_get_primary_session_state()`
before execution. This finds the active user-facing session and provides
its memory to the async execution, filtered to only the async entry node's
`input_keys`. This means the async flow can read user-configured values
(like rules, preferences) without needing separate configuration.
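An illustrative node spec for this flow: the async entry node declares the keys it needs, and session sharing fills them from the primary session at trigger time:
```python
from framework.graph.node import NodeSpec

check_node = NodeSpec(
    id="check-node",
    name="Scheduled Check",
    description="Apply user-configured rules on a timer",
    node_type="event_loop",
    input_keys=["rules"],     # filled from the primary session's memory
    output_keys=["report"],
    system_prompt="Apply the configured rules and summarize the results.",
)
```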
### Module-Level Variables
Agents with async entry points must export two additional variables:
```python
# In agent.py:
async_entry_points = [AsyncEntryPointSpec(...), ...]
runtime_config = AgentRuntimeConfig(...) # Only if using webhooks
```
Both must be re-exported from `__init__.py`:
```python
from .agent import (
..., async_entry_points, runtime_config,
webhook_routes=[{"source_id": "gmail", "path": "/webhooks/gmail", "methods": ["POST"]}],
)
```
### Reference Agent
### Key Fields
- `trigger_type`: `"timer"`, `"event"`, `"webhook"`, `"manual"`
- `trigger_config`: `{"cron": "0 9 * * *"}` or `{"interval_minutes": 20}`
- `isolation_level`: `"shared"` (recommended), `"isolated"`, `"synchronized"`
- `event_types`: For event triggers, e.g., `["webhook_received"]`
See `exports/gmail_inbox_guardian/agent.py` for a complete example with:
- Primary client-facing node (user configures rules)
- Timer-based scheduled inbox checks (every 20 min)
- Webhook-triggered email event handling
- Shared isolation for memory access across streams
### Exports Required
Both `async_entry_points` and `runtime_config` must be exported from `__init__.py`.
## Framework Capabilities
**Works well:** Multi-turn conversations, HITL review, tool orchestration, structured outputs, parallel execution, context management, error recovery, session persistence.
**Limitations:** LLM latency (2-10s/turn), context window limits (~128K), cost per run, rate limits, node boundaries lose context.
**Not designed for:** Sub-second responses, millions of items, real-time streaming, guaranteed determinism, offline/air-gapped.
See `exports/gmail_inbox_guardian/agent.py` for complete example.
## Tool Discovery
Do NOT rely on a static tool list — it will be outdated. Always use
`list_agent_tools()` to discover available tools, grouped by category.
Do NOT rely on a static tool list — it will be outdated. Always call
`list_agent_tools()` with NO arguments first to see ALL available tools.
Only use `group=` or `output_schema=` as follow-up calls after seeing the
full list.
```
list_agent_tools() # names + descriptions, all groups
list_agent_tools(output_schema="full") # include input_schema
list_agent_tools(group="gmail") # only gmail_* tools
list_agent_tools() # ALWAYS call this first
list_agent_tools(group="gmail", output_schema="full") # then drill into a category
list_agent_tools("exports/my_agent/mcp_servers.json") # specific agent's tools
```
After building, validate tools exist: `validate_agent_tools("exports/{name}")`
After building, run `validate_agent_package("{name}")` to check everything at once.
Common tool categories (verify via list_agent_tools):
- **Web**: search, scrape, PDF
@@ -1,21 +1,7 @@
"""Builder interface for analyzing and building agents."""
from framework.builder.query import BuilderQuery
from framework.builder.workflow import (
BuildPhase,
BuildSession,
GraphBuilder,
TestCase,
TestResult,
ValidationResult,
)
__all__ = [
"BuilderQuery",
"GraphBuilder",
"BuildSession",
"BuildPhase",
"ValidationResult",
"TestCase",
"TestResult",
]
@@ -1,832 +0,0 @@
"""
GraphBuilder Workflow - Enforced incremental building with HITL approval.
The build process:
1. Define Goal → APPROVE
2. Add Node → VALIDATE → TEST → APPROVE
3. Add Edge → VALIDATE → TEST → APPROVE
4. Repeat until graph is complete
5. Final integration test → APPROVE
6. Export
Each step requires validation and human approval before proceeding.
You cannot skip steps or bypass validation.
"""
from collections.abc import Callable
from datetime import datetime
from enum import StrEnum
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.goal import Goal
from framework.graph.node import NodeSpec
class BuildPhase(StrEnum):
"""Current phase of the build process."""
INIT = "init" # Just started
GOAL_DRAFT = "goal_draft" # Drafting goal
GOAL_APPROVED = "goal_approved" # Goal approved
ADDING_NODES = "adding_nodes" # Adding nodes
ADDING_EDGES = "adding_edges" # Adding edges
TESTING = "testing" # Running tests
APPROVED = "approved" # Fully approved
EXPORTED = "exported" # Exported to file
class ValidationResult(BaseModel):
"""Result of a validation check."""
valid: bool
errors: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
suggestions: list[str] = Field(default_factory=list)
class TestCase(BaseModel):
"""A test case for validating agent behavior."""
id: str
description: str
input: dict[str, Any]
expected_output: Any = None # None means just check it doesn't error
expected_contains: str | None = None
class TestResult(BaseModel):
"""Result of running a test case."""
test_id: str
passed: bool
actual_output: Any = None
error: str | None = None
execution_path: list[str] = Field(default_factory=list)
class BuildSession(BaseModel):
"""
Persistent build session state.
Saved after each approved step so you can resume later.
"""
id: str
name: str
phase: BuildPhase = BuildPhase.INIT
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
# The artifacts being built
goal: Goal | None = None
nodes: list[NodeSpec] = Field(default_factory=list)
edges: list[EdgeSpec] = Field(default_factory=list)
# Test cases
test_cases: list[TestCase] = Field(default_factory=list)
test_results: list[TestResult] = Field(default_factory=list)
# Approval history
approvals: list[dict[str, Any]] = Field(default_factory=list)
# Tools (stored as dicts for serialization)
tools: list[dict[str, Any]] = Field(default_factory=list)
model_config = {"extra": "allow"}
class GraphBuilder:
"""
Enforced incremental graph building with HITL approval.
Usage:
builder = GraphBuilder("my-agent")
# Step 1: Define and approve goal
builder.set_goal(goal)
builder.validate() # Must pass
builder.approve("Goal looks good") # Human approval required
# Step 2: Add nodes one by one
builder.add_node(node_spec)
builder.validate() # Must pass
builder.test(test_case) # Must pass
builder.approve("Node works")
# Step 3: Add edges
builder.add_edge(edge_spec)
builder.validate()
builder.approve("Edge correct")
# Step 4: Final approval
builder.run_all_tests()
builder.final_approve("Ready for production")
# Step 5: Export
graph = builder.export()
"""
def __init__(
self,
name: str,
storage_path: Path | str | None = None,
session_id: str | None = None,
):
self.storage_path = Path(storage_path) if storage_path else Path.home() / ".core" / "builds"
self.storage_path.mkdir(parents=True, exist_ok=True)
if session_id:
self.session = self._load_session(session_id)
else:
self.session = BuildSession(
id=f"build_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
name=name,
)
self._pending_validation: ValidationResult | None = None
# =========================================================================
# PHASE 1: GOAL
# =========================================================================
def set_goal(self, goal: Goal) -> ValidationResult:
"""
Set the goal for this agent.
Returns validation result. Must call approve() after validation passes.
"""
self._require_phase([BuildPhase.INIT, BuildPhase.GOAL_DRAFT])
self.session.goal = goal
self.session.phase = BuildPhase.GOAL_DRAFT
validation = self._validate_goal(goal)
self._pending_validation = validation
self._save_session()
return validation
def _validate_goal(self, goal: Goal) -> ValidationResult:
"""Validate a goal definition."""
errors = []
warnings = []
suggestions = []
if not goal.id:
errors.append("Goal must have an id")
if not goal.name:
errors.append("Goal must have a name")
if not goal.description:
errors.append("Goal must have a description")
if not goal.success_criteria:
errors.append("Goal must have at least one success criterion")
else:
for sc in goal.success_criteria:
if not sc.description:
errors.append(f"Success criterion '{sc.id}' needs a description")
if not goal.constraints:
warnings.append("Consider adding constraints to define boundaries")
if not goal.required_capabilities:
suggestions.append("Specify required_capabilities (e.g., ['llm', 'tools'])")
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
suggestions=suggestions,
)
# =========================================================================
# PHASE 2: NODES
# =========================================================================
def add_node(self, node: NodeSpec) -> ValidationResult:
"""
Add a node to the graph.
Returns validation result. Must call approve() after validation passes.
"""
self._require_phase([BuildPhase.GOAL_APPROVED, BuildPhase.ADDING_NODES])
# Check for duplicate
if any(n.id == node.id for n in self.session.nodes):
return ValidationResult(
valid=False,
errors=[f"Node with id '{node.id}' already exists"],
)
self.session.nodes.append(node)
self.session.phase = BuildPhase.ADDING_NODES
validation = self._validate_node(node)
self._pending_validation = validation
self._save_session()
return validation
def _validate_node(self, node: NodeSpec) -> ValidationResult:
"""Validate a node definition."""
errors = []
warnings = []
suggestions = []
if not node.id:
errors.append("Node must have an id")
if not node.name:
errors.append("Node must have a name")
if not node.description:
warnings.append(f"Node '{node.id}' should have a description")
# Type-specific validation
if node.node_type == "event_loop":
if node.tools and not node.system_prompt:
warnings.append(f"Event loop node '{node.id}' should have a system_prompt")
if node.node_type == "router":
if not node.routes:
errors.append(f"Router node '{node.id}' must specify routes")
# Check input/output keys
if not node.input_keys:
suggestions.append(f"Consider specifying input_keys for '{node.id}'")
if not node.output_keys:
suggestions.append(f"Consider specifying output_keys for '{node.id}'")
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
suggestions=suggestions,
)
def update_node(self, node_id: str, **updates) -> ValidationResult:
"""Update an existing node."""
self._require_phase([BuildPhase.ADDING_NODES])
for i, node in enumerate(self.session.nodes):
if node.id == node_id:
node_dict = node.model_dump()
node_dict.update(updates)
updated_node = NodeSpec(**node_dict)
self.session.nodes[i] = updated_node
validation = self._validate_node(updated_node)
self._pending_validation = validation
self._save_session()
return validation
return ValidationResult(valid=False, errors=[f"Node '{node_id}' not found"])
def remove_node(self, node_id: str) -> ValidationResult:
"""Remove a node (only if no edges reference it)."""
self._require_phase([BuildPhase.ADDING_NODES])
# Check for edge references
for edge in self.session.edges:
if edge.source == node_id or edge.target == node_id:
return ValidationResult(
valid=False,
errors=[f"Cannot remove node '{node_id}': referenced by edge '{edge.id}'"],
)
self.session.nodes = [n for n in self.session.nodes if n.id != node_id]
self._save_session()
return ValidationResult(valid=True)
# =========================================================================
# PHASE 3: EDGES
# =========================================================================
def add_edge(self, edge: EdgeSpec) -> ValidationResult:
"""
Add an edge to the graph.
Returns validation result. Must call approve() after validation passes.
"""
self._require_phase([BuildPhase.ADDING_NODES, BuildPhase.ADDING_EDGES])
# Check for duplicate
if any(e.id == edge.id for e in self.session.edges):
return ValidationResult(
valid=False,
errors=[f"Edge with id '{edge.id}' already exists"],
)
self.session.edges.append(edge)
self.session.phase = BuildPhase.ADDING_EDGES
validation = self._validate_edge(edge)
self._pending_validation = validation
self._save_session()
return validation
def _validate_edge(self, edge: EdgeSpec) -> ValidationResult:
"""Validate an edge definition."""
errors = []
warnings = []
if not edge.id:
errors.append("Edge must have an id")
# Check source exists
if not any(n.id == edge.source for n in self.session.nodes):
errors.append(f"Edge source '{edge.source}' not found in nodes")
# Check target exists
if not any(n.id == edge.target for n in self.session.nodes):
errors.append(f"Edge target '{edge.target}' not found in nodes")
# Warn about conditional edges without expressions
if edge.condition == EdgeCondition.CONDITIONAL and not edge.condition_expr:
warnings.append(f"Conditional edge '{edge.id}' has no condition_expr")
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
)
# =========================================================================
# VALIDATION & TESTING
# =========================================================================
def validate(self) -> ValidationResult:
"""Validate the entire current graph state."""
errors = []
warnings = []
# Must have a goal
if not self.session.goal:
errors.append("No goal defined")
return ValidationResult(valid=False, errors=errors)
# Must have at least one node
if not self.session.nodes:
errors.append("No nodes defined")
# Check for entry node
entry_candidates = []
for node in self.session.nodes:
# A node is an entry candidate if no edges point to it
if not any(e.target == node.id for e in self.session.edges):
entry_candidates.append(node.id)
if len(entry_candidates) == 0 and self.session.nodes:
errors.append("No entry node found (all nodes have incoming edges)")
elif len(entry_candidates) > 1:
warnings.append(f"Multiple entry candidates: {entry_candidates}. Specify one.")
# Check for terminal nodes
terminal_candidates = []
for node in self.session.nodes:
if not any(e.source == node.id for e in self.session.edges):
terminal_candidates.append(node.id)
if not terminal_candidates and self.session.nodes:
warnings.append("No terminal nodes found (all nodes have outgoing edges)")
# Check reachability from ALL entry candidates (not just the first one).
# Agents with async entry points have multiple nodes with no incoming
# edges (e.g., a primary entry node and an event-driven entry node).
if entry_candidates and self.session.nodes:
reachable = set()
for candidate in entry_candidates:
reachable |= self._compute_reachable(candidate)
unreachable = [n.id for n in self.session.nodes if n.id not in reachable]
if unreachable:
errors.append(f"Unreachable nodes: {unreachable}")
validation = ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
)
self._pending_validation = validation
return validation
def _compute_reachable(self, start: str) -> set[str]:
"""Compute all nodes reachable from start."""
reachable = set()
to_visit = [start]
while to_visit:
current = to_visit.pop()
if current in reachable:
continue
reachable.add(current)
for edge in self.session.edges:
if edge.source == current:
to_visit.append(edge.target)
# Also follow router routes
for node in self.session.nodes:
if node.id == current and node.routes:
for target in node.routes.values():
to_visit.append(target)
return reachable
def add_test(self, test: TestCase) -> None:
"""Add a test case."""
self.session.test_cases.append(test)
self._save_session()
async def run_test_async(
self,
test: TestCase,
executor_factory: Callable,
) -> TestResult:
"""
Run a single test case asynchronously.
This method is safe to call from async contexts (Jupyter, FastAPI, etc.).
executor_factory should return a configured GraphExecutor.
"""
self._require_phase([BuildPhase.ADDING_NODES, BuildPhase.ADDING_EDGES, BuildPhase.TESTING])
self.session.phase = BuildPhase.TESTING
try:
# Build temporary graph for testing
graph = self._build_graph()
executor = executor_factory()
# Run the test
result = await executor.execute(
graph=graph,
goal=self.session.goal,
input_data=test.input,
)
# Check result
passed = result.success
if test.expected_output is not None:
passed = passed and (result.output.get("result") == test.expected_output)
if test.expected_contains:
output_str = str(result.output)
passed = passed and (test.expected_contains in output_str)
test_result = TestResult(
test_id=test.id,
passed=passed,
actual_output=result.output,
execution_path=result.path,
)
except Exception as e:
test_result = TestResult(
test_id=test.id,
passed=False,
error=str(e),
)
self.session.test_results.append(test_result)
self._save_session()
return test_result
def run_test(
self,
test: TestCase,
executor_factory: Callable,
) -> TestResult:
"""
Run a single test case.
This is a synchronous wrapper around run_test_async().
If called from an async context (Jupyter, FastAPI, etc.), use run_test_async() instead.
executor_factory should return a configured GraphExecutor.
"""
import asyncio
# Check if an event loop is already running
# get_running_loop() returns a loop if one exists, or raises RuntimeError if none exists
try:
asyncio.get_running_loop()
except RuntimeError:
# No event loop running - safe to use asyncio.run()
return asyncio.run(self.run_test_async(test, executor_factory))
# Event loop is running - cannot use asyncio.run()
raise RuntimeError(
"Cannot call run_test() from an async context. "
"An event loop is already running. "
"Please use 'await builder.run_test_async(test, executor_factory)' instead."
)
def run_all_tests(self, executor_factory: Callable) -> list[TestResult]:
"""Run all test cases."""
results = []
for test in self.session.test_cases:
result = self.run_test(test, executor_factory)
results.append(result)
return results
# =========================================================================
# APPROVAL
# =========================================================================
def approve(self, comment: str) -> bool:
"""
Approve the current pending change.
Must have a passing validation to approve.
Returns True if approved, False if validation failed.
"""
if self._pending_validation is None:
raise RuntimeError("Nothing to approve. Run validation first.")
if not self._pending_validation.valid:
return False
self.session.approvals.append(
{
"phase": self.session.phase.value,
"comment": comment,
"timestamp": datetime.now().isoformat(),
"validation": self._pending_validation.model_dump(),
}
)
# Advance phase if appropriate
if self.session.phase == BuildPhase.GOAL_DRAFT:
self.session.phase = BuildPhase.GOAL_APPROVED
self._pending_validation = None
self._save_session()
return True
def final_approve(self, comment: str) -> bool:
"""
Final approval for the complete graph.
Requires all tests to pass.
"""
# Run final validation
validation = self.validate()
if not validation.valid:
self._pending_validation = validation
return False
# Check test results
if self.session.test_cases:
failed_tests = [t for t in self.session.test_results if not t.passed]
if failed_tests:
self._pending_validation = ValidationResult(
valid=False,
errors=[f"Failed tests: {[t.test_id for t in failed_tests]}"],
)
return False
self.session.phase = BuildPhase.APPROVED
self.session.approvals.append(
{
"phase": "final",
"comment": comment,
"timestamp": datetime.now().isoformat(),
}
)
self._save_session()
return True
# =========================================================================
# EXPORT
# =========================================================================
def export(self) -> GraphSpec:
"""
Export the approved graph.
Requires final approval.
"""
self._require_phase([BuildPhase.APPROVED])
graph = self._build_graph()
self.session.phase = BuildPhase.EXPORTED
self._save_session()
return graph
def _build_graph(self) -> GraphSpec:
"""Build a GraphSpec from current session."""
# Determine entry node
entry_node = None
for node in self.session.nodes:
if not any(e.target == node.id for e in self.session.edges):
entry_node = node.id
break
# Determine terminal nodes
terminal_nodes = []
for node in self.session.nodes:
if not any(e.source == node.id for e in self.session.edges):
terminal_nodes.append(node.id)
# Collect all memory keys
memory_keys = set()
for node in self.session.nodes:
memory_keys.update(node.input_keys)
memory_keys.update(node.output_keys)
return GraphSpec(
id=f"{self.session.name}-graph",
goal_id=self.session.goal.id if self.session.goal else "",
entry_node=entry_node or "",
terminal_nodes=terminal_nodes,
nodes=self.session.nodes,
edges=self.session.edges,
memory_keys=list(memory_keys),
)
def export_to_file(self, path: Path | str) -> None:
"""Export the graph to a Python file."""
self._require_phase([BuildPhase.APPROVED, BuildPhase.EXPORTED])
graph = self._build_graph()
# Generate Python code
code = self._generate_code(graph)
Path(path).write_text(code, encoding="utf-8")
self.session.phase = BuildPhase.EXPORTED
self._save_session()
def _generate_code(self, graph: GraphSpec) -> str:
"""Generate Python code for the graph."""
lines = [
'"""',
f"Generated agent: {self.session.name}",
f"Generated at: {datetime.now().isoformat()}",
'"""',
"",
"from framework.graph import (",
" Goal, SuccessCriterion, Constraint,",
" NodeSpec, EdgeSpec, EdgeCondition,",
")",
"from framework.graph.edge import GraphSpec",
"from framework.graph.goal import GoalStatus",
"",
"",
"# Goal",
]
if self.session.goal:
goal_json = self.session.goal.model_dump_json(indent=4)
lines.append("GOAL = Goal.model_validate_json('''")
lines.append(goal_json)
lines.append("''')")
else:
lines.append("GOAL = None")
lines.extend(
[
"",
"",
"# Nodes",
"NODES = [",
]
)
for node in self.session.nodes:
node_json = node.model_dump_json(indent=4)
lines.append(" NodeSpec.model_validate_json('''")
lines.append(node_json)
lines.append(" '''),")
lines.extend(
[
"]",
"",
"",
"# Edges",
"EDGES = [",
]
)
for edge in self.session.edges:
edge_json = edge.model_dump_json(indent=4)
lines.append(" EdgeSpec.model_validate_json('''")
lines.append(edge_json)
lines.append(" '''),")
lines.extend(
[
"]",
"",
"",
"# Graph",
]
)
graph_json = graph.model_dump_json(indent=4)
lines.append("GRAPH = GraphSpec.model_validate_json('''")
lines.append(graph_json)
lines.append("''')")
return "\n".join(lines)
# =========================================================================
# SESSION MANAGEMENT
# =========================================================================
def _require_phase(self, allowed: list[BuildPhase]) -> None:
"""Ensure we're in an allowed phase."""
if self.session.phase not in allowed:
raise RuntimeError(
f"Cannot perform this action in phase '{self.session.phase.value}'. "
f"Allowed phases: {[p.value for p in allowed]}"
)
def _save_session(self) -> None:
"""Save session to disk."""
self.session.updated_at = datetime.now()
path = self.storage_path / f"{self.session.id}.json"
path.write_text(self.session.model_dump_json(indent=2), encoding="utf-8")
def _load_session(self, session_id: str) -> BuildSession:
"""Load session from disk."""
path = self.storage_path / f"{session_id}.json"
if not path.exists():
raise FileNotFoundError(f"Session not found: {session_id}")
return BuildSession.model_validate_json(path.read_text(encoding="utf-8"))
@classmethod
def list_sessions(cls, storage_path: Path | str | None = None) -> list[str]:
"""List all saved sessions."""
path = Path(storage_path) if storage_path else Path.home() / ".core" / "builds"
if not path.exists():
return []
return [f.stem for f in path.glob("*.json")]
# =========================================================================
# STATUS
# =========================================================================
def status(self) -> dict[str, Any]:
"""Get current build status."""
return {
"session_id": self.session.id,
"name": self.session.name,
"phase": self.session.phase.value,
"goal": self.session.goal.name if self.session.goal else None,
"nodes": len(self.session.nodes),
"edges": len(self.session.edges),
"tests": len(self.session.test_cases),
"tests_passed": sum(1 for t in self.session.test_results if t.passed),
"approvals": len(self.session.approvals),
"pending_validation": self._pending_validation.model_dump()
if self._pending_validation
else None,
}
def show(self) -> str:
"""Show current graph as text."""
lines = [
f"=== Build: {self.session.name} ===",
f"Phase: {self.session.phase.value}",
"",
]
if self.session.goal:
lines.extend(
[
f"Goal: {self.session.goal.name}",
f" {self.session.goal.description}",
"",
]
)
if self.session.nodes:
lines.append("Nodes:")
for node in self.session.nodes:
lines.append(f" [{node.id}] {node.name} ({node.node_type})")
lines.append("")
if self.session.edges:
lines.append("Edges:")
for edge in self.session.edges:
lines.append(f" {edge.source} --{edge.condition.value}--> {edge.target}")
lines.append("")
if self._pending_validation:
lines.append("Pending Validation:")
lines.append(f" Valid: {self._pending_validation.valid}")
for err in self._pending_validation.errors:
lines.append(f" ERROR: {err}")
for warn in self._pending_validation.warnings:
lines.append(f" WARN: {warn}")
return "\n".join(lines)
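# Usage sketch for the export flow above (names follow this file; the
# builder instance and output path are illustrative):
#
#   builder.status()["phase"]            # -> "approved"
#   graph = builder.export()             # returns GraphSpec, phase -> EXPORTED
#   builder.export_to_file("agent.py")   # writes the generated Python module
#
# Calling export() in any other phase raises RuntimeError via _require_phase().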
+13 -7
@@ -30,6 +30,7 @@ Usage:
from __future__ import annotations
import json as _json
import logging
import os
import time
@@ -260,6 +261,11 @@ class AdenCredentialClient:
self.config = config
self._client: httpx.Client | None = None
@staticmethod
def _parse_json(response: httpx.Response) -> Any:
"""Parse JSON from response, tolerating UTF-8 BOM."""
return _json.loads(response.content.decode("utf-8-sig"))
def _get_client(self) -> httpx.Client:
if self._client is None:
headers = {
@@ -295,7 +301,7 @@ class AdenCredentialClient:
raise AdenAuthenticationError("Agent API key is invalid or revoked")
if response.status_code == 403:
data = response.json()
data = self._parse_json(response)
raise AdenClientError(data.get("message", "Forbidden"))
if response.status_code == 404:
@@ -309,7 +315,7 @@ class AdenCredentialClient:
)
if response.status_code == 400:
data = response.json()
data = self._parse_json(response)
msg = data.get("message", "Bad request")
if data.get("error") == "refresh_failed" or "refresh" in msg.lower():
raise AdenRefreshError(
@@ -356,7 +362,7 @@ class AdenCredentialClient:
alias, status, email, expires_at.
"""
response = self._request_with_retry("GET", "/v1/credentials")
data = response.json()
data = self._parse_json(response)
return [AdenIntegrationInfo.from_dict(item) for item in data.get("integrations", [])]
# Alias
@@ -376,7 +382,7 @@ class AdenCredentialClient:
"""
try:
response = self._request_with_retry("GET", f"/v1/credentials/{integration_id}")
data = response.json()
data = self._parse_json(response)
return AdenCredentialResponse.from_dict(data, integration_id=integration_id)
except AdenNotFoundError:
return None
@@ -394,7 +400,7 @@ class AdenCredentialClient:
AdenCredentialResponse with new access_token.
"""
response = self._request_with_retry("POST", f"/v1/credentials/{integration_id}/refresh")
data = response.json()
data = self._parse_json(response)
return AdenCredentialResponse.from_dict(data, integration_id=integration_id)
def validate_token(self, integration_id: str) -> dict[str, Any]:
@@ -407,7 +413,7 @@ class AdenCredentialClient:
{"valid": bool, "status": str, "expires_at": str, "error": str|null}
"""
response = self._request_with_retry("GET", f"/v1/credentials/{integration_id}/validate")
return response.json()
return self._parse_json(response)
def health_check(self) -> dict[str, Any]:
"""Check Aden server health."""
@@ -415,7 +421,7 @@ class AdenCredentialClient:
client = self._get_client()
response = client.get("/health")
if response.status_code == 200:
data = response.json()
data = self._parse_json(response)
data["latency_ms"] = response.elapsed.total_seconds() * 1000
return data
return {"status": "degraded", "error": f"HTTP {response.status_code}"}
+1 -1
@@ -568,7 +568,7 @@ def _load_nodes_from_python_agent(agent_path: Path) -> list:
def _load_nodes_from_json_agent(agent_json: Path) -> list:
"""Load nodes from a JSON-based agent."""
try:
with open(agent_json, encoding="utf-8") as f:
with open(agent_json, encoding="utf-8-sig") as f:
data = json.load(f)
from framework.graph import NodeSpec
+3 -3
@@ -203,7 +203,7 @@ class EncryptedFileStorage(CredentialStorage):
# Decrypt
try:
json_bytes = self._fernet.decrypt(encrypted)
data = json.loads(json_bytes.decode())
data = json.loads(json_bytes.decode("utf-8-sig"))
except Exception as e:
raise CredentialDecryptionError(
f"Failed to decrypt credential '{credential_id}': {e}"
@@ -227,7 +227,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json"
if not index_path.exists():
return []
with open(index_path, encoding="utf-8") as f:
with open(index_path, encoding="utf-8-sig") as f:
index = json.load(f)
return list(index.get("credentials", {}).keys())
@@ -268,7 +268,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json"
if index_path.exists():
with open(index_path, encoding="utf-8") as f:
with open(index_path, encoding="utf-8-sig") as f:
index = json.load(f)
else:
index = {"credentials": {}, "version": "1.0"}
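# Standalone round-trip sketch of the storage pattern above, using the real
# cryptography.fernet API (key handling and file paths omitted):
import json
from cryptography.fernet import Fernet
fernet = Fernet(Fernet.generate_key())
token = fernet.encrypt(json.dumps({"api_key": "secret"}).encode("utf-8"))
# "utf-8-sig" tolerates a BOM that another writer may have prepended
data = json.loads(fernet.decrypt(token).decode("utf-8-sig"))
assert data == {"api_key": "secret"}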
+4 -2
@@ -898,9 +898,11 @@ class NodeConversation:
# Build reference message
ref_parts: list[str] = []
if conv_filename:
full_path = str((spill_path / conv_filename).resolve())
ref_parts.append(
f"[Previous conversation saved to '{conv_filename}'. "
f"Use load_data('{conv_filename}') to review if needed.]"
f"[Previous conversation saved to '{full_path}'. "
f"Use load_data('{conv_filename}'), read_file('{full_path}'), "
f"or run_command('cat \"{full_path}\"') to review if needed.]"
)
elif not collapsed_msgs:
ref_parts.append("[Previous freeform messages compacted.]")
+15 -3
@@ -574,9 +574,14 @@ class GraphSpec(BaseModel):
# Default to main entry
return self.entry_node
def validate(self) -> list[str]:
"""Validate the graph structure."""
def validate(self) -> dict[str, list[str]]:
"""Validate the graph structure.
Returns:
Dict with 'errors' (blocking issues) and 'warnings' (non-blocking).
"""
errors = []
warnings = []
# Check entry node exists
if not self.get_node(self.entry_node):
@@ -618,6 +623,13 @@ class GraphSpec(BaseModel):
if not self.get_node(term):
errors.append(f"Terminal node '{term}' not found")
# Suggest at least one terminal node (graphs should have termination points)
if not self.terminal_nodes:
warnings.append(
"Graph has no terminal nodes defined in 'terminal_nodes'. "
"Consider adding a termination point where execution ends."
)
# Check edge references
for edge in self.edges:
if not self.get_node(edge.source):
@@ -749,4 +761,4 @@ class GraphSpec(BaseModel):
"GCU nodes must be declared as subagents of a parent node."
)
return errors
return {"errors": errors, "warnings": warnings}
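# Sketch of consuming the new validate() contract (mirrors the executor
# change later in this diff; `graph` is assumed to be a GraphSpec in scope):
result = graph.validate()
if result["errors"]:
    raise ValueError(f"Invalid graph: {result['errors']}")
for warning in result["warnings"]:
    print(f"WARN: {warning}")  # e.g. no terminal_nodes defined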
+503 -78
@@ -165,6 +165,7 @@ class LoopConfig:
max_tool_calls_per_turn: int = 30
judge_every_n_turns: int = 1
stall_detection_threshold: int = 3
stall_similarity_threshold: float = 0.7
max_history_tokens: int = 32_000
store_prefix: str = ""
@@ -205,6 +206,42 @@ class LoopConfig:
cf_grace_turns: int = 1
tool_doom_loop_enabled: bool = True
# --- Lifecycle hooks ---
# Hooks are async callables keyed by event name. Supported events:
# "session_start" — fires once after the first user message is added,
# before the first LLM turn. trigger = initial message.
# "external_message" — fires when inject_notification() delivers a message.
# trigger = injected message text.
# Each hook receives a HookContext and may return a HookResult to patch
# the system prompt and/or inject a follow-up user message.
hooks: dict[str, list] = None # dict[str, list[HookFn]] (None → no hooks)
def __post_init__(self) -> None:
if self.hooks is None:
object.__setattr__(self, "hooks", {})
# ---------------------------------------------------------------------------
# Hook types
# ---------------------------------------------------------------------------
@dataclass
class HookContext:
"""Context passed to every lifecycle hook."""
event: str # event name, e.g. "session_start"
trigger: str | None # message that triggered the hook, if any
system_prompt: str # current system prompt at hook invocation time
@dataclass
class HookResult:
"""What a hook may return to modify node state."""
system_prompt: str | None = None # replace current system prompt
inject: str | None = None # inject an additional user message
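# Usage sketch (hypothetical hook): a "session_start" hook that appends a
# persona line to the system prompt and injects a kickoff user message.
# Registration would look like LoopConfig(hooks={"session_start": [persona_hook]}).
async def persona_hook(ctx: HookContext) -> HookResult | None:
    if ctx.event != "session_start":
        return None
    return HookResult(
        system_prompt=ctx.system_prompt + "\nPersona: terse and action-oriented.",
        inject="[Hook] Persona applied; continue with the task.",
    )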
# ---------------------------------------------------------------------------
# Output accumulator with write-through persistence
@@ -476,6 +513,9 @@ class EventLoopNode(NodeProtocol):
if initial_message:
await conversation.add_user_message(initial_message)
# Fire session_start hooks (e.g. persona selection)
await self._run_hooks("session_start", conversation, trigger=initial_message)
# 2a. Guard: ensure at least one non-system message exists.
# A restored conversation may have 0 messages if phase_id filtering
# removes them all, or if a prior run stored metadata without messages
@@ -488,13 +528,16 @@ class EventLoopNode(NodeProtocol):
# 2b. Restore spill counter from existing files (resume safety)
self._restore_spill_counter()
# 3. Build tool list: node tools + synthetic set_output + ask_user + delegate tools
# 3. Build tool list: node tools + synthetic framework tools + delegate tools
tools = list(ctx.available_tools)
set_output_tool = self._build_set_output_tool(ctx.node_spec.output_keys)
if set_output_tool:
tools.append(set_output_tool)
if ctx.node_spec.client_facing and not ctx.event_triggered:
tools.append(self._build_ask_user_tool())
# Workers/subagents can escalate blockers to the queen.
if stream_id not in ("queen", "judge"):
tools.append(self._build_escalate_tool())
# Add delegate_to_sub_agent tool if:
# - Node has sub_agents defined
@@ -578,6 +621,7 @@ class EventLoopNode(NodeProtocol):
_synthetic_names = {
"set_output",
"ask_user",
"escalate",
"delegate_to_sub_agent",
"report_to_parent",
}
@@ -586,12 +630,23 @@ class EventLoopNode(NodeProtocol):
tools.extend(ctx.dynamic_tools_provider())
tools.extend(synthetic)
# 6b3. Dynamic prompt refresh (phase switching)
if ctx.dynamic_prompt_provider is not None:
from framework.graph.prompt_composer import _with_datetime
_new_prompt = _with_datetime(ctx.dynamic_prompt_provider())
if _new_prompt != conversation.system_prompt:
conversation.update_system_prompt(_new_prompt)
logger.info("[%s] Dynamic prompt updated (phase switch)", node_id)
# 6c. Publish iteration event
await self._publish_iteration(stream_id, node_id, iteration, execution_id)
# 6d. Pre-turn compaction check (tiered)
_compacted_this_iter = False
if conversation.needs_compaction():
await self._compact(ctx, conversation, accumulator)
_compacted_this_iter = True
# 6e. Run single LLM turn (with transient error retry)
logger.info(
@@ -613,6 +668,9 @@ class EventLoopNode(NodeProtocol):
user_input_requested,
ask_user_prompt,
ask_user_options,
queen_input_requested,
request_system_prompt,
request_messages,
) = await self._run_single_turn(
ctx, conversation, tools, iteration, accumulator
)
@@ -647,6 +705,8 @@ class EventLoopNode(NodeProtocol):
stream_id=stream_id,
execution_id=execution_id,
iteration=iteration,
system_prompt=request_system_prompt,
messages=request_messages,
assistant_text=assistant_text,
tool_calls=logged_tool_calls,
tool_results=real_tool_results,
@@ -790,8 +850,11 @@ class EventLoopNode(NodeProtocol):
if turn_input > 0:
conversation.update_token_count(turn_input)
# 6e''. Post-turn compaction check (catches tool-result bloat)
if conversation.needs_compaction():
# 6e''. Post-turn compaction check (catches tool-result bloat).
# Skip if pre-turn already compacted this iteration — two compactions
# in one iteration produce back-to-back spillover files and leave the
# agent disoriented on the very next turn.
if not _compacted_this_iter and conversation.needs_compaction():
await self._compact(ctx, conversation, accumulator)
# Reset auto-block grace streak when real work happens
@@ -808,6 +871,7 @@ class EventLoopNode(NodeProtocol):
and not real_tool_results
and not outputs_set
and not user_input_requested
and not queen_input_requested
)
if truly_empty and accumulator is not None:
missing = self._get_missing_output_keys(
@@ -959,8 +1023,9 @@ class EventLoopNode(NodeProtocol):
return NodeResult(
success=False,
error=(
f"Node stalled: {self._config.stall_detection_threshold} "
"consecutive identical responses"
f"Node stalled: {self._config.stall_detection_threshold} similar "
f"responses ({self._config.stall_similarity_threshold * 100:.0f}+"
" threshold)"
),
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
@@ -977,7 +1042,7 @@ class EventLoopNode(NodeProtocol):
mcp_tool_calls = [
tc
for tc in logged_tool_calls
if tc.get("tool_name") not in ("set_output", "ask_user")
if tc.get("tool_name") not in ("set_output", "ask_user", "escalate")
]
if mcp_tool_calls:
fps = self._fingerprint_tool_calls(mcp_tool_calls)
@@ -1002,7 +1067,25 @@ class EventLoopNode(NodeProtocol):
"same tool calls with identical arguments. "
"Try a different approach or different arguments."
)
if ctx.node_spec.client_facing and not ctx.event_triggered:
if (
ctx.node_spec.client_facing
and not ctx.event_triggered
and stream_id not in ("queen", "judge")
and self._event_bus is not None
):
await self._event_bus.emit_escalation_requested(
stream_id=stream_id,
node_id=node_id,
reason="Tool doom loop detected",
context=doom_desc,
execution_id=execution_id,
)
await conversation.add_user_message(
"[SYSTEM] Escalated tool doom loop to queen for intervention."
)
recent_tool_fingerprints.clear()
recent_responses.clear()
elif ctx.node_spec.client_facing and not ctx.event_triggered:
await conversation.add_user_message(warning_msg)
await self._await_user_input(ctx, prompt=doom_desc)
recent_tool_fingerprints.clear()
@@ -1223,25 +1306,141 @@ class EventLoopNode(NodeProtocol):
if not _outputs_complete:
_cf_text_only_streak = 0
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=("Blocked for ask_user input (skip judge)"),
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
self._log_skip_judge(
ctx,
node_id,
iteration,
"Blocked for ask_user input (skip judge)",
logged_tool_calls,
assistant_text,
turn_tokens,
iter_start,
)
continue
# All outputs set -- fall through to judge
# Auto-block beyond grace -- fall through to judge (6i)
# 6h''. Worker wait for queen guidance
# If a worker escalates with wait_for_response=true, pause here and
# skip judge evaluation until queen injects guidance.
if queen_input_requested:
if self._shutdown:
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
self._log_skip_judge(
ctx,
node_id,
iteration,
"Shutdown signaled (waiting for queen input)",
logged_tool_calls,
assistant_text,
turn_tokens,
iter_start,
)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
node_id=node_id,
node_name=ctx.node_spec.name,
node_type="event_loop",
success=True,
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
latency_ms=latency_ms,
exit_status="success",
accept_count=_accept_count,
retry_count=_retry_count,
escalate_count=_escalate_count,
continue_count=_continue_count,
)
return NodeResult(
success=True,
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
logger.info("[%s] iter=%d: waiting for queen input...", node_id, iteration)
got_input = await self._await_user_input(ctx, prompt="", emit_client_request=False)
logger.info(
"[%s] iter=%d: queen wait unblocked, got_input=%s",
node_id,
iteration,
got_input,
)
if not got_input:
# Blocked by missing user input - emit escalation before returning
if self._event_bus:
await self._event_bus.emit_escalation_requested(
stream_id=stream_id,
node_id=node_id,
reason="Blocked waiting for queen guidance - no input received",
context=(
"Worker escalated but received no queen guidance before shutdown"
),
execution_id=execution_id,
)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
self._log_skip_judge(
ctx,
node_id,
iteration,
"No queen input received (shutdown during wait)",
logged_tool_calls,
assistant_text,
turn_tokens,
iter_start,
)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
node_id=node_id,
node_name=ctx.node_spec.name,
node_type="event_loop",
success=True,
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
latency_ms=latency_ms,
exit_status="success",
accept_count=_accept_count,
retry_count=_retry_count,
escalate_count=_escalate_count,
continue_count=_continue_count,
)
return NodeResult(
success=True,
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
recent_responses.clear()
_cf_text_only_streak = 0
_continue_count += 1
self._log_skip_judge(
ctx,
node_id,
iteration,
"Blocked for queen input (skip judge)",
logged_tool_calls,
assistant_text,
turn_tokens,
iter_start,
)
continue
# 6i. Judge evaluation
should_judge = (
ctx.is_subagent_mode # Always evaluate subagents
@@ -1253,20 +1452,16 @@ class EventLoopNode(NodeProtocol):
if not should_judge:
# Gap C: unjudged iteration — log as CONTINUE
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback="Unjudged (judge_every_n_turns skip)",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
self._log_skip_judge(
ctx,
node_id,
iteration,
"Unjudged (judge_every_n_turns skip)",
logged_tool_calls,
assistant_text,
turn_tokens,
iter_start,
)
continue
# Judge evaluation (should_judge is always True here)
@@ -1522,6 +1717,7 @@ class EventLoopNode(NodeProtocol):
prompt: str = "",
*,
options: list[str] | None = None,
emit_client_request: bool = True,
) -> bool:
"""Block until user input arrives or shutdown is signaled.
@@ -1535,6 +1731,9 @@ class EventLoopNode(NodeProtocol):
options: Optional predefined choices for the user (from ask_user).
Passed through to the CLIENT_INPUT_REQUESTED event so the
frontend can render a QuestionWidget with buttons.
emit_client_request: When False, wait silently without publishing
CLIENT_INPUT_REQUESTED. Used for worker waits where input is
expected from the queen via inject_worker_message().
Returns True if input arrived, False if shutdown was signaled.
"""
@@ -1549,7 +1748,7 @@ class EventLoopNode(NodeProtocol):
# without injecting, so the wait still blocks until the user types.
self._input_ready.clear()
if self._event_bus:
if emit_client_request and self._event_bus:
await self._event_bus.emit_client_input_requested(
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
@@ -1576,18 +1775,35 @@ class EventLoopNode(NodeProtocol):
tools: list[Tool],
iteration: int,
accumulator: OutputAccumulator,
) -> tuple[str, list[dict], list[str], dict[str, int], list[dict], bool, str, list[str] | None]:
) -> tuple[
str,
list[dict],
list[str],
dict[str, int],
list[dict],
bool,
str,
list[str] | None,
bool,
str,
list[dict[str, Any]],
]:
"""Run a single LLM turn with streaming and tool execution.
Returns (assistant_text, real_tool_results, outputs_set, token_counts, logged_tool_calls,
user_input_requested, ask_user_prompt, ask_user_options).
user_input_requested, ask_user_prompt, ask_user_options, queen_input_requested,
system_prompt, messages).
``real_tool_results`` contains only results from actual tools (web_search,
etc.), NOT from the synthetic ``set_output`` or ``ask_user`` tools.
etc.), NOT from synthetic framework tools such as ``set_output``,
``ask_user``, or ``escalate``.
``outputs_set`` lists the output keys written via ``set_output`` during
this turn. ``user_input_requested`` is True if the LLM called
``ask_user`` during this turn. This separation lets the caller treat
synthetic tools as framework concerns rather than tool-execution concerns.
``queen_input_requested`` is True when the worker called
``escalate(wait_for_response=true)`` and should wait for
queen guidance before judge evaluation.
``logged_tool_calls`` accumulates ALL tool calls across inner iterations
(real tools, set_output, and discarded calls) for L3 logging. Unlike
@@ -1600,11 +1816,14 @@ class EventLoopNode(NodeProtocol):
token_counts: dict[str, int] = {"input": 0, "output": 0}
tool_call_count = 0
final_text = ""
final_system_prompt = conversation.system_prompt
final_messages: list[dict[str, Any]] = []
# Track output keys set via set_output across all inner iterations
outputs_set_this_turn: list[str] = []
user_input_requested = False
ask_user_prompt = ""
ask_user_options: list[str] | None = None
queen_input_requested = False
# Accumulate ALL tool calls across inner iterations for L3 logging.
# Unlike real_tool_results (reset each inner iteration), this persists.
logged_tool_calls: list[dict] = []
@@ -1635,6 +1854,8 @@ class EventLoopNode(NodeProtocol):
)
await conversation.add_user_message("[Continue working on your current task.]")
messages = conversation.to_llm_messages()
final_system_prompt = conversation.system_prompt
final_messages = messages
accumulated_text = ""
tool_calls: list[ToolCallEvent] = []
@@ -1753,6 +1974,9 @@ class EventLoopNode(NodeProtocol):
user_input_requested,
ask_user_prompt,
ask_user_options,
queen_input_requested,
final_system_prompt,
final_messages,
)
# Execute tool calls — framework tools (set_output, ask_user)
@@ -1896,6 +2120,51 @@ class EventLoopNode(NodeProtocol):
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "escalate":
# --- Framework-level escalate handling ---
reason = str(tc.tool_input.get("reason", "")).strip()
context = str(tc.tool_input.get("context", "")).strip()
# Always wait for queen guidance
if stream_id in ("queen", "judge"):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
"ERROR: escalate is only available to worker "
"nodes/sub-agents, not queen/judge streams."
),
is_error=True,
)
results_by_id[tc.tool_use_id] = result
continue
if self._event_bus is None:
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
"ERROR: EventBus unavailable. Could not emit escalation request."
),
is_error=True,
)
results_by_id[tc.tool_use_id] = result
continue
await self._event_bus.emit_escalation_requested(
stream_id=stream_id,
node_id=node_id,
reason=reason,
context=context,
execution_id=execution_id,
)
queen_input_requested = True
result = ToolResult(
tool_use_id=tc.tool_use_id,
content="Escalation requested to hive_coder (queen); waiting for guidance.",
is_error=False,
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "delegate_to_sub_agent":
# --- Framework-level subagent delegation ---
# Queue for parallel execution in Phase 2
@@ -2091,6 +2360,7 @@ class EventLoopNode(NodeProtocol):
if tc.tool_name not in (
"set_output",
"ask_user",
"escalate",
"delegate_to_sub_agent",
"report_to_parent",
):
@@ -2181,6 +2451,9 @@ class EventLoopNode(NodeProtocol):
user_input_requested,
ask_user_prompt,
ask_user_options,
queen_input_requested,
final_system_prompt,
final_messages,
)
# --- Mid-turn pruning: prevent context blowup within a single turn ---
@@ -2197,9 +2470,9 @@ class EventLoopNode(NodeProtocol):
conversation.usage_ratio() * 100,
)
# If ask_user was called, return immediately so the outer loop
# can block for user input instead of re-invoking the LLM.
if user_input_requested:
# If the turn requested external input (ask_user or queen handoff),
# return immediately so the outer loop can block before judge eval.
if user_input_requested or queen_input_requested:
return (
final_text,
real_tool_results,
@@ -2209,12 +2482,17 @@ class EventLoopNode(NodeProtocol):
user_input_requested,
ask_user_prompt,
ask_user_options,
queen_input_requested,
final_system_prompt,
final_messages,
)
# Tool calls processed -- loop back to stream with updated conversation
# -------------------------------------------------------------------
# Synthetic tools: set_output, ask_user
# Synthetic tools: set_output, ask_user, escalate
# ask_user is used by queen
# escalate is used by worker
# -------------------------------------------------------------------
def _build_ask_user_tool(self) -> Tool:
@@ -2299,6 +2577,35 @@ class EventLoopNode(NodeProtocol):
},
)
def _build_escalate_tool(self) -> Tool:
"""Build the synthetic escalate tool for worker -> queen handoff."""
return Tool(
name="escalate",
description=(
"Escalate to the Hive Coder queen when requesting user input, "
"blocked by errors, missing "
"credentials, or ambiguous constraints that require supervisor "
"guidance. Include a concise reason and optional context. "
"The node will pause until the queen injects guidance."
),
parameters={
"type": "object",
"properties": {
"reason": {
"type": "string",
"description": (
"Short reason for escalation (e.g. 'Tool repeatedly failing')."
),
},
"context": {
"type": "string",
"description": "Optional diagnostic details for the queen.",
},
},
"required": ["reason"],
},
)
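# Example tool input a worker might emit against this schema (values
# illustrative): {"reason": "Gmail credential expired; refresh returns 400",
# "context": "3 consecutive AdenRefreshError failures on send_email"}.
# Only "reason" is required; "context" is optional diagnostic detail.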
def _build_delegate_tool(
self, sub_agents: list[str], node_registry: dict[str, Any]
) -> Tool | None:
@@ -2478,6 +2785,10 @@ class EventLoopNode(NodeProtocol):
if self._mark_complete_flag:
return JudgeVerdict(action="ACCEPT")
# Opt-out: node explicitly disables judge (e.g. conversational queen)
if ctx.node_spec.skip_judge:
return JudgeVerdict(action="RETRY", feedback="")
if self._judge is not None:
context = {
"assistant_text": assistant_text,
@@ -2620,13 +2931,47 @@ class EventLoopNode(NodeProtocol):
skip = set(nullable_keys) if nullable_keys else set()
return [k for k in output_keys if k not in skip and accumulator.get(k) is None]
@staticmethod
def _ngram_similarity(s1: str, s2: str, n: int = 2) -> float:
"""Jaccard similarity of n-gram sets.
Returns 0.0-1.0, where 1.0 is exact match.
Fast: O(len(s1) + len(s2)) using set operations.
"""
def _ngrams(s: str) -> set[str]:
return {s[i : i + n] for i in range(len(s) - n + 1) if s.strip()}
if not s1 or not s2:
return 0.0
ngrams1, ngrams2 = _ngrams(s1.lower()), _ngrams(s2.lower())
if not ngrams1 or not ngrams2:
return 0.0
intersection = len(ngrams1 & ngrams2)
union = len(ngrams1 | ngrams2)
return intersection / union if union else 0.0
def _is_stalled(self, recent_responses: list[str]) -> bool:
"""Detect stall: N consecutive identical non-empty responses."""
"""Detect stall using n-gram similarity.
Detects when N consecutive responses have similarity >= threshold.
This catches phrases like "I'm still stuck" vs "I'm stuck".
"""
if len(recent_responses) < self._config.stall_detection_threshold:
return False
if not recent_responses[0]:
return False
return all(r == recent_responses[0] for r in recent_responses)
threshold = self._config.stall_similarity_threshold
# Check similarity against all recent responses (excluding self)
for i, resp in enumerate(recent_responses):
# Compare against all previous responses
for prev in recent_responses[:i]:
if self._ngram_similarity(resp, prev) >= threshold:
return True
return False
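# Standalone sketch of the similarity heuristic (EventLoopNode assumed
# imported; numbers approximate): near-identical phrasing scores high
# bigram Jaccard similarity even though the strings are not equal.
a = "I'm stuck on the API error."
b = "I'm still stuck on the API error."
sim = EventLoopNode._ngram_similarity(a, b)  # roughly 0.85
assert sim >= 0.7  # would count toward stall_similarity_threshold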
@staticmethod
def _is_transient_error(exc: BaseException) -> bool:
@@ -2705,7 +3050,10 @@ class EventLoopNode(NodeProtocol):
self,
recent_tool_fingerprints: list[list[tuple[str, str]]],
) -> tuple[bool, str]:
"""Detect doom loop: N consecutive turns with identical tool calls.
"""Detect doom loop using n-gram similarity on tool inputs.
Detects when N consecutive turns have similar tool calls.
Similarity applies to the canonicalized tool input strings.
Returns (is_doom_loop, description).
"""
@@ -2714,15 +3062,27 @@ class EventLoopNode(NodeProtocol):
threshold = self._config.tool_doom_loop_threshold
if len(recent_tool_fingerprints) < threshold:
return False, ""
# All entries must be non-empty and identical
first = recent_tool_fingerprints[0]
if not first:
return False, ""
if all(fp == first for fp in recent_tool_fingerprints):
tool_names = [name for name, _ in first]
# Convert a turn's list of (name, args) pairs to a single comparable string.
def _turn_sig(fp: list[tuple[str, str]]) -> str:
return "|".join(f"{name}:{args}" for name, args in fp)
first_sig = _turn_sig(first)
similarity_threshold = self._config.stall_similarity_threshold
similar_count = sum(
1
for fp in recent_tool_fingerprints
if self._ngram_similarity(_turn_sig(fp), first_sig) >= similarity_threshold
)
if similar_count >= threshold:
tool_names = [name for fp in recent_tool_fingerprints for name, _ in fp]
desc = (
f"Doom loop detected: {threshold} consecutive identical "
f"tool calls ({', '.join(tool_names)})"
f"Doom loop detected: {similar_count}/{len(recent_tool_fingerprints)} "
f"consecutive similar tool calls ({', '.join(tool_names)})"
)
return True, desc
return False, ""
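# Sketch of the turn-signature comparison with hypothetical fingerprints:
# each turn's (tool_name, canonical_args) pairs collapse into one string,
# so near-identical argument variants still register as similar turns.
turn1 = [("web_search", '{"q": "python httpx retry"}')]
turn2 = [("web_search", '{"q": "python httpx retries"}')]
sig1 = "|".join(f"{name}:{args}" for name, args in turn1)
sig2 = "|".join(f"{name}:{args}" for name, args in turn2)
# _ngram_similarity(sig1, sig2) is close to 1.0, well above the 0.7 default.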
@@ -2933,8 +3293,6 @@ class EventLoopNode(NodeProtocol):
# --- Compaction -----------------------------------------------------------
# Threshold above which LLM compaction is invoked (structural handles 80-95%).
_LLM_COMPACT_THRESHOLD = 0.95
# Max chars of formatted messages before proactively splitting for LLM.
_LLM_COMPACT_CHAR_LIMIT = 240_000
# Max recursion depth for binary-search splitting.
@@ -2949,8 +3307,11 @@ class EventLoopNode(NodeProtocol):
"""Compact conversation history to stay within token budget.
1. Prune old tool results (always, free).
2. Structure-preserving compaction at >=80% (standard, then aggressive).
3. LLM compaction at >95% with recursive binary-search splitting.
2. Structure-preserving compaction (standard, free) removes freeform text
to spillover files, retains tool-call structure.
3. LLM summary compaction generates a summary and places it as the first
message, replacing old messages. Used whenever structural compaction
does not fully resolve the budget.
4. Emergency deterministic summary only if LLM failed or unavailable.
"""
ratio_before = conversation.usage_ratio()
@@ -2973,36 +3334,26 @@ class EventLoopNode(NodeProtocol):
await self._log_compaction(ctx, conversation, ratio_before)
return
# --- Step 2: Structure-preserving compaction (>=80%) ---
# --- Step 2: Standard structure-preserving compaction (free, no LLM) ---
# Removes freeform text to spillover files; keeps tool-call pairs in context.
spill_dir = self._config.spillover_dir
if spill_dir:
pre_structural = conversation.usage_ratio()
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=4,
phase_graduated=phase_grad,
)
if conversation.usage_ratio() >= 0.9 * pre_structural:
logger.info(
"Standard structural compaction ineffective "
"(%.0f%% -> %.0f%%), trying aggressive",
pre_structural * 100,
conversation.usage_ratio() * 100,
)
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=4,
phase_graduated=phase_grad,
aggressive=True,
)
if not conversation.needs_compaction():
await self._log_compaction(ctx, conversation, ratio_before)
return
# --- Step 3: LLM compaction at >95% (recursive binary-search) ---
if conversation.usage_ratio() > self._LLM_COMPACT_THRESHOLD and ctx.llm is not None:
# --- Step 3: LLM summary compaction ---
# Structural compaction alone did not hit target. Generate an LLM summary
# and place it as the first message — more reliable for token reduction
# than offloading more content to files.
if ctx.llm is not None:
logger.info(
"LLM compaction triggered (%.0f%% usage)",
"LLM summary compaction triggered (%.0f%% usage)",
conversation.usage_ratio() * 100,
)
try:
@@ -3328,14 +3679,23 @@ class EventLoopNode(NodeProtocol):
data_files = [f for f in all_files if f not in conv_files]
if conv_files:
conv_list = "\n".join(f" - {f}" for f in conv_files)
conv_list = "\n".join(
f" - {f} (full path: {data_dir / f})" for f in conv_files
)
parts.append(
"CONVERSATION HISTORY (freeform messages saved during compaction — "
"use load_data to review earlier dialogue):\n" + conv_list
"use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to review earlier dialogue):\n"
+ conv_list
)
if data_files:
file_list = "\n".join(f" - {f}" for f in data_files[:30])
parts.append("DATA FILES (use load_data to read):\n" + file_list)
file_list = "\n".join(
f" - {f} (full path: {data_dir / f})" for f in data_files[:30]
)
parts.append(
"DATA FILES (use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to read):\n" + file_list
)
if not all_files:
parts.append(
"NOTE: Large tool results may have been saved to files. "
@@ -3577,6 +3937,45 @@ class EventLoopNode(NodeProtocol):
except Exception as e:
logger.warning("Action plan generation failed for node '%s': %s", node_id, e)
async def _run_hooks(
self,
event: str,
conversation: NodeConversation,
trigger: str | None = None,
) -> None:
"""Run all registered hooks for *event*, applying their results.
Each hook receives a HookContext and may return a HookResult that:
- replaces the system prompt (result.system_prompt)
- injects an extra user message (result.inject)
Hooks run in registration order; each sees the prompt as left by the
previous hook.
"""
hook_list = self._config.hooks.get(event, [])
if not hook_list:
return
for hook in hook_list:
ctx = HookContext(
event=event,
trigger=trigger,
system_prompt=conversation.system_prompt,
)
try:
result = await hook(ctx)
except Exception:
import logging
logging.getLogger(__name__).warning(
"Hook '%s' raised an exception", event, exc_info=True
)
continue
if result is None:
continue
if result.system_prompt:
conversation.update_system_prompt(result.system_prompt)
if result.inject:
await conversation.add_user_message(result.inject)
async def _publish_iteration(
self, stream_id: str, node_id: str, iteration: int, execution_id: str = ""
) -> None:
@@ -3611,6 +4010,32 @@ class EventLoopNode(NodeProtocol):
iteration=iteration,
)
def _log_skip_judge(
self,
ctx: NodeContext,
node_id: str,
iteration: int,
feedback: str,
tool_calls: list[dict],
llm_text: str,
turn_tokens: dict[str, int],
iter_start: float,
) -> None:
"""Log a CONTINUE step that skips judge evaluation (e.g., waiting for input)."""
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=feedback,
tool_calls=tool_calls,
llm_text=llm_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=int((time.time() - iter_start) * 1000),
)
async def _publish_loop_completed(
self, stream_id: str, node_id: str, iterations: int, execution_id: str = ""
) -> None:
@@ -3627,7 +4052,7 @@ class EventLoopNode(NodeProtocol):
await self._event_bus.emit_node_stalled(
stream_id=stream_id,
node_id=node_id,
reason="Consecutive identical responses detected",
reason="Consecutive similar responses detected",
execution_id=execution_id,
)
+16 -89
@@ -26,7 +26,6 @@ from framework.graph.node import (
NodeSpec,
SharedMemory,
)
from framework.graph.output_cleaner import CleansingConfig, OutputCleaner
from framework.graph.validator import OutputValidator
from framework.llm.provider import LLMProvider, Tool
from framework.observability import set_trace_context
@@ -126,7 +125,6 @@ class GraphExecutor:
tool_executor: Callable | None = None,
node_registry: dict[str, NodeProtocol] | None = None,
approval_callback: Callable | None = None,
cleansing_config: CleansingConfig | None = None,
enable_parallel_execution: bool = True,
parallel_config: ParallelExecutionConfig | None = None,
event_bus: Any | None = None,
@@ -139,6 +137,7 @@ class GraphExecutor:
accounts_data: list[dict] | None = None,
tool_provider_map: dict[str, str] | None = None,
dynamic_tools_provider: Callable | None = None,
dynamic_prompt_provider: Callable | None = None,
):
"""
Initialize the executor.
@@ -150,7 +149,6 @@ class GraphExecutor:
tool_executor: Function to execute tools
node_registry: Custom node implementations by ID
approval_callback: Optional callback for human-in-the-loop approval
cleansing_config: Optional output cleansing configuration
enable_parallel_execution: Enable parallel fan-out execution (default True)
parallel_config: Configuration for parallel execution behavior
event_bus: Optional event bus for emitting node lifecycle events
@@ -163,6 +161,8 @@ class GraphExecutor:
tool_provider_map: Tool name to provider name mapping for account routing
dynamic_tools_provider: Optional callback returning current
tool list (for mode switching)
dynamic_prompt_provider: Optional callback returning current
system prompt (for phase switching)
"""
self.runtime = runtime
self.llm = llm
@@ -182,14 +182,7 @@ class GraphExecutor:
self.accounts_data = accounts_data
self.tool_provider_map = tool_provider_map
self.dynamic_tools_provider = dynamic_tools_provider
# Initialize output cleaner — uses its own dedicated fast model (CEREBRAS_API_KEY),
# never the main agent LLM. Passing the main LLM here would cause expensive
# Anthropic calls for output cleaning whenever ANTHROPIC_API_KEY is set.
self.cleansing_config = cleansing_config or CleansingConfig()
self.output_cleaner = OutputCleaner(
config=self.cleansing_config,
)
self.dynamic_prompt_provider = dynamic_prompt_provider
# Parallel execution settings
self.enable_parallel_execution = enable_parallel_execution
@@ -417,6 +410,7 @@ class GraphExecutor:
input_data: dict[str, Any] | None = None,
session_state: dict[str, Any] | None = None,
checkpoint_config: "CheckpointConfig | None" = None,
validate_graph: bool = True,
) -> ExecutionResult:
"""
Execute a graph for a goal.
@@ -426,6 +420,8 @@ class GraphExecutor:
goal: The goal driving execution
input_data: Initial input data
session_state: Optional session state to resume from (with paused_at, memory, etc.)
validate_graph: If False, skip graph validation (for test graphs that
intentionally break rules)
Returns:
ExecutionResult with output and metrics
@@ -434,12 +430,13 @@ class GraphExecutor:
set_trace_context(agent_id=graph.id)
# Validate graph
errors = graph.validate()
if errors:
return ExecutionResult(
success=False,
error=f"Invalid graph: {errors}",
)
if validate_graph:
result = graph.validate()
if result["errors"]:
return ExecutionResult(
success=False,
error=f"Invalid graph: {result['errors']}",
)
# Validate tool availability
tool_errors = self._validate_tools(graph)
@@ -1801,6 +1798,7 @@ class GraphExecutor:
all_tools=list(self.tools), # Full catalog for subagent tool resolution
shared_node_registry=self.node_registry, # For subagent escalation routing
dynamic_tools_provider=self.dynamic_tools_provider,
dynamic_prompt_provider=self.dynamic_prompt_provider,
)
VALID_NODE_TYPES = {
@@ -1877,6 +1875,7 @@ class GraphExecutor:
max_history_tokens=lc.get("max_history_tokens", 32000),
max_tool_result_chars=lc.get("max_tool_result_chars", 30_000),
spillover_dir=spillover,
hooks=lc.get("hooks", {}),
),
tool_executor=self.tool_executor,
conversation_store=conv_store,
@@ -1912,52 +1911,6 @@ class GraphExecutor:
source_node_name=current_node_spec.name if current_node_spec else current_node_id,
target_node_name=target_node_spec.name if target_node_spec else edge.target,
):
# Validate and clean output before mapping inputs.
# Use full memory state (not just result.output) because
# target input_keys may come from earlier nodes in the
# graph, not only from the immediate source node.
if self.cleansing_config.enabled and target_node_spec:
output_to_validate = memory.read_all()
validation = self.output_cleaner.validate_output(
output=output_to_validate,
source_node_id=current_node_id,
target_node_spec=target_node_spec,
)
if not validation.valid:
self.logger.warning(f"⚠ Output validation failed: {validation.errors}")
# Clean the output
cleaned_output = await self.output_cleaner.clean_output(
output=output_to_validate,
source_node_id=current_node_id,
target_node_spec=target_node_spec,
validation_errors=validation.errors,
)
# Update result with cleaned output
result.output = cleaned_output
# Write cleaned output back to memory (skip validation for LLM output)
for key, value in cleaned_output.items():
memory.write(key, value, validate=False)
# Revalidate
revalidation = self.output_cleaner.validate_output(
output=cleaned_output,
source_node_id=current_node_id,
target_node_spec=target_node_spec,
)
if revalidation.valid:
self.logger.info("✓ Output cleaned and validated successfully")
else:
self.logger.error(
f"✗ Cleaning failed, errors remain: {revalidation.errors}"
)
# Continue anyway if fallback_to_raw is True
# Map inputs (skip validation for processed LLM output)
mapped = edge.map_inputs(result.output, memory.read_all())
for key, value in mapped.items():
@@ -2119,32 +2072,6 @@ class GraphExecutor:
branch.status = "running"
try:
# Validate and clean output before mapping inputs (same as _follow_edges).
# Use full memory state since target input_keys may come
# from earlier nodes, not just the immediate source.
if self.cleansing_config.enabled and node_spec:
mem_snapshot = memory.read_all()
validation = self.output_cleaner.validate_output(
output=mem_snapshot,
source_node_id=source_node_spec.id if source_node_spec else "unknown",
target_node_spec=node_spec,
)
if not validation.valid:
self.logger.warning(
f"⚠ Output validation failed for branch "
f"{branch.node_id}: {validation.errors}"
)
cleaned_output = await self.output_cleaner.clean_output(
output=mem_snapshot,
source_node_id=source_node_spec.id if source_node_spec else "unknown",
target_node_spec=node_spec,
validation_errors=validation.errors,
)
# Write cleaned output to memory
for key, value in cleaned_output.items():
await memory.write_async(key, value)
# Map inputs via edge
mapped = branch.edge.map_inputs(source_result.output, memory.read_all())
for key, value in mapped.items():
+16
@@ -262,6 +262,16 @@ class NodeSpec(BaseModel):
),
)
# Opt out of judge evaluation entirely (no feedback injected, loop continues normally)
skip_judge: bool = Field(
default=False,
description=(
"When True, the implicit judge is bypassed entirely — no feedback is "
"injected and the loop continues naturally. Intended for conversational "
"nodes (e.g., the queen) that should never receive tool-use pressure."
),
)
model_config = {"extra": "allow", "arbitrary_types_allowed": True}
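# Usage sketch (field values illustrative; required NodeSpec fields follow
# the constructor shape used elsewhere in this changeset):
queen_spec = NodeSpec(
    id="queen",
    name="Queen",
    description="Conversational supervisor node",
    input_keys=[],
    output_keys=[],
    skip_judge=True,  # never inject judge feedback into this node's loop
)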
@@ -549,6 +559,12 @@ class NodeContext:
# the queen to switch between building-mode and running-mode tools.
dynamic_tools_provider: Any = None # Callable[[], list[Tool]] | None
# Dynamic prompt provider — when set, EventLoopNode checks each
# iteration and updates the system prompt if it changed. Used by
# the queen to switch between phase-specific prompts (building /
# staging / running) without restarting the conversation.
dynamic_prompt_provider: Any = None # Callable[[], str] | None
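# Sketch of a phase-switching provider (hypothetical phase registry): the
# executor passes the callable through, and EventLoopNode re-checks it each
# iteration, swapping the system prompt whenever the returned value changes.
PHASE_PROMPTS = {"building": "You are building the agent...", "running": "You are running the agent..."}
current_phase = "building"

def phase_prompt_provider() -> str:
    return PHASE_PROMPTS[current_phase]

# GraphExecutor(..., dynamic_prompt_provider=phase_prompt_provider)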
@dataclass
class NodeResult:
-395
@@ -1,395 +0,0 @@
"""
Output Cleaner - Framework-level I/O validation and cleaning.
Validates node outputs match expected schemas and uses fast LLM
to clean malformed outputs before they flow to the next node.
This prevents cascading failures and dramatically improves execution success rates.
"""
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
def _heuristic_repair(text: str) -> dict | None:
"""
Attempt to repair JSON without an LLM call.
Handles common errors:
- Markdown code blocks
- Python booleans/None (True -> true)
- Single quotes instead of double quotes
"""
if not isinstance(text, str):
return None
# 1. Strip Markdown code blocks
text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE)
text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE)
text = text.strip()
# 2. Find outermost JSON-like structure (greedy match)
match = re.search(r"(\{.*\}|\[.*\])", text, re.DOTALL)
if match:
candidate = match.group(1)
# 3. Common fixes
# Fix Python constants
candidate = re.sub(r"\bTrue\b", "true", candidate)
candidate = re.sub(r"\bFalse\b", "false", candidate)
candidate = re.sub(r"\bNone\b", "null", candidate)
# 4. Attempt load
try:
return json.loads(candidate)
except json.JSONDecodeError:
# 5. Advanced: Try swapping single quotes if double quotes fail
# This is risky but effective for simple dicts
try:
if "'" in candidate and '"' not in candidate:
candidate_swapped = candidate.replace("'", '"')
return json.loads(candidate_swapped)
except json.JSONDecodeError:
pass
return None
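# Illustration of the repairs above on a typical malformed LLM payload:
# fences stripped, Python constants converted, single quotes swapped.
assert _heuristic_repair("```json\n{'ok': True, 'count': None}\n```") == {
    "ok": True,
    "count": None,
}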
@dataclass
class CleansingConfig:
"""Configuration for output cleansing."""
enabled: bool = True
fast_model: str = "cerebras/llama-3.3-70b" # Fast, cheap model for cleaning
max_retries: int = 2
cache_successful_patterns: bool = True
fallback_to_raw: bool = True # If cleaning fails, pass raw output
log_cleanings: bool = True # Log when cleansing happens
@dataclass
class ValidationResult:
"""Result of output validation."""
valid: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
cleaned_output: dict[str, Any] | None = None
class OutputCleaner:
"""
Framework-level output validation and cleaning.
Uses heuristics and fast LLM to clean malformed outputs
before they flow to the next node.
"""
def __init__(self, config: CleansingConfig, llm_provider=None):
"""
Initialize the output cleaner.
Args:
config: Cleansing configuration
llm_provider: Optional LLM provider.
"""
self.config = config
self.success_cache: dict[str, Any] = {} # Cache successful patterns
self.failure_count: dict[str, int] = {} # Track edge failures
self.cleansing_count = 0 # Track total cleanings performed
# Initialize LLM provider for cleaning
if llm_provider:
self.llm = llm_provider
elif config.enabled:
# Create dedicated fast LLM provider for cleaning
try:
import os
from framework.llm.litellm import LiteLLMProvider
api_key = os.environ.get("CEREBRAS_API_KEY")
if api_key:
self.llm = LiteLLMProvider(
api_key=api_key,
model=config.fast_model,
)
logger.info(f"✓ Initialized OutputCleaner with {config.fast_model}")
else:
logger.warning("⚠ CEREBRAS_API_KEY not found, output cleaning will be disabled")
self.llm = None
except ImportError:
logger.warning("⚠ LiteLLMProvider not available, output cleaning disabled")
self.llm = None
else:
self.llm = None
def validate_output(
self,
output: dict[str, Any],
source_node_id: str,
target_node_spec: Any, # NodeSpec
) -> ValidationResult:
"""
Validate output matches target node's expected input schema.
Returns:
ValidationResult with errors and optionally cleaned output
"""
errors = []
warnings = []
# Check 1: Required input keys present (skip nullable keys)
nullable = set(getattr(target_node_spec, "nullable_output_keys", None) or [])
for key in target_node_spec.input_keys:
if key in nullable:
continue
if key not in output:
errors.append(f"Missing required key: '{key}'")
continue
value = output[key]
# Check 2: Detect if value is JSON string (the JSON parsing trap!)
if isinstance(value, str):
# Try parsing as JSON to detect the trap
try:
parsed = json.loads(value)
if isinstance(parsed, dict):
if key in parsed:
# Key exists in parsed JSON - classic parsing failure!
errors.append(
f"Key '{key}' contains JSON string with nested '{key}' field - "
f"likely parsing failure from LLM node"
)
elif len(value) > 100:
# Large JSON string, but doesn't contain the key
warnings.append(
f"Key '{key}' contains JSON string ({len(value)} chars)"
)
except json.JSONDecodeError:
# Not JSON, check if suspiciously large
if len(value) > 500:
warnings.append(
f"Key '{key}' contains large string ({len(value)} chars), "
f"possibly entire LLM response"
)
# Check 3: Type validation (if schema provided)
if hasattr(target_node_spec, "input_schema") and target_node_spec.input_schema:
expected_schema = target_node_spec.input_schema.get(key)
if expected_schema:
expected_type = expected_schema.get("type")
if expected_type and not self._type_matches(value, expected_type):
actual_type = type(value).__name__
errors.append(
f"Key '{key}': expected type '{expected_type}', got '{actual_type}'"
)
# Warnings don't make validation fail, but errors do
is_valid = len(errors) == 0
if not is_valid and self.config.log_cleanings:
logger.warning(
f"⚠ Output validation failed for {source_node_id}{target_node_spec.id}: "
f"{len(errors)} error(s), {len(warnings)} warning(s)"
)
return ValidationResult(
valid=is_valid,
errors=errors,
warnings=warnings,
)
async def clean_output(
self,
output: dict[str, Any],
source_node_id: str,
target_node_spec: Any, # NodeSpec
validation_errors: list[str],
) -> dict[str, Any]:
"""
Use heuristics and fast LLM to clean malformed output.
Args:
output: Raw output from source node
source_node_id: ID of source node
target_node_spec: Target node spec (for schema)
validation_errors: Errors from validation
Returns:
Cleaned output matching target schema
"""
if not self.config.enabled:
logger.warning("⚠ Output cleansing disabled in config")
return output
# --- PHASE 1: Fast Heuristic Repair (Avoids LLM call) ---
# Often the output is just a string containing JSON, or has minor syntax errors
# If output is a dictionary but malformed, we might need to serialize it first
# to try and fix the underlying string representation if it came from raw text
# Heuristic: Check if any value is actually a JSON string that should be promoted
# This handles the "JSON Parsing Trap" where LLM returns {"key": "{\"nested\": ...}"}
heuristic_fixed = False
fixed_output = output.copy()
for key, value in output.items():
if isinstance(value, str):
repaired = _heuristic_repair(value)
if repaired and isinstance(repaired, dict | list):
# Check if this repaired structure looks like what we want
# e.g. if the key is 'data' and the string contained valid JSON
fixed_output[key] = repaired
heuristic_fixed = True
# If we fixed something, re-validate manually to see if it's enough
if heuristic_fixed:
logger.info("⚡ Heuristic repair applied (nested JSON expansion)")
return fixed_output
# --- PHASE 2: LLM-based Repair ---
if not self.llm:
logger.warning("⚠ No LLM provider available for cleansing")
return output
# Build schema description for target node
schema_desc = self._build_schema_description(target_node_spec)
# Create cleansing prompt
prompt = f"""Clean this malformed agent output to match the expected schema.
VALIDATION ERRORS:
{chr(10).join(f"- {e}" for e in validation_errors)}
EXPECTED SCHEMA for node '{target_node_spec.id}':
{schema_desc}
RAW OUTPUT from node '{source_node_id}':
{json.dumps(output, indent=2, default=str)}
INSTRUCTIONS:
1. Extract values that match the expected schema keys
2. If a value is a JSON string, parse it and extract the correct field
3. Convert types to match the schema (string, dict, list, number, boolean)
4. Remove extra fields not in the expected schema
5. Ensure all required keys are present
Return ONLY valid JSON matching the expected schema. No explanations, no markdown."""
try:
if self.config.log_cleanings:
logger.info(
f"🧹 Cleaning output from '{source_node_id}' using {self.config.fast_model}"
)
response = await self.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"You clean malformed agent outputs. Return only valid JSON matching the schema."
),
max_tokens=2048, # Sufficient for cleaning most outputs
)
# Parse cleaned output
cleaned_text = response.content.strip()
# Apply heuristic repair to the LLM's output too (just in case)
cleaned = _heuristic_repair(cleaned_text)
if not cleaned:
# Fallback to standard load if heuristic returns None (unlikely for LLM output)
cleaned = json.loads(cleaned_text)
if isinstance(cleaned, dict):
self.cleansing_count += 1
if self.config.log_cleanings:
logger.info(
f"✓ Output cleaned successfully (total cleanings: {self.cleansing_count})"
)
return cleaned
else:
logger.warning(f"⚠ Cleaned output is not a dict: {type(cleaned)}")
if self.config.fallback_to_raw:
return output
else:
raise ValueError(f"Cleaning produced {type(cleaned)}, expected dict")
except json.JSONDecodeError as e:
logger.error(f"✗ Failed to parse cleaned JSON: {e}")
if self.config.fallback_to_raw:
logger.info("↩ Falling back to raw output")
return output
else:
raise
except Exception as e:
logger.error(f"✗ Output cleaning failed: {e}")
if self.config.fallback_to_raw:
logger.info("↩ Falling back to raw output")
return output
else:
raise
def _build_schema_description(self, node_spec: Any) -> str:
"""Build human-readable schema description from NodeSpec."""
lines = ["{"]
for key in node_spec.input_keys:
# Get type hint and description if available
if hasattr(node_spec, "input_schema") and node_spec.input_schema:
schema = node_spec.input_schema.get(key, {})
type_hint = schema.get("type", "any")
description = schema.get("description", "")
required = schema.get("required", True)
line = f' "{key}": {type_hint}'
if description:
line += f" // {description}"
if required:
line += " (required)"
lines.append(line + ",")
else:
# No schema, just show the key
lines.append(f' "{key}": any // (required)')
lines.append("}")
return "\n".join(lines)
def _type_matches(self, value: Any, expected_type: str) -> bool:
"""Check if value matches expected type."""
type_map = {
"string": str,
"str": str,
"int": int,
"integer": int,
"float": float,
"number": (int, float),
"bool": bool,
"boolean": bool,
"dict": dict,
"object": dict,
"list": list,
"array": list,
"any": object, # Matches everything
}
expected_class = type_map.get(expected_type.lower())
if expected_class:
return isinstance(value, expected_class)
# Unknown type, allow it
return True
def get_stats(self) -> dict[str, Any]:
"""Get cleansing statistics."""
return {
"total_cleanings": self.cleansing_count,
"failure_count": dict(self.failure_count),
"cache_size": len(self.success_cache),
}
@@ -1,242 +0,0 @@
"""
Test OutputCleaner with real Cerebras LLM.
Demonstrates how OutputCleaner fixes the JSON parsing trap using llama-3.3-70b.
"""
import json
import os
from framework.graph.node import NodeSpec
from framework.graph.output_cleaner import CleansingConfig, OutputCleaner
from framework.llm.litellm import LiteLLMProvider
def test_cleaning_with_cerebras():
"""Test that cleaning fixes malformed output using Cerebras llama-3.3-70b."""
print("\n" + "=" * 80)
print("LIVE TEST: Cleaning with Cerebras llama-3.3-70b")
print("=" * 80)
# Get API key
api_key = os.environ.get("CEREBRAS_API_KEY")
if not api_key:
print("\n⚠ Skipping: CEREBRAS_API_KEY not found in environment")
return
# Initialize LLM
llm = LiteLLMProvider(
api_key=api_key,
model="cerebras/llama-3.3-70b",
)
# Initialize cleaner with Cerebras
cleaner = OutputCleaner(
config=CleansingConfig(
enabled=True,
fast_model="cerebras/llama-3.3-70b",
log_cleanings=True,
),
llm_provider=llm,
)
# Scenario 1: JSON parsing trap (entire response in one key)
print("\n--- Scenario 1: JSON Parsing Trap ---")
malformed_output = {
"recommendation": (
'{\n "approval_decision": "APPROVED",\n "risk_score": 3.5,\n '
'"reason": "Standard terms, low risk"\n}'
),
}
target_spec = NodeSpec(
id="generate-recommendation",
name="Generate Recommendation",
description="Test",
input_keys=["recommendation"],
output_keys=["result"],
input_schema={
"recommendation": {
"type": "dict",
"required": True,
"description": "Recommendation with approval_decision and risk_score",
},
},
)
# Validate
validation = cleaner.validate_output(
output=malformed_output,
source_node_id="analyze-contract",
target_node_spec=target_spec,
)
print("\nMalformed output:")
print(json.dumps(malformed_output, indent=2))
print(f"\nValidation errors: {validation.errors}")
# Clean the output
print("\n🧹 Cleaning with Cerebras llama-3.3-70b...")
cleaned = cleaner.clean_output(
output=malformed_output,
source_node_id="analyze-contract",
target_node_spec=target_spec,
validation_errors=validation.errors,
)
print("\n✓ Cleaned output:")
print(json.dumps(cleaned, indent=2))
assert isinstance(cleaned, dict), "Should return dict"
assert "approval_decision" in str(cleaned) or isinstance(cleaned.get("recommendation"), dict), (
"Should have recommendation structure"
)
# Scenario 2: Multiple keys with JSON string
print("\n\n--- Scenario 2: Multiple Keys, JSON String ---")
malformed_output2 = {
"analysis": (
'{"high_risk_clauses": ["unlimited liability"], '
'"compliance_issues": [], "category": "high-risk"}'
),
"risk_score": "7.5", # String instead of number
}
target_spec2 = NodeSpec(
id="next-node",
name="Next Node",
description="Test",
input_keys=["analysis", "risk_score"],
output_keys=["result"],
input_schema={
"analysis": {"type": "dict", "required": True},
"risk_score": {"type": "number", "required": True},
},
)
validation2 = cleaner.validate_output(
output=malformed_output2,
source_node_id="analyze",
target_node_spec=target_spec2,
)
print("\nMalformed output:")
print(json.dumps(malformed_output2, indent=2))
print(f"\nValidation errors: {validation2.errors}")
if not validation2.valid:
print("\n🧹 Cleaning with Cerebras llama-3.3-70b...")
cleaned2 = cleaner.clean_output(
output=malformed_output2,
source_node_id="analyze",
target_node_spec=target_spec2,
validation_errors=validation2.errors,
)
print("\n✓ Cleaned output:")
print(json.dumps(cleaned2, indent=2))
assert isinstance(cleaned2, dict), "Should return dict"
assert isinstance(cleaned2.get("analysis"), dict), "analysis should be dict"
assert isinstance(cleaned2.get("risk_score"), (int, float)), "risk_score should be number"
# Stats
stats = cleaner.get_stats()
print("\n\nCleaner Statistics:")
print(f" Total cleanings: {stats['total_cleanings']}")
print(f" Cache size: {stats['cache_size']}")
print("\n" + "=" * 80)
print("✓ LIVE TEST PASSED")
print("=" * 80)
def test_validation_only():
"""Test validation without LLM (no cleaning)."""
print("\n" + "=" * 80)
print("TEST: Validation Only (No LLM)")
print("=" * 80)
cleaner = OutputCleaner(
config=CleansingConfig(enabled=True),
llm_provider=None, # No LLM
)
# Test 1: JSON parsing trap detection
malformed = {
"approval_decision": '{"approval_decision": "APPROVED", "risk_score": 3}',
}
target = NodeSpec(
id="target",
name="Target",
description="Test",
input_keys=["approval_decision"],
output_keys=["result"],
)
result = cleaner.validate_output(
output=malformed,
source_node_id="source",
target_node_spec=target,
)
print(f"\nInput: {json.dumps(malformed, indent=2)}")
print(f"Errors: {result.errors}")
print(f"Warnings: {result.warnings}")
assert not result.valid or len(result.warnings) > 0, "Should detect JSON string"
print("✓ Detected JSON parsing trap")
# Test 2: Missing keys
malformed2 = {"field1": "value"}
target2 = NodeSpec(
id="target",
name="Target",
description="Test",
input_keys=["field1", "field2"],
output_keys=["result"],
)
result2 = cleaner.validate_output(
output=malformed2,
source_node_id="source",
target_node_spec=target2,
)
print(f"\nInput: {json.dumps(malformed2, indent=2)}")
print(f"Errors: {result2.errors}")
assert not result2.valid, "Should be invalid"
assert "field2" in result2.errors[0], "Should mention missing field"
print("✓ Detected missing keys")
print("\n✓ Validation tests passed")
if __name__ == "__main__":
print("\n" + "=" * 80)
print("OUTPUT CLEANER LIVE TEST SUITE (with Cerebras)")
print("=" * 80)
try:
# Test validation (no LLM needed)
test_validation_only()
# Test cleaning with Cerebras
test_cleaning_with_cerebras()
print("\n" + "=" * 80)
print("ALL TESTS PASSED ✓")
print("=" * 80)
print("\nOutputCleaner is working with Cerebras llama-3.3-70b!")
print("- Fast cleaning (~200-500ms per operation)")
print("- Fixes JSON parsing trap")
print("- Converts types to match schema")
print("- Low cost (~$0.001 per cleaning)")
except Exception as e:
print(f"\n✗ TEST FAILED: {e}")
import traceback
traceback.print_exc()
raise
+98 -2
@@ -117,6 +117,7 @@ if litellm is not None:
RATE_LIMIT_MAX_RETRIES = 10
RATE_LIMIT_BACKOFF_BASE = 2 # seconds
RATE_LIMIT_MAX_DELAY = 120 # seconds - cap to prevent absurd waits
MINIMAX_API_BASE = "https://api.minimax.io/v1"
# Empty-stream retries use a short fixed delay, not the rate-limit backoff.
# Conversation-structure issues are deterministic — long waits don't help.
@@ -324,11 +325,13 @@ class LiteLLMProvider(LLMProvider):
"""
self.model = model
self.api_key = api_key
self.api_base = api_base
self.api_base = api_base or self._default_api_base_for_model(model)
self.extra_kwargs = kwargs
# The Codex ChatGPT backend (chatgpt.com/backend-api/codex) rejects
# several standard OpenAI params: max_output_tokens, stream_options.
self._codex_backend = bool(api_base and "chatgpt.com/backend-api/codex" in api_base)
self._codex_backend = bool(
self.api_base and "chatgpt.com/backend-api/codex" in self.api_base
)
if litellm is None:
raise ImportError(
@@ -341,6 +344,14 @@ class LiteLLMProvider(LLMProvider):
# override the mode. The responses_api_bridge in litellm handles
# converting Chat Completions requests to Responses API format.
@staticmethod
def _default_api_base_for_model(model: str) -> str | None:
"""Return provider-specific default API base when required."""
model_lower = model.lower()
if model_lower.startswith("minimax/") or model_lower.startswith("minimax-"):
return MINIMAX_API_BASE
return None
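A minimal usage sketch of the new default (model name hypothetical; only the "minimax/" or "minimax-" prefix matters, and litellm must be installed):
llm = LiteLLMProvider(api_key="...", model="minimax/some-model")
assert llm.api_base == MINIMAX_API_BASE   # defaulted because none was passed
llm2 = LiteLLMProvider(
    api_key="...", model="minimax/some-model", api_base="https://example.test/v1"
)
assert llm2.api_base == "https://example.test/v1"  # an explicit base always wins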
def _completion_with_rate_limit_retry(
self, max_retries: int | None = None, **kwargs: Any
) -> Any:
@@ -735,6 +746,77 @@ class LiteLLMProvider(LLMProvider):
},
}
def _is_minimax_model(self) -> bool:
"""Return True when the configured model targets MiniMax."""
model = (self.model or "").lower()
return model.startswith("minimax/") or model.startswith("minimax-")
async def _stream_via_nonstream_completion(
self,
messages: list[dict[str, Any]],
system: str,
tools: list[Tool] | None,
max_tokens: int,
response_format: dict[str, Any] | None,
json_mode: bool,
) -> AsyncIterator[StreamEvent]:
"""Fallback path: convert non-stream completion to stream events.
Some providers currently fail in LiteLLM's chunk parser for stream=True.
For those providers we do a regular async completion and emit equivalent
stream events so higher layers continue to work.
"""
from framework.llm.stream_events import (
FinishEvent,
StreamErrorEvent,
TextDeltaEvent,
TextEndEvent,
ToolCallEvent,
)
try:
response = await self.acomplete(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
)
except Exception as e:
yield StreamErrorEvent(error=str(e), recoverable=False)
return
raw = response.raw_response
tool_calls = []
if raw and hasattr(raw, "choices") and raw.choices:
msg = raw.choices[0].message
tool_calls = msg.tool_calls or []
for tc in tool_calls:
parsed_args: Any
args = tc.function.arguments if tc.function else ""
try:
parsed_args = json.loads(args) if args else {}
except json.JSONDecodeError:
parsed_args = {"_raw": args}
yield ToolCallEvent(
tool_use_id=getattr(tc, "id", ""),
tool_name=tc.function.name if tc.function else "",
tool_input=parsed_args,
)
if response.content:
yield TextDeltaEvent(content=response.content, snapshot=response.content)
yield TextEndEvent(full_text=response.content)
yield FinishEvent(
stop_reason=response.stop_reason or "stop",
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
model=response.model,
)
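From the caller's side the fallback is just a coarser stream: tool calls arrive first, then the whole text as one delta, then the finish event. A consumption sketch (provider is a LiteLLMProvider instance; arguments abbreviated, and this calls the private method directly only for illustration):
async for event in provider._stream_via_nonstream_completion(
    messages=[{"role": "user", "content": "hi"}],
    system="",
    tools=None,
    max_tokens=1024,
    response_format=None,
    json_mode=False,
):
    # Expected order: ToolCallEvent (zero or more), TextDeltaEvent, TextEndEvent, FinishEvent
    print(type(event).__name__)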
async def stream(
self,
messages: list[dict[str, Any]],
@@ -762,6 +844,20 @@ class LiteLLMProvider(LLMProvider):
ToolCallEvent,
)
# MiniMax currently fails in litellm's stream chunk parser for some
# responses (missing "id" in stream chunks). Use non-stream fallback.
if self._is_minimax_model():
async for event in self._stream_via_nonstream_completion(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
):
yield event
return
full_messages: list[dict[str, Any]] = []
if system:
full_messages.append({"role": "system", "content": system})
File diff suppressed because it is too large
+1 -1
@@ -244,7 +244,7 @@ judge_graph = GraphSpec(
version="1.0.0",
entry_node="judge",
entry_points={"health_check": "judge"},
terminal_nodes=[], # Forever-alive: fires on every timer tick
terminal_nodes=["judge"], # Judge node can terminate after each check
pause_nodes=[],
nodes=[judge_node],
edges=[],
+3 -1
@@ -40,8 +40,10 @@ def validate_graph_structure(graph: GraphSpec) -> list[str]:
Delegates to GraphSpec.validate() which checks entry/terminal nodes,
edge references, reachability, fan-out rules, and GCU constraints.
Returns only errors (warnings are not blocking).
"""
return graph.validate()
result = graph.validate()
return result["errors"]
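This reflects the new GraphSpec.validate() contract visible throughout this diff: it returns a dict rather than a flat error list. A sketch of the shape, inferred from the call sites:
result = graph.validate()
# e.g. {"errors": ["edge references unknown node 'x'"], "warnings": ["node 'y' unreachable"]}
blocking = result["errors"]     # what validate_graph_structure() returns
advisory = result["warnings"]   # surfaced by AgentRunner, never blocking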
def validate_credentials(
+16 -6
@@ -959,11 +959,16 @@ class AgentRunner:
if not agent_json_path.is_file():
raise FileNotFoundError(f"No agent.py or agent.json found in {agent_path}")
content = agent_json_path.read_text(encoding="utf-8").strip()
if not content:
raise FileNotFoundError(f"agent.json is empty: {agent_json_path}")
with open(agent_json_path, encoding="utf-8") as f:
export_data = f.read()
graph, goal = load_agent_export(content)
if not export_data.strip():
raise ValueError(f"Empty agent export file: {agent_json_path}")
try:
graph, goal = load_agent_export(export_data)
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON in agent export file: {agent_json_path}") from exc
return cls(
agent_path=agent_path,
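The guarded-load pattern in isolation, as a standalone sketch (read_agent_export is a hypothetical helper; load_agent_export is the framework's actual parser):
import json
from pathlib import Path

def read_agent_export(path: Path) -> str:
    text = path.read_text(encoding="utf-8")
    if not text.strip():
        raise ValueError(f"Empty agent export file: {path}")
    try:
        json.loads(text)  # fail fast, chaining the original error for context
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in agent export file: {path}") from exc
    return text
Raising ValueError instead of letting JSONDecodeError escape gives callers one exception type to handle and keeps the offending file path in the message.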
@@ -1307,6 +1312,8 @@ class AgentRunner:
return "REPLICATE_API_KEY"
elif model_lower.startswith("together/"):
return "TOGETHER_API_KEY"
elif model_lower.startswith("minimax/") or model_lower.startswith("minimax-"):
return "MINIMAX_API_KEY"
else:
# Default: assume OpenAI-compatible
return "OPENAI_API_KEY"
@@ -1325,6 +1332,8 @@ class AgentRunner:
cred_id = None
if model_lower.startswith("anthropic/") or model_lower.startswith("claude"):
cred_id = "anthropic"
elif model_lower.startswith("minimax/") or model_lower.startswith("minimax-"):
cred_id = "minimax"
# Add more mappings as providers are added to LLM_CREDENTIALS
if cred_id is None:
@@ -1742,8 +1751,9 @@ class AgentRunner:
missing_tools = []
# Validate graph structure
graph_errors = self.graph.validate()
errors.extend(graph_errors)
graph_result = self.graph.validate()
errors.extend(graph_result["errors"])
warnings.extend(graph_result["warnings"])
# Check goal has success criteria
if not self.goal.success_criteria:
+10 -2
@@ -52,6 +52,10 @@ A graph execution finished successfully.
**Emitted by:** `ExecutionStream._run_execution()`
**Queen notification:** When a worker execution completes, the session manager \
injects a `[WORKER_TERMINAL]` notification into the queen with the output summary. \
The queen reports to the user and asks what to do next.
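For illustration, the injected notification looks roughly like this (output values hypothetical; the exact wording is built in the session manager):
[WORKER_TERMINAL] Worker finished successfully.
Output:
  summary: Contract approved with low risk...
Report this to the user. Ask if they want to continue with another run.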
---
### `execution_failed`
@@ -64,6 +68,10 @@ A graph execution failed with an error.
**Emitted by:** `ExecutionStream._run_execution()`
**Queen notification:** When a worker execution fails, the session manager \
injects a `[WORKER_TERMINAL]` notification into the queen with the error. \
The queen reports to the user and helps troubleshoot.
---
### `execution_paused`
@@ -433,14 +441,14 @@ Note: `node_id` is not set on this event; `stream_id` is the webhook source ID.
### `escalation_requested`
An agent has requested handoff to the Hive Coder (via the `escalate_to_coder` synthetic tool).
An agent has requested handoff to the Hive Coder (via the `escalate` synthetic tool).
| Data Field | Type | Description |
| ---------- | ----- | ------------------------------- |
| `reason` | `str` | Why escalation is needed |
| `context` | `str` | Additional context for the coder|
**Emitted by:** `EventLoopNode` when the LLM calls `escalate_to_coder`.
**Emitted by:** `EventLoopNode` when the LLM calls `escalate`.
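For illustration, a hypothetical event payload:
{
  "reason": "Need a custom tool for the CRM API",
  "context": "Node fetch-contacts failed twice; the built-in HTTP tool cannot sign requests"
}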
---
+2 -2
@@ -561,7 +561,7 @@ The runtime logger automatically flags issues based on execution metrics:
### Attention Categories
Used by `/hive-debugger` skill for issue categorization:
Used for runtime issue categorization:
1. **Missing Outputs**: Node didn't set required output keys
2. **Tool Errors**: Tool calls failed (API errors, timeouts)
@@ -690,7 +690,7 @@ rm -rf session_2025*
**Documentation:**
- `EXECUTION_STORAGE_REDESIGN.md` - Unified session storage design
- `/.claude/skills/hive-debugger/SKILL.md` - Interactive debugging skill
- `docs/developer-guide.md` - Debugging and troubleshooting workflows
**Related:**
- `core/framework/schemas/session_state.py` - Session state schema
+5 -2
@@ -137,8 +137,11 @@ class EventType(StrEnum):
WORKER_LOADED = "worker_loaded"
CREDENTIALS_REQUIRED = "credentials_required"
# Queen mode changes (building <-> running)
QUEEN_MODE_CHANGED = "queen_mode_changed"
# Queen phase changes (building <-> staging <-> running)
QUEEN_PHASE_CHANGED = "queen_phase_changed"
# Queen thinking hook — persona selected for the current building session
QUEEN_PERSONA_SELECTED = "queen_persona_selected"
# Subagent reports (one-way progress updates from sub-agents)
SUBAGENT_REPORT = "subagent_report"
+14 -27
View File
@@ -1,45 +1,30 @@
"""HIVE_LLM_DEBUG — write every LLM turn to a JSONL file for replay/debugging.
"""Write every LLM turn to ~/.hive/llm_logs/<ts>.jsonl for replay/debugging.
Set the env var to enable:
HIVE_LLM_DEBUG=1 writes to ~/.hive/llm_logs/<ts>.jsonl
HIVE_LLM_DEBUG=/some/path writes to that directory
Each line is a JSON object with the full LLM turn: assistant text, tool calls,
tool results, and token counts. The file is opened lazily on first call and
flushed after every write. Errors are silently swallowed; this must never
break the agent.
Each line is a JSON object with the full LLM turn: the request payload
(system prompt + messages), assistant text, tool calls, tool results, and
token counts. The file is opened lazily on first call and flushed after every
write. Errors are silently swallowed; this must never break the agent.
"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import IO, Any
logger = logging.getLogger(__name__)
_LLM_DEBUG_RAW = os.environ.get("HIVE_LLM_DEBUG", "").strip()
_LLM_DEBUG_ENABLED = _LLM_DEBUG_RAW.lower() in ("1", "true") or (
bool(_LLM_DEBUG_RAW) and _LLM_DEBUG_RAW.lower() not in ("0", "false", "")
)
_LLM_DEBUG_DIR = Path.home() / ".hive" / "llm_logs"
_log_file: IO[str] | None = None
_log_ready = False # lazy init guard
def _open_log() -> IO[str] | None:
"""Open a JSONL log file. Returns None if disabled."""
if not _LLM_DEBUG_ENABLED:
return None
raw = _LLM_DEBUG_RAW
if raw.lower() in ("1", "true"):
log_dir = Path.home() / ".hive" / "llm_logs"
else:
log_dir = Path(raw)
log_dir.mkdir(parents=True, exist_ok=True)
"""Open the JSONL log file for this process."""
_LLM_DEBUG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = log_dir / f"{ts}.jsonl"
path = _LLM_DEBUG_DIR / f"{ts}.jsonl"
logger.info("LLM debug log → %s", path)
return open(path, "a", encoding="utf-8") # noqa: SIM115
@@ -50,6 +35,8 @@ def log_llm_turn(
stream_id: str,
execution_id: str,
iteration: int,
system_prompt: str,
messages: list[dict[str, Any]],
assistant_text: str,
tool_calls: list[dict[str, Any]],
tool_results: list[dict[str, Any]],
@@ -57,10 +44,8 @@ def log_llm_turn(
) -> None:
"""Write one JSONL line capturing a complete LLM turn.
No-op when HIVE_LLM_DEBUG is not set. Never raises.
Never raises.
"""
if not _LLM_DEBUG_ENABLED:
return
try:
global _log_file, _log_ready # noqa: PLW0603
if not _log_ready:
@@ -74,6 +59,8 @@ def log_llm_turn(
"stream_id": stream_id,
"execution_id": execution_id,
"iteration": iteration,
"system_prompt": system_prompt,
"messages": messages,
"assistant_text": assistant_text,
"tool_calls": tool_calls,
"tool_results": tool_results,
@@ -561,7 +561,7 @@ class TestGraphSpecValidation:
edges=[],
)
errors = graph.validate()
errors = graph.validate()["errors"]
assert any("nonexistent-node" in e for e in errors)
# Invalid isolation level
@@ -582,7 +582,7 @@ class TestGraphSpecValidation:
edges=[],
)
errors2 = graph2.validate()
errors2 = graph2.validate()["errors"]
assert any("isolation_level" in e for e in errors2)
# Invalid trigger type
@@ -602,7 +602,7 @@ class TestGraphSpecValidation:
edges=[],
)
errors3 = graph3.validate()
errors3 = graph3.validate()["errors"]
assert any("trigger_type" in e for e in errors3)
+8 -2
@@ -39,7 +39,7 @@ DEFAULT_EVENT_TYPES = [
EventType.WORKER_LOADED,
EventType.CREDENTIALS_REQUIRED,
EventType.SUBAGENT_REPORT,
EventType.QUEEN_MODE_CHANGED,
EventType.QUEEN_PHASE_CHANGED,
]
# Keepalive interval in seconds
@@ -93,7 +93,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
"node_loop_started",
"credentials_required",
"worker_loaded",
"queen_mode_changed",
"queen_phase_changed",
}
client_disconnected = asyncio.Event()
@@ -180,6 +180,12 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
except (ConnectionResetError, ConnectionError, _AiohttpConnReset):
close_reason = "client_disconnected"
break
except RuntimeError as exc:
if "closing transport" in str(exc).lower():
close_reason = "client_disconnected"
else:
close_reason = f"error: {exc}"
break
except Exception as exc:
close_reason = f"error: {exc}"
break
+34 -7
@@ -3,6 +3,7 @@
import asyncio
import json
import logging
from typing import Any
from aiohttp import web
@@ -64,15 +65,15 @@ async def handle_trigger(request: web.Request) -> web.Response:
session_state=session_state,
)
# Cancel queen's in-progress LLM turn so it picks up the mode change cleanly
# Cancel queen's in-progress LLM turn so it picks up the phase change cleanly
if session.queen_executor:
node = session.queen_executor.node_registry.get("queen")
if node and hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
# Switch queen to running mode (mirrors run_agent_with_input tool behavior)
if session.mode_state is not None:
await session.mode_state.switch_to_running(source="frontend")
# Switch queen to running phase (mirrors run_agent_with_input tool behavior)
if session.phase_state is not None:
await session.phase_state.switch_to_running(source="frontend")
return web.json_response({"execution_id": execution_id})
@@ -131,7 +132,19 @@ async def handle_chat(request: web.Request) -> web.Response:
}
)
return web.json_response({"error": "Queen not available"}, status=503)
# Queen is dead — try to revive her
manager: Any = request.app["manager"]
try:
await manager.revive_queen(session, initial_prompt=message)
return web.json_response(
{
"status": "queen_revived",
"delivered": True,
}
)
except Exception as e:
logger.error("Failed to revive queen: %s", e)
return web.json_response({"error": "Queen not available"}, status=503)
async def handle_queen_context(request: web.Request) -> web.Response:
@@ -160,6 +173,20 @@ async def handle_queen_context(request: web.Request) -> web.Response:
await node.inject_event(message, is_client_input=False)
return web.json_response({"status": "queued", "delivered": True})
# Queen is dead — try to revive her
manager: Any = request.app["manager"]
try:
await manager.revive_queen(session)
# After revival, deliver the message
queen_executor = session.queen_executor
if queen_executor is not None:
node = queen_executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(message, is_client_input=False)
return web.json_response({"status": "queued_revived", "delivered": True})
except Exception as e:
logger.error("Failed to revive queen for context: %s", e)
return web.json_response({"error": "Queen not available"}, status=503)
@@ -382,8 +409,8 @@ async def handle_stop(request: web.Request) -> web.Response:
node.cancel_current_turn()
# Switch to staging (agent still loaded, ready to re-run)
if session.mode_state is not None:
await session.mode_state.switch_to_staging(source="frontend")
if session.phase_state is not None:
await session.phase_state.switch_to_staging(source="frontend")
return web.json_response(
{
+2 -2
@@ -48,7 +48,7 @@ def _get_manager(request: web.Request) -> SessionManager:
def _session_to_live_dict(session) -> dict:
"""Serialize a live Session to the session-primary JSON shape."""
info = session.worker_info
mode_state = getattr(session, "mode_state", None)
phase_state = getattr(session, "phase_state", None)
return {
"session_id": session.id,
"worker_id": session.worker_id,
@@ -61,7 +61,7 @@ def _session_to_live_dict(session) -> dict:
"loaded_at": session.loaded_at,
"uptime_seconds": round(time.time() - session.loaded_at, 1),
"intro_message": getattr(session.runner, "intro_message", "") or "",
"queen_mode": mode_state.mode if mode_state else "building",
"queen_phase": phase_state.phase if phase_state else "building",
}
+204 -26
@@ -40,11 +40,12 @@ class Session:
runner: Any | None = None # AgentRunner
worker_runtime: Any | None = None # AgentRuntime
worker_info: Any | None = None # AgentInfo
# Queen mode state (building/staging/running)
mode_state: Any = None # QueenModeState
# Queen phase state (building/staging/running)
phase_state: Any = None # QueenPhaseState
# Judge (active when worker is loaded)
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
worker_handoff_sub: str | None = None
# Session directory resumption:
# When set, _start_queen writes queen conversations to this existing session's
# directory instead of creating a new one. This lets cold-restores accumulate
@@ -380,6 +381,12 @@ class SessionManager:
# Stop judge
self._stop_judge(session)
if session.worker_handoff_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_handoff_sub)
except Exception:
pass
session.worker_handoff_sub = None
# Stop queen
if session.queen_task is not None:
@@ -401,6 +408,47 @@ class SessionManager:
# Queen startup
# ------------------------------------------------------------------
async def _handle_worker_handoff(self, session: Session, executor: Any, event: Any) -> None:
"""Route worker escalation events into the queen conversation."""
if event.stream_id in ("queen", "judge"):
return
reason = str(event.data.get("reason", "")).strip()
context = str(event.data.get("context", "")).strip()
node_label = event.node_id or "unknown_node"
stream_label = event.stream_id or "unknown_stream"
handoff = (
"[WORKER_ESCALATION_REQUEST]\n"
f"stream_id: {stream_label}\n"
f"node_id: {node_label}\n"
f"reason: {reason or 'unspecified'}\n"
)
if context:
handoff += f"context:\n{context}\n"
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(handoff, is_client_input=False)
else:
logger.warning("Worker handoff received but queen node not ready")
def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None:
"""Subscribe queen to worker/subagent escalation handoff events."""
from framework.runtime.event_bus import EventType as _ET
if session.worker_handoff_sub is not None:
session.event_bus.unsubscribe(session.worker_handoff_sub)
session.worker_handoff_sub = None
async def _on_worker_handoff(event):
await self._handle_worker_handoff(session, executor, event)
session.worker_handoff_sub = session.event_bus.subscribe(
event_types=[_ET.ESCALATION_REQUESTED],
handler=_on_worker_handoff,
)
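Rendered, a handoff injected into the queen conversation looks like this (field values hypothetical):
[WORKER_ESCALATION_REQUEST]
stream_id: worker-main
node_id: fetch-contacts
reason: CRM credential expired
context:
Two consecutive 401 responses; the node cannot proceed without a new token.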
async def _start_queen(
self,
session: Session,
@@ -470,16 +518,16 @@ class SessionManager:
except Exception:
logger.warning("Queen: MCP config failed to load", exc_info=True)
# Mode state for building/running mode switching
# Phase state for building/running phase switching
from framework.tools.queen_lifecycle_tools import (
QueenModeState,
QueenPhaseState,
register_queen_lifecycle_tools,
)
# Start in staging when the caller provided an agent, building otherwise.
initial_mode = "staging" if worker_identity else "building"
mode_state = QueenModeState(mode=initial_mode, event_bus=session.event_bus)
session.mode_state = mode_state
initial_phase = "staging" if worker_identity else "building"
phase_state = QueenPhaseState(phase=initial_phase, event_bus=session.event_bus)
session.phase_state = phase_state
# Always register lifecycle tools — they check session.worker_runtime
# at call time, so they work even if no worker is loaded yet.
@@ -489,7 +537,7 @@ class SessionManager:
session_id=session.id,
session_manager=self,
manager_session_id=session.id,
mode_state=mode_state,
phase_state=phase_state,
)
# Monitoring tools need concrete worker paths — only register when present
@@ -507,11 +555,26 @@ class SessionManager:
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# Partition tools into mode-specific sets
# Partition tools into phase-specific sets and import prompt segments
from framework.agents.hive_coder.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
_appendices,
_gcu_building_section,
_package_builder_knowledge,
_queen_behavior_always,
_queen_behavior_building,
_queen_behavior_running,
_queen_behavior_staging,
_queen_identity_building,
_queen_identity_running,
_queen_identity_staging,
_queen_phase_7,
_queen_style,
_queen_tools_building,
_queen_tools_running,
_queen_tools_staging,
)
building_names = set(_QUEEN_BUILDING_TOOLS)
@@ -529,13 +592,12 @@ class SessionManager:
)
logger.info("Queen: registered tools: %s", sorted(registered_names))
mode_state.building_tools = [t for t in queen_tools if t.name in building_names]
mode_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
mode_state.running_tools = [t for t in queen_tools if t.name in running_names]
phase_state.building_tools = [t for t in queen_tools if t.name in building_names]
phase_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
# Build queen graph with adjusted prompt + tools
_orig_node = _queen_graph.nodes[0]
base_prompt = _orig_node.system_prompt or ""
if worker_identity is None:
worker_identity = (
@@ -544,12 +606,67 @@ class SessionManager:
"Handle all tasks directly using your coding tools."
)
# Compose phase-specific prompts.
_building_body = (
_queen_style
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _package_builder_knowledge
+ _gcu_building_section
+ _queen_phase_7
+ _appendices
+ worker_identity
)
phase_state.prompt_building = _queen_identity_building + _building_body
phase_state.prompt_staging = (
_queen_identity_staging
+ _queen_style
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
)
phase_state.prompt_running = (
_queen_identity_running
+ _queen_style
+ _queen_tools_running
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
)
# Build the session_start hook: selects the best-fit expert persona
# from the user's opening message and replaces the identity prefix.
from framework.agents.hive_coder.nodes.thinking_hook import select_expert_persona
from framework.graph.event_loop_node import HookContext, HookResult
from framework.runtime.event_bus import AgentEvent, EventType
_session_llm = session.llm
_session_event_bus = session.event_bus
async def _persona_hook(ctx: HookContext) -> HookResult | None:
persona = await select_expert_persona(ctx.trigger or "", _session_llm)
if not persona:
return None
if _session_event_bus is not None:
await _session_event_bus.publish(
AgentEvent(
type=EventType.QUEEN_PERSONA_SELECTED,
stream_id="queen",
data={"persona": persona},
)
)
return HookResult(system_prompt=persona + "\n\n" + _building_body)
initial_prompt_text = phase_state.get_current_prompt()
registered_tool_names = set(queen_registry.get_tools().keys())
declared_tools = _orig_node.tools or []
available_tools = [t for t in declared_tools if t in registered_tool_names]
node_updates: dict = {
"system_prompt": base_prompt + worker_identity,
"system_prompt": initial_prompt_text,
}
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
@@ -558,7 +675,13 @@ class SessionManager:
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
queen_graph = _queen_graph.model_copy(update={"nodes": [adjusted_node]})
_queen_loop_config = {
**(_queen_graph.loop_config or {}),
"hooks": {"session_start": [_persona_hook]},
}
queen_graph = _queen_graph.model_copy(
update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
)
queen_runtime = Runtime(hive_home / "queen")
@@ -572,39 +695,73 @@ class SessionManager:
event_bus=session.event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=queen_graph.loop_config,
loop_config=_queen_loop_config,
execution_id=session.id,
dynamic_tools_provider=mode_state.get_current_tools,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
)
session.queen_executor = executor
# Wire inject_notification so mode switches notify the queen LLM
async def _inject_mode_notification(content: str) -> None:
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
mode_state.inject_notification = _inject_mode_notification
phase_state.inject_notification = _inject_phase_notification
# Auto-switch to staging when worker execution finishes naturally
# and notify the queen about the termination
from framework.runtime.event_bus import EventType as _ET
async def _on_worker_done(event):
if event.stream_id == "queen":
return
if mode_state.mode == "running":
await mode_state.switch_to_staging(source="auto")
if phase_state.phase == "running":
# Build termination notification for the queen
if event.type == _ET.EXECUTION_COMPLETED:
output = event.data.get("output", {})
output_summary = ""
if output:
# Summarize key outputs for the queen
for key, value in output.items():
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
output_summary += f"\n {key}: {val_str}"
_out = output_summary or " (no output keys set)"
notification = (
"[WORKER_TERMINAL] Worker finished successfully.\n"
f"Output:{_out}\n"
"Report this to the user. "
"Ask if they want to continue with another run."
)
else: # EXECUTION_FAILED
error = event.data.get("error", "Unknown error")
notification = (
"[WORKER_TERMINAL] Worker failed.\n"
f"Error: {error}\n"
"Report this to the user and help them troubleshoot."
)
# Inject notification to queen before phase switch
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(notification)
await phase_state.switch_to_staging(source="auto")
session.event_bus.subscribe(
event_types=[_ET.EXECUTION_COMPLETED, _ET.EXECUTION_FAILED],
handler=_on_worker_done,
)
self._subscribe_worker_handoffs(session, executor)
logger.info(
"Queen starting in %s mode with %d tools: %s",
mode_state.mode,
len(mode_state.get_current_tools()),
[t.name for t in mode_state.get_current_tools()],
"Queen starting in %s phase with %d tools: %s",
phase_state.phase,
len(phase_state.get_current_tools()),
[t.name for t in phase_state.get_current_tools()],
)
result = await executor.execute(
graph=queen_graph,
@@ -787,6 +944,27 @@ class SessionManager:
"Handle all tasks directly using your coding tools."
)
async def revive_queen(self, session: Session, initial_prompt: str | None = None) -> None:
"""Revive a dead queen executor on an existing session.
Restarts the queen with the same session context (worker profile, tools, etc.).
"""
from framework.tools.queen_lifecycle_tools import build_worker_profile
# Build worker identity if worker is loaded
worker_identity = (
build_worker_profile(session.worker_runtime, agent_path=session.worker_path)
if session.worker_runtime
else None
)
# Start queen with existing session context
await self._start_queen(
session, worker_identity=worker_identity, initial_prompt=initial_prompt
)
logger.info("Queen revived for session '%s'", session.id)
# ------------------------------------------------------------------
# Lookups
# ------------------------------------------------------------------

Some files were not shown because too many files have changed in this diff